Ich untersuche Verarbeitungsprotokolle von Web-Benutzersitzungen über Google Dataflow/Apache Beam und müssen die Protokolle des Benutzers so kombinieren, wie sie kommen (Streaming) mit der Geschichte von eine Benutzersitzung vom letzten Monat.So kombinieren Sie Streaming-Daten mit großen Verlaufsdaten in Dataflow/Beam
Ich habe in den folgenden Ansätzen sieht:
- Verwenden Sie ein 30 Tage feststehendes Fenster: höchstwahrscheinlich zu groß ein Fensters in dem Speicher zu passen, und ich brauche nicht den Benutzer Geschichte zu aktualisieren, so verweisen es
- Verwenden CoGroupByKey zwei Datensätze anzuschließen, aber die beiden Datensätze müssen die gleiche Fenstergröße (https://cloud.google.com/dataflow/model/group-by-key#join) haben, was nicht wahr ist in meinem Fall (24h vs 30 Tage)
- Seitlicher Eingang zum abrufen Sitzungsverlauf des Benutzers für eine gegebene
element
inprocessElement(ProcessContext processContext)
Mein Verständnis ist, dass die über .withSideInputs(pCollectionView)
geladenen Daten in den Speicher passen müssen. Ich weiß, dass ich den gesamten Sitzungsverlauf eines einzelnen Benutzers in den Speicher aufnehmen kann, aber nicht alle Sitzungsverläufe.
Meine Frage ist, ob es eine Möglichkeit gibt, Daten von einer Nebeneingabe zu laden/zu streamen, die nur für die aktuelle Benutzersitzung relevant ist.
Ich stelle mir eine ParDo-Funktion vor, die die History-Sitzung des Benutzers von der Seiteneingabe lädt, indem sie die ID des Benutzers angibt. Aber nur die History-Sitzung des aktuellen Benutzers würde in den Speicher passen; Laden alle Geschichte Sitzungen durch Seiteneingabe wäre zu groß.
zu zeigen einige Pseudo-Code zu:
public static class MetricFn extends DoFn<LogLine, String> {
final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;
public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
this.pHistoryView = historyView;
}
@Override
public void processElement(ProcessContext processContext) throws Exception {
Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);
final LogLine currentLogLine = processContext.element();
final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
processContext.output(outputMetric);
}
}
Danke für die Klarstellung Daniel ... wäre es eine Option (in Dataflow), Benutzerhistorie dynamisch über Bigtable mit der Benutzer-ID als Schlüssel zu laden oder würde dies wahrscheinlich die Leistung töten (da die Geschichte abgefragt werden müsste)/für jede Benutzersitzung geladen)? – Florian
Solange Sie den Verlauf pro Sitzung (und nicht pro Ereignis) laden, wird es wahrscheinlich einigermaßen gut funktionieren. Da die Verlaufsdaten konstant sind, können Sie auch einen statischen Cache hinzufügen. Sobald Sie einen GroupByKey/Combine mit Ihren Benutzern als Schlüssel erstellt haben, wird jeder Benutzer in der Regel am selben Prozess verarbeitet, so dass der Cache ziemlich gut funktionieren sollte. – danielm