2016-04-29 19 views
10

Ich untersuche Verarbeitungsprotokolle von Web-Benutzersitzungen über Google Dataflow/Apache Beam und müssen die Protokolle des Benutzers so kombinieren, wie sie kommen (Streaming) mit der Geschichte von eine Benutzersitzung vom letzten Monat.So kombinieren Sie Streaming-Daten mit großen Verlaufsdaten in Dataflow/Beam

Ich habe in den folgenden Ansätzen sieht:

  1. Verwenden Sie ein 30 Tage feststehendes Fenster: höchstwahrscheinlich zu groß ein Fensters in dem Speicher zu passen, und ich brauche nicht den Benutzer Geschichte zu aktualisieren, so verweisen es
  2. Verwenden CoGroupByKey zwei Datensätze anzuschließen, aber die beiden Datensätze müssen die gleiche Fenstergröße (https://cloud.google.com/dataflow/model/group-by-key#join) haben, was nicht wahr ist in meinem Fall (24h vs 30 Tage)
  3. Seitlicher Eingang zum abrufen Sitzungsverlauf des Benutzers für eine gegebene element in processElement(ProcessContext processContext)

Mein Verständnis ist, dass die über .withSideInputs(pCollectionView) geladenen Daten in den Speicher passen müssen. Ich weiß, dass ich den gesamten Sitzungsverlauf eines einzelnen Benutzers in den Speicher aufnehmen kann, aber nicht alle Sitzungsverläufe.

Meine Frage ist, ob es eine Möglichkeit gibt, Daten von einer Nebeneingabe zu laden/zu streamen, die nur für die aktuelle Benutzersitzung relevant ist.

Ich stelle mir eine ParDo-Funktion vor, die die History-Sitzung des Benutzers von der Seiteneingabe lädt, indem sie die ID des Benutzers angibt. Aber nur die History-Sitzung des aktuellen Benutzers würde in den Speicher passen; Laden alle Geschichte Sitzungen durch Seiteneingabe wäre zu groß.

zu zeigen einige Pseudo-Code zu:

public static class MetricFn extends DoFn<LogLine, String> { 

    final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView; 

    public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) { 
     this.pHistoryView = historyView; 
    } 

    @Override 
    public void processElement(ProcessContext processContext) throws Exception { 
     Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView); 

     final LogLine currentLogLine = processContext.element(); 
     final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId()); 
     final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory); 
     processContext.output(outputMetric); 
    } 
} 

Antwort

1

Es für den Zugriff auf pro-Taste Seite Eingänge in Streaming ist nicht ein Weg ist, aber es wäre auf jeden Fall genau nützlich sein, wie Sie beschreiben, und es ist etwas, was wir sind über die Implementierung nachdenken.

Eine mögliche Problemumgehung besteht darin, die Seiteneingaben zu verwenden, um Zeiger auf den tatsächlichen Sitzungsverlauf zu verteilen. Der Code, der die 24h-Sitzungshistorien generiert, könnte sie nach GCS/BigQuery/etc hochladen und dann die Orte als Nebeneingabe an den Verbindungscode senden.

+0

Danke für die Klarstellung Daniel ... wäre es eine Option (in Dataflow), Benutzerhistorie dynamisch über Bigtable mit der Benutzer-ID als Schlüssel zu laden oder würde dies wahrscheinlich die Leistung töten (da die Geschichte abgefragt werden müsste)/für jede Benutzersitzung geladen)? – Florian

+0

Solange Sie den Verlauf pro Sitzung (und nicht pro Ereignis) laden, wird es wahrscheinlich einigermaßen gut funktionieren. Da die Verlaufsdaten konstant sind, können Sie auch einen statischen Cache hinzufügen. Sobald Sie einen GroupByKey/Combine mit Ihren Benutzern als Schlüssel erstellt haben, wird jeder Benutzer in der Regel am selben Prozess verarbeitet, so dass der Cache ziemlich gut funktionieren sollte. – danielm