Sie können einen DatastoreInputReader verwenden und in der Kartenfunktion herausfinden, ob die Eigenschaft1 tatsächlich in der CSV enthalten ist: Das Lesen von einer CSV wäre jedes Mal sehr langsam, was Sie tun können, ist memcache, um diese Information bereitzustellen Es wird nur einmal von seinem eigenen Datastore-Modell gelesen. Um das Datenspeichermodell zu füllen, würde ich empfehlen, property1 als benutzerdefinierte ID jeder Zeile zu verwenden. Auf diese Weise ist die Abfrage einfach. Sie würden den Datenspeicher nur für die Werte aktualisieren, die sich tatsächlich ändern, und den Mutationspool verwenden, um ihn performant zu machen (op.db.Put()). Ich lasse Sie Pseudo-Code (sorry ... Ich habe es nur in Python), wie die verschiedenen Stücke aussehen würde, ich Sie weiter empfehlen, diesen Artikel auf MapReduce auf Google App Engine zu lesen: http://sookocheff.com/posts/2014-04-15-app-engine-mapreduce-api-part-1-the-basics/
#to get the to_dict method
from google.appengine.ext import ndb
from mapreduce import operation as op
from mapreduce.lib import pipeline
from mapreduce import mapreduce_pipeline
class TouchPipeline(pipeline.Pipeline):
"""
Pipeline to update the field of entities that have certain condition
"""
def run(self, *args, **kwargs):
""" run """
mapper_params = {
"entity_kind": "yourDatastoreKind",
}
yield mapreduce_pipeline.MapperPipeline(
"Update entities that have certain condition",
handler_spec="datastore_map",
input_reader_spec="mapreduce.input_readers.DatastoreInputReader",
params=mapper_params,
shards=64)
class csvrow(ndb.Model):
#you dont store property 1 because you are going to use its value as key
substitutefield=ndb.StringProperty()
def create_csv_datastore():
# instead of running this, make a 10,000 row function with each csv value,
# or read it from the blobstore, iterate and update the values accordingly
for i in range(10000):
#here we are using our own key as id of this row and just storing the other column that
#eventually will be subtitute if it matches
csvrow.get_or_insert('property%s' % i, substitutefield = 'substitute%s').put()
def queryfromcsv(property1):
csvrow=ndb.Key('csvrow', property1).get()
if csvrow:
return csvrow.substitutefield
else:
return property1
def property1InCSV(property1):
data = memcache.get(property1)
if data is not None:
return data
else:
data = self.queryfromcsv(property1)
memcache.add(property1, data, 60)
return data
def datastore_map(entity_type):
datastorepropertytocheck = entity_type.property1
newvalue = property1InCSV(datastorepropertytocheck)
if newvalue!=datastoreproperty:
entity_type.property11 = newvalue
#use the mutation pool
yield op.db.Put(entity)
Ich bezweifle Karte -reduce gibt Ihnen die Leistung, die Sie wollen. Normalerweise arbeitet map-reduce mit Daten, die auf natürliche Weise in Teile übernommen werden können. Aber in Ihrer Situation, da Sie mit einer CSV-Datei arbeiten, würde der größte Teil der Zeit und des Arbeitsspeichers darin bestehen, diese CSV-Datei in mehrere Zeilen (mehrere Zeichenfolgen) zu zerlegen. Wenn Sie diese CSV-Zeichenfolge als einen einzelnen Stream behandeln, werden Sie von dieser seriellen Operation von readLine() blockiert. Da Ihre Verarbeitung nicht zeitaufwendig ist (korrigieren Sie mich wenn nicht) im Vergleich zu den Kosten der Aufgabenteilung, sehe ich keine Verringerung der Karte in irgendeiner Hinsicht. –
Ich hatte gehofft, zumindest von der Stapelverarbeitung von gets und puts zum Datenspeicher zu profitieren, da die Alternative darin besteht, jede vollständige Entität zu erhalten, eine Eigenschaft zu ändern und in den Datenspeicher zurückzulegen. – Price
Während map-reduce, MR, großartig für die Arbeit an vielen Entitäten ist, könnte die Tatsache, dass es auch den csv verarbeiten müsste, die Dinge verlangsamen. Eine Möglichkeit besteht darin, die CSV auch in den Datenspeicher zu laden, einen MR für alle Entitäten auszuführen und dann im Mapper a.Holen Sie sich den CSV-Typ, um zu sehen, ob die übergebene Entität vorhanden ist. Wenn es sonst aktualisiert wird, überspringen. Nicht der beste Weg, aber der einzige, an den ich denken kann. Zu Ihrer Information: Sie können put_multi verwenden, um Stapel zu erstellen https://cloud.google.com/appengine/docs/python/ndb/functions – Ryan