2015-08-24 7 views
5

Wenn ich beispielsweise einen Dataflow-Streaming-Job mit einem 5-Minuten-Fenster habe, das von PubSub liest, verstehe ich, dass wenn ich einem Element einen zwei Tage zurückliegenden Zeitstempel zuweiße, ein Fenster mit diesem Element vorhanden ist Beispiel, das tägliche Tabellen an BigQuery ausgibt, die in BigQueryIO.java beschrieben werden, schreibt der Job das zwei Tage nachher Element in einer BigQuery-Tabelle mit dem tatsächlichen Datum.Ist es möglich, BigQuery-Tabellennamen basierend auf den Zeitstempeln der Elemente eines Fensters dynamisch zu generieren?

Ich möchte vergangene Elemente in BigQuery-Tabellen mit dem Zeitstempel der Elemente des Fensters statt der Zeit des aktuellen Fensters schreiben, ist das möglich?

Jetzt verfolge ich das Beispiel in DataflowJavaSDK beschrieben/sdk/src/main/java/com/google/Cloud/Datenfluß/sdk/io/BigQueryIO.java:

PCollection<TableRow> quotes = ... 
    quotes.apply(Window.<TableRow>info(CalendarWindows.days(1))) 
     .apply(BigQueryIO.Write 
     .named("Write") 
     .withSchema(schema) 
     .to(new SerializableFunction<BoundedWindow, String>() { 
       public String apply(BoundedWindow window) { 
       String dayString = DateTimeFormat.forPattern("yyyy_MM_dd").parseDateTime(
        ((DaysWindow) window).getStartDate()); 
       return "my-project:output.output_table_" + dayString; 
       } 
      })); 
+0

uns Zeigen Sie etwas Code, da es nicht klar, was Sie fordern. – Pentium10

+0

Pentium10 - Ich habe die Frage mit dem Code aktualisiert, den ich jetzt verwende. – bsmarcosj

Antwort

5

Wenn ich richtig verstehe möchten Sie sicherstellen, dass BigQuery-Tabellen gemäß den inhärenten Zeitstempeln der Elemente (Anführungszeichen) erstellt werden und nicht in der Uhrzeit, zu der die Pipeline ausgeführt wird.

TL; DR der Code sollte schon tun, was Sie wollen; Wenn nicht, bitte posten Sie weitere Details.

Längere Erklärung: Eine der wichtigsten Neuerungen in der Verarbeitung in Dataflow ist Event-Zeit-Verarbeitung. Dies bedeutet, dass die Datenverarbeitung in Dataflow fast vollständig entkoppelt ist, wenn die Verarbeitung passiert - was zählt ist, wenn die Ereignisse verarbeitet passiert ist. Dies ist ein Schlüsselelement zum Ermöglichen, dass genau derselbe Code in Stapel- oder Streaming-Datenquellen ausgeführt wird (z. B. Verarbeiten von Echtzeit-Benutzerklickereignissen unter Verwendung des gleichen Codes, der historische Klickprotokolle verarbeitet). Es ermöglicht auch eine flexible Handhabung von spät ankommenden Daten.

Eine Beschreibung dieses Aspekts des Dataflow-Verarbeitungsmodells finden Sie unter The world beyond batch, Abschnitt "Ereigniszeit vs. Verarbeitungszeit" (der gesamte Artikel ist sehr lesenswert). Für eine tiefere Beschreibung, siehe VLDB paper. Dies wird auch benutzerorientierter in der offiziellen Dokumentation unter windowing und triggers beschrieben.

Dementsprechend gibt es kein "aktuelles Fenster", da die Pipeline möglicherweise gleichzeitig viele verschiedene Ereignisse verarbeiten kann, die zu unterschiedlichen Zeiten stattfanden und zu verschiedenen Fenstern gehören. Wie das VLDB-Papier zeigt, ist einer der wichtigen Teile der Ausführung einer Dataflow-Pipeline "Gruppenelemente nach Fenster".

In der Pipeline, die Sie gezeigt haben, gruppieren wir die Datensätze, die Sie in BigQuery schreiben möchten, in Windows mit provided timestamps für die Datensätze und schreiben jedes Fenster in seine eigene Tabelle und erstellen bei Bedarf die Tabelle für neu aufgetretene Fenster. Wenn späte Daten in das Fenster gelangen (siehe Dokumentation zur Fensterung und Auslöser für eine Diskussion von späten Daten), werden wir an die bereits vorhandene Tabelle anhängen.

1

Der oben genannte Code funktionierte nicht mehr für mich. Es gibt eine updated example in den Google Docs obwohl wo DaysWindow durch ersetzt IntervalWindow, die für mich gearbeitet:

PCollection<TableRow> quotes = ... 
quotes.apply(Window.<TableRow>into(CalendarWindows.days(1))) 
    .apply(BigQueryIO.Write 
    .named("Write") 
    .withSchema(schema) 
    .to(new SerializableFunction<BoundedWindow, String>() { 
     public String apply(BoundedWindow window) { 
     // The cast below is safe because CalendarWindows.days(1) produces IntervalWindows. 
     String dayString = DateTimeFormat.forPattern("yyyy_MM_dd") 
       .withZone(DateTimeZone.UTC) 
       .print(((IntervalWindow) window).start()); 
     return "my-project:output.output_table_" + dayString; 
     } 
    }));