2016-08-04 22 views
0

Zuvor hatte PCollection formatierte Ergebnisse; und ich war unter Code verwenden, um Zeilen in großer Abfrage einfügen:Einfügen von Daten in BigQuery aus Dataflow

    // OPTION 1 
PCollection<TableRow> formattedResults = .... 
formattedResults.apply(BigQueryIO.Write.named("Write").to(tableName) 
          .withSchema(tableSchema) 
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); 

Und alle Zeilen wurden direkt in BigQuery eingesetzt, die alle gut bis hier. Aber jetzt haben damit begonnen, ich den Tabellennamen und die entsprechende Zeile dynamisch zu identifizieren, so PCollection erschaffe wie folgt: (String wird der Tabellenname sein und dann als Wert seiner Reihe)

PCollection<KV<String, TableRow>> tableRowMap // OPTION 2 

Auch ich schaffe Gruppe von Zeilen das in derselben Tabelle gehen wird als:

PCollection<KV<String, Iterable<TableRow>>> groupedRows //OPTION 3 

den Schlüssel (String) ist der Tabellenname und BQ-Wert ist die Liste von Zeilen in BQ eingefügt werden.

Mit Option 1 konnte ich einfach Zeilen in BQ einfügen, die den oben gezeigten Code verwenden, aber derselbe Code kann nicht mit OPTION 2 oder OPTION 3 verwendet werden, da in diesem Fall mein Tabellenname Schlüssel in map ist. Gibt es eine Möglichkeit, Zeilen in Tabelle mit OPTION 2 oder OPTION 3 einzufügen. Jeder Link oder Codebeispiel wird große Hilfe sein.

Antwort

1

Das nächste, was Dataflow in eine Tabelle pro Fenster schreibt (und Sie können Ihre eigene BoundedWindow-Unterklasse und WindowFn erstellen, um die gewünschten Daten in das Fenster einzubeziehen). Verwenden Sie dazu

to(SerializableFunction<BoundedWindow,String> tableSpecFunction) 

auf BigQueryIO.Write.

Beachten Sie, dass diese Funktion die Streaming-Upload-Funktion von BigQuery verwendet, die auf 100 MB/s pro Tabelle beschränkt ist. Darüber hinaus sind Uploads nicht atomar, sodass ein fehlgeschlagener Batch-Job nur einen Teil der Ausgabe hochladen kann.

-1

Sie haben auch die Möglichkeit, ein eigenes DoFn zu erstellen, das Daten direkt in bigquery einfügt, anstatt sich auf BigQueryIO.Write zu verlassen. Technisch müssen Sie eine BigQueryTableInserter erstellen, können Sie die insertAll(TableReference ref, List<TableRow> rowList) verwenden, um Zeug in die gewünschte Tabelle einzufügen.

Sie können eine TableReference schaffen etwas mit wie: new TableReference().setProjectId("projectfoo").setDatasetId("datasetfoo").setTableId("tablefoo")

Dies wird nicht zu 100% zu empfehlen, da BigQueryIO einige nette Sachen hat die Zeilen aufgeteilt, die den Durchsatz maximieren müssen Einsetzen und Griffe Wiederholungen richtig.