6

Ich habe gerade Batch data processing with App Engine session of Google I/O 2010 angesehen, habe einige Teile von MapReduce article from Google Research gelesen und jetzt denke ich, MapReduce on Google App Engine zu verwenden, um ein Empfehlungssystem in Python zu implementieren.MapReduce für mehr als einen Datenspeichertyp in Google App Engine

Ich bevorzuge die Verwendung von appengine-mapreduce anstelle der Task Queue API, da die erste eine einfache Iteration über alle Instanzen, automatisches Batching, automatische Taskverkettung usw. bietet. Das Problem ist: mein Empfehlungssystem muss die Korrelation zwischen Instanzen berechnen von zwei verschiedenen Modellen, dh Instanzen von zwei verschiedenen Arten.

Beispiel: Ich habe diese zwei Modelle: Benutzer und Artikel. Jeder hat eine Liste von Tags als Attribut. Im Folgenden finden Sie die Funktionen zum Berechnen der Korrelation zwischen Benutzern und Elementen. Beachten Sie, dass calculateCorrelation sollte für jede Kombination von Benutzer und Produkte genannt werden:

def calculateCorrelation(user, item): 
    return calculateCorrelationAverage(u.tags, i.tags) 

def calculateCorrelationAverage(tags1, tags2): 
    correlationSum = 0.0 
    for (tag1, tag2) in allCombinations(tags1, tags2): 
     correlationSum += correlation(tag1, tag2) 
    return correlationSum/(len(tags1) + len(tags2)) 

def allCombinations(list1, list2): 
    combinations = [] 
    for x in list1: 
     for y in list2: 
      combinations.append((x, y)) 
    return combinations    

Aber das calculateCorrelation ist kein gültiger Mapper in appengine-mapreduce und vielleicht ist diese Funktion nicht auch kompatibel mit MapReduce Berechnungskonzept. Aber ich muss mir sicher sein ... es wäre wirklich toll für mich, wenn ich diese appeengine-mapreduce-Vorteile wie automatisches Batching und Task-Chaining hätte.

Gibt es eine Lösung dafür?

Sollte ich meinen eigenen InputReader definieren? Ein neuer InputReader, der alle Instanzen zweier verschiedener Arten liest, ist mit der aktuellen appengine-mapreduce-Implementierung kompatibel?

Oder sollte ich Folgendes versuchen?

  • Kombinieren Sie alle Schlüssel aller Einheiten dieser beiden Arten, je zwei, in Instanzen eines neuen Modells (möglicherweise mit MapReduce)
  • Iterate Mapper über Instanzen dieses neue Modell
  • Für jede Verwendung Verwenden Sie in diesem Fall Schlüssel, um die zwei Entitäten verschiedener Arten zu erhalten, und berechnen Sie die Korrelation zwischen ihnen.
+0

Was sind die Kriterien für die in-Benutzer und Gegenstände übergeben? Ist es jede Kombination von Benutzer und Artikel? Nur solche, die irgendwie verwandt sind? Auch, welche Sprache ist das? Es ist nicht (ganz) Python! –

+0

'calculateCorrelation' sollte für jede Kombination von Benutzer und Element aufgerufen werden. Und jetzt habe ich die Arten von Variablen entfernt, um Verwirrung zu vermeiden. – fjsj

Antwort

3

Nach Nick Johnson Vorschlag schrieb ich meine eigenen InputReader. Dieser Leser holt Entitäten von zwei verschiedenen Arten. Es ergibt Tupel mit allen Kombinationen dieser Entitäten. Hier ist sie:

class TwoKindsInputReader(InputReader): 
    _APP_PARAM = "_app" 
    _KIND1_PARAM = "kind1" 
    _KIND2_PARAM = "kind2" 
    MAPPER_PARAMS = "mapper_params" 

    def __init__(self, reader1, reader2): 
     self._reader1 = reader1 
     self._reader2 = reader2 

    def __iter__(self): 
     for u in self._reader1: 
      for e in self._reader2: 
       yield (u, e) 

    @classmethod 
    def from_json(cls, input_shard_state): 
     reader1 = DatastoreInputReader.from_json(input_shard_state[cls._KIND1_PARAM]) 
     reader2 = DatastoreInputReader.from_json(input_shard_state[cls._KIND2_PARAM]) 

     return cls(reader1, reader2) 

    def to_json(self): 
     json_dict = {} 
     json_dict[self._KIND1_PARAM] = self._reader1.to_json() 
     json_dict[self._KIND2_PARAM] = self._reader2.to_json() 
     return json_dict 

    @classmethod 
    def split_input(cls, mapper_spec): 
     params = mapper_spec.params 
     app = params.get(cls._APP_PARAM) 
     kind1 = params.get(cls._KIND1_PARAM) 
     kind2 = params.get(cls._KIND2_PARAM) 
     shard_count = mapper_spec.shard_count 
     shard_count_sqrt = int(math.sqrt(shard_count)) 

     splitted1 = DatastoreInputReader._split_input_from_params(app, kind1, params, shard_count_sqrt) 
     splitted2 = DatastoreInputReader._split_input_from_params(app, kind2, params, shard_count_sqrt) 
     inputs = [] 

     for u in splitted1: 
      for e in splitted2: 
       inputs.append(TwoKindsInputReader(u, e)) 

     #mapper_spec.shard_count = len(inputs) #uncomment this in case of "Incorrect number of shard states" (at line 408 in handlers.py) 
     return inputs 

    @classmethod 
    def validate(cls, mapper_spec): 
     return True #TODO 

Dieser Code verwendet werden soll, wenn Sie alle Kombinationen von Einheiten von zwei Arten verarbeiten müssen. Sie können dies auch für mehr als zwei Arten verallgemeinern.

Hier ist ein gültiger der mapreduce.yaml für TwoKindsInputReader:

mapreduce: 
- name: recommendationMapReduce 
    mapper: 
    input_reader: customInputReaders.TwoKindsInputReader 
    handler: recommendation.calculateCorrelationHandler 
    params: 
    - name: kind1 
     default: kinds.User 
    - name: kind2 
     default: kinds.Item 
    - name: shard_count 
     default: 16 
2

Es ist schwierig zu wissen, was zu empfehlen, ohne weitere Details von dem, was Sie tatsächlich berechnen. Eine einfache Option besteht darin, einfach die zugehörige Entität innerhalb des Map-Aufrufs abzurufen - es gibt nichts, was Sie daran hindert, dort Datenspeicheroperationen durchzuführen.

Dies wird jedoch zu vielen kleinen Anrufen führen. Wenn Sie wie vorgeschlagen einen benutzerdefinierten InputReader schreiben, können Sie beide Gruppen von Entitäten parallel abrufen, was die Leistung erheblich verbessert.

Wenn Sie weitere Einzelheiten dazu angeben, wie Sie diesen Entitäten beitreten müssen, können wir möglicherweise konkretere Vorschläge machen.

+0

Nur weitere Informationen hinzugefügt! – fjsj