Writing to BigQuery from Cloud Dataflow: Unable to create a side-input view from input
I am trying to write a Dataflow pipeline that reads a stream from Pub/Sub and writes it into BigQuery. When I try to run it, I get the error "Unable to create a side-input view from input" with the following stack trace:
Exception in thread "main" java.lang.IllegalStateException: Unable to create a side-input view from input
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:277)
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:268)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:366)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161)
at com.google.cloud.dataflow.sdk.io.Write$Bound.createWrite(Write.java:214)
at com.google.cloud.dataflow.sdk.io.Write$Bound.apply(Write.java:79)
at com.google.cloud.dataflow.sdk.io.Write$Bound.apply(Write.java:68)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.apply(DirectPipelineRunner.java:247)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:290)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:174)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$Bound.apply(BigQueryIO.java:1738)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$Bound.apply(BigQueryIO.java:1440)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.apply(DirectPipelineRunner.java:247)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161)
at co.uk.bubblestudent.dataflow.StarterPipeline.main(StarterPipeline.java:116)
Caused by: java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
at com.google.cloud.dataflow.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:192)
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:275)
... 20 more
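The "Caused by" line is the real hint: PubsubIO produces an unbounded PCollection, and the write path taken here applies a GroupByKey internally, which is not allowed on an unbounded collection in the GlobalWindow without a trigger. A minimal sketch of what the exception itself suggests, assuming the 1.x SDK windowing API and the pardo collection from the code below (the one-minute window size is an arbitrary choice for illustration):

// Sketch: window the unbounded stream before the write, so any internal
// GroupByKey operates on per-window panes instead of the GlobalWindow.
PCollection<TableRow> windowed = pardo.apply(
    Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))));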
My code is:
public class StarterPipeline {
  public static final Duration ONE_DAY = Duration.standardDays(1);
  public static final Duration ONE_HOUR = Duration.standardHours(1);
  public static final Duration TEN_SECONDS = Duration.standardSeconds(10);

  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

  // Builds the BigQuery table schema: facebookID, propertyID, time.
  private static TableSchema schemaGen() {
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("facebookID").setType("STRING"));
    fields.add(new TableFieldSchema().setName("propertyID").setType("STRING"));
    fields.add(new TableFieldSchema().setName("time").setType("TIMESTAMP"));
    return new TableSchema().setFields(fields);
  }

  public static void main(String[] args) {
    LOG.info("Starting");
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    LOG.info("Pipeline made");
    // For Cloud execution, set the Cloud Platform project, staging location,
    // and specify DataflowPipelineRunner or BlockingDataflowPipelineRunner.
    options.setProject(<project>);
    options.setStagingLocation(<bucket>);
    options.setTempLocation(<bucket>);
    Pipeline p = Pipeline.create(options);

    TableSchema schema = schemaGen();
    LOG.info("Schema made");
    try {
      LOG.info(schema.toPrettyString());
    } catch (IOException e) {
      e.printStackTrace();
    }

    PCollection<String> input =
        p.apply(PubsubIO.Read.named("ReadFromPubsub").subscription(<subscription>));
    PCollection<TableRow> pardo = input.apply(ParDo.of(new FormatAsTableRowFn()));
    LOG.info("Formatted Row");

    pardo.apply(BigQueryIO.Write.named("Write into BigQuery").to(<table>)
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    LOG.info("about to run");
    p.run();
  }

  static class FormatAsTableRowFn extends DoFn<String, TableRow> {
    @Override
    public void processElement(ProcessContext c) {
      LOG.info("Formatting");
      String json = c.element();
      // HashMap<String, String> items = new Gson().fromJson(json, new TypeToken<HashMap<String, String>>(){}.getType());
      // Make a BigQuery row from the JSON object (hard-coded values for now):
      TableRow row = new TableRow()
          .set("facebookID", "324234")
          .set("propertyID", "23423")
          .set("time", "12312313123");
      /*
       * TableRow row = new TableRow()
       *     .set("facebookID", items.get("facebookID"))
       *     .set("propertyID", items.get("propertyID"))
       *     .set("time", items.get("time"));
       */
      c.output(row);
    }
  }
}
Any suggestions as to what this could be?
Which version of the Dataflow SDK are you using? – danielm
1.1.2 for Eclipse –
I don't think there was a 1.1.2 release. Dataflow is now up to 1.6.0; can you try that? – danielm
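For anyone who lands here with the same trace: the stack shows the DirectPipelineRunner taking the batch write path (Write$Bound), and on the 1.x SDK BigQueryIO.Write only uses streaming inserts when the pipeline itself runs in streaming mode. A sketch of that change, assuming Dataflow SDK 1.6.0 and execution on the service with the DataflowPipelineRunner mentioned in the code comment:

// Sketch: run the pipeline in streaming mode so BigQueryIO.Write takes the
// streaming-insert path instead of the batch (GroupByKey-based) one.
options.setStreaming(true);
options.setRunner(DataflowPipelineRunner.class);

With streaming enabled, the Pub/Sub source stays unbounded and no explicit Window.into should be needed just to make the BigQuery write validate.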