2016-07-23 15 views
1

Ich versuche, mit Cloud Dataflow in eine BigQuery-Tabelle zu schreiben. Diese BigQuery-Tabelle hat eine Integer-Spalte, die auf NULL gesetzt ist. Für Nullwerte, gibt es folgende Fehlermeldung:Ich kann keine nullbaren Ganzzahlwerte in BigQuery mit Cloud Dataflow schreiben

Could not convert value to integer. Field: ITM_QT; Value:

Aber wenn ich den Datentyp der gleichen Spalte in String umgewandelt wird NULL-Werte akzeptieren.

Gibt es eine Möglichkeit, NULL-Werte mit Cloud Dataflow in eine Integer-Spalte zu schreiben?

Dieser Fehler tritt auf, wenn ich den Spaltendatentyp in String ändere.

+0

Dieses Problem besteht sogar für Null-Zeitstempel-Werte. SO habe ich dieses Problem für Integer-, Float- und Timestamp-Datentypen. –

+0

Sind Sie sicher, dass Sie null (tatsächlich null) und nicht "null" wie in einem String-Wert setzen? –

Antwort

2

nicht sicher, was Sie falsch machen, aber der folgende Code funktioniert gut, und in der Tat null Werte für Integer & Float-Datentypen in BigQuery erlaubt das Schreiben:

public static void main(String[] args) { 
     DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class); 
     options.setRunner(DirectPipelineRunner.class); 
     options.setProject("<project-id>"); 

     Pipeline pipeline = Pipeline.create(options); 

     PCollection<TableRow> results = pipeline.apply("whatever", BigQueryIO.Read.from("<table-spec>")).apply(ParDo.of(new DoFn<TableRow, TableRow>() { 
      @Override 
      public void processElement(ProcessContext c) throws Exception { 
       System.out.println(c.element()); 
       TableRow row = new TableRow(); 
       row.set("foo", null); //null FLOAT 
       row.set("bar", null); //null INTEGER 
       c.output(row); 
      } 
     })); 

     List<TableFieldSchema> fields = new ArrayList<>(); 
     fields.add(new TableFieldSchema().setName("foo").setType("FLOAT")); 
     fields.add(new TableFieldSchema().setName("bar").setType("INTEGER")); 
     TableSchema schema = new TableSchema().setFields(fields); 

     results.apply(BigQueryIO.Write 
       .named("Write") 
       .to("<project-id>:<dataset-name>.write_null_numbers_test") 
       .withSchema(schema) 
       .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) 
       .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); 

     pipeline.run(); 
    } 

enter image description here

enter image description here

+0

Hey danke, deine Lösung hat wirklich funktioniert. Der Fehler kam, weil ich es falsch machte. –