6

Ich möchte die mapreduce-Bibliothek verwenden, um alle Entitäten zu aktualisieren, die eine Abfrage erfüllen. Es gibt ein paar Komplikationen:Wie verwendet man MapReduce, um Datenspeicher-Entities, die eine Abfrage erfüllen, gemeinsam zu aktualisieren?

  1. Die Abfrage, die die Entitäten findet Kontrollen zu aktualisieren, wenn der Wert einer bestimmte Eigenschaft „property1“ in einer langen Liste von Werten enthalten ist (~ 10000 Einträge) aus einem CSV
  2. Für jede Entität Datei Erfüllung der Abfrage, eine andere Eigenschaft „property2“ muss aktualisiert werden, um gleich den Wert in der zweiten Spalte und derselben Zeile der cSV-Datei

ich weiß, wie die cSV-Upload Datei zu Blobstore und lesen Sie jede Zeile mit einem Blobstore-Eingabe-Reader. Mir ist auch der Datastore-Eingabe-Leser bekannt, der Entitäten mithilfe einer Abfrage abruft.

Meine Frage ist, wie kann ich eine Mapper-Klasse erstellen, die Eingabedaten aus dem Blobstore liest, holt die Datastore-Entitäten und aktualisiert sie so effizient wie möglich?

+0

Ich bezweifle Karte -reduce gibt Ihnen die Leistung, die Sie wollen. Normalerweise arbeitet map-reduce mit Daten, die auf natürliche Weise in Teile übernommen werden können. Aber in Ihrer Situation, da Sie mit einer CSV-Datei arbeiten, würde der größte Teil der Zeit und des Arbeitsspeichers darin bestehen, diese CSV-Datei in mehrere Zeilen (mehrere Zeichenfolgen) zu zerlegen. Wenn Sie diese CSV-Zeichenfolge als einen einzelnen Stream behandeln, werden Sie von dieser seriellen Operation von readLine() blockiert. Da Ihre Verarbeitung nicht zeitaufwendig ist (korrigieren Sie mich wenn nicht) im Vergleich zu den Kosten der Aufgabenteilung, sehe ich keine Verringerung der Karte in irgendeiner Hinsicht. –

+0

Ich hatte gehofft, zumindest von der Stapelverarbeitung von gets und puts zum Datenspeicher zu profitieren, da die Alternative darin besteht, jede vollständige Entität zu erhalten, eine Eigenschaft zu ändern und in den Datenspeicher zurückzulegen. – Price

+0

Während map-reduce, MR, großartig für die Arbeit an vielen Entitäten ist, könnte die Tatsache, dass es auch den csv verarbeiten müsste, die Dinge verlangsamen. Eine Möglichkeit besteht darin, die CSV auch in den Datenspeicher zu laden, einen MR für alle Entitäten auszuführen und dann im Mapper a.Holen Sie sich den CSV-Typ, um zu sehen, ob die übergebene Entität vorhanden ist. Wenn es sonst aktualisiert wird, überspringen. Nicht der beste Weg, aber der einzige, an den ich denken kann. Zu Ihrer Information: Sie können put_multi verwenden, um Stapel zu erstellen https://cloud.google.com/appengine/docs/python/ndb/functions – Ryan

Antwort

3

Da die Liste der möglichen Werte für property1 lang ist, um eine Abfrage-Filter scheint nicht wie eine gute Option (weil Sie einen IN-Filter verwenden müssten, was eigentlich runs one query per value)

Eine Alternative, bei der MR verwendet wird, besteht darin, die CSV-Datei unter Verwendung eines Map (von property1 bis property2) in den Speicher zu laden und anschließend einen MR-Job zu erstellen, der alle Entitäten iteriert und deren property1 Teil der Schlüssel ist Map, modifiziere es mit dem gemappten Wert.

Wie @Ryan B sagt, müssen Sie MR nicht verwenden, wenn Sie nur Batch-Puts nutzen möchten, da Sie mit dem DatastoreService einen Iterable bis put verwenden können.

+1

Danke! Ich stimme zu, dass die Verwendung einer Abfrage mit IN-Filter für eine so lange Liste sehr ineffizient wäre. Ich habe ein paar kurze Fragen: 1. Wüssten Sie, wie Sie die CSV optimal in eine Map laden können? 2. Wäre es nicht besser, über die Schlüssel der Map zu iterieren und die entsprechende Entity aus dem Datenspeicher zu holen, anstatt über alle Entitäten des Datenspeichers zu iterieren, um zu sehen, ob sie sich in der Map befinden? 3. Wenn ich nur Batch-Puts verwende, woher weiß ich, welche Operationen erfolgreich abgeschlossen wurden? – Price

+0

1. Hängt vom Job ab: Wenn es sich um eine einmalige Aufgabe handelt, möchten Sie möglicherweise sogar programmgesteuert laden (z. B. ein Skript zum Laden der CSV und zum Ausgeben von Java-Code verwenden, der die Karte füllt). Wenn nicht, benutze Blobstore, lade die Datei und wiederhole eine Hintergrundaufgabe (und speicher sie vielleicht in Memcache) 2. Nun, es hängt davon ab, wie spärlich deine Map in Relation zu den Entitäten ist, und ob du einen Index über ** hast property1 ** oder nicht ... aber ja, die andere Option ist das. 3. Lesen Sie puts Dokumentation zur Transaktionsverwaltung: Sie können entscheiden, ob Sie ein Commit/Rollback durchführen oder die Standardkonfiguration verwenden möchten. – marianosimone

2

Sie können einen DatastoreInputReader verwenden und in der Kartenfunktion herausfinden, ob die Eigenschaft1 tatsächlich in der CSV enthalten ist: Das Lesen von einer CSV wäre jedes Mal sehr langsam, was Sie tun können, ist memcache, um diese Information bereitzustellen Es wird nur einmal von seinem eigenen Datastore-Modell gelesen. Um das Datenspeichermodell zu füllen, würde ich empfehlen, property1 als benutzerdefinierte ID jeder Zeile zu verwenden. Auf diese Weise ist die Abfrage einfach. Sie würden den Datenspeicher nur für die Werte aktualisieren, die sich tatsächlich ändern, und den Mutationspool verwenden, um ihn performant zu machen (op.db.Put()). Ich lasse Sie Pseudo-Code (sorry ... Ich habe es nur in Python), wie die verschiedenen Stücke aussehen würde, ich Sie weiter empfehlen, diesen Artikel auf MapReduce auf Google App Engine zu lesen: http://sookocheff.com/posts/2014-04-15-app-engine-mapreduce-api-part-1-the-basics/

#to get the to_dict method 
from google.appengine.ext import ndb 
from mapreduce import operation as op 
from mapreduce.lib import pipeline 
from mapreduce import mapreduce_pipeline 

class TouchPipeline(pipeline.Pipeline): 
    """ 
    Pipeline to update the field of entities that have certain condition 
    """ 

    def run(self, *args, **kwargs): 
     """ run """ 
     mapper_params = { 
      "entity_kind": "yourDatastoreKind", 
     } 
     yield mapreduce_pipeline.MapperPipeline(
      "Update entities that have certain condition", 
      handler_spec="datastore_map", 
      input_reader_spec="mapreduce.input_readers.DatastoreInputReader", 
      params=mapper_params, 
      shards=64) 


class csvrow(ndb.Model): 
    #you dont store property 1 because you are going to use its value as key 
    substitutefield=ndb.StringProperty() 

def create_csv_datastore(): 
    # instead of running this, make a 10,000 row function with each csv value, 
    # or read it from the blobstore, iterate and update the values accordingly 
    for i in range(10000): 
    #here we are using our own key as id of this row and just storing the other column that 
    #eventually will be subtitute if it matches 
    csvrow.get_or_insert('property%s' % i, substitutefield = 'substitute%s').put() 


def queryfromcsv(property1): 
    csvrow=ndb.Key('csvrow', property1).get() 
    if csvrow: 
    return csvrow.substitutefield 
    else: 
    return property1 

def property1InCSV(property1): 
    data = memcache.get(property1) 
    if data is not None: 
     return data 
    else: 
     data = self.queryfromcsv(property1) 
     memcache.add(property1, data, 60) 
     return data 

def datastore_map(entity_type): 
    datastorepropertytocheck = entity_type.property1 
    newvalue = property1InCSV(datastorepropertytocheck) 
    if newvalue!=datastoreproperty: 
    entity_type.property11 = newvalue 
    #use the mutation pool 
    yield op.db.Put(entity)