3

Ich versuche, Google Cloud-Datenspeicherdaten in Avro-Dateien in Google Cloud-Speicher zu exportieren und dann diese Dateien in BigQuery zu laden.Warum kann BigQuery eine Avro-Datei, die von avro-tools akzeptiert wird, nicht analysieren?

Erstens weiß ich, dass Big Query Datenspeicher-Backups lädt. Dies hat mehrere Nachteile, die ich vermeiden möchte:

Mit der Motivation für dieses Experiment geklärt, hier ist mein Datenfluss Pipeline der Daten zu Avro-Format zu exportieren:

package com.example.dataflow; 

import com.google.api.services.datastore.DatastoreV1; 
import com.google.api.services.datastore.DatastoreV1.Entity; 
import com.google.cloud.dataflow.sdk.Pipeline; 
import com.google.cloud.dataflow.sdk.coders.AvroCoder; 
import com.google.cloud.dataflow.sdk.io.AvroIO; 
import com.google.cloud.dataflow.sdk.io.DatastoreIO; 
import com.google.cloud.dataflow.sdk.io.Read; 
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; 
import com.google.cloud.dataflow.sdk.options.PipelineOptions; 
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; 
import com.google.cloud.dataflow.sdk.transforms.DoFn; 
import com.google.cloud.dataflow.sdk.transforms.ParDo; 
import org.apache.avro.Schema; 
import org.apache.avro.file.DataFileReader; 
import org.apache.avro.file.DataFileWriter; 
import org.apache.avro.file.SeekableByteArrayInput; 
import org.apache.avro.generic.GenericDatumReader; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.avro.io.DatumReader; 
import org.apache.avro.protobuf.ProtobufData; 
import org.apache.avro.protobuf.ProtobufDatumWriter; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.io.ByteArrayOutputStream; 

public class GCDSEntitiesToAvroSSCCEPipeline { 

    private static final String GCS_TARGET_URI = "gs://myBucket/datastore/dummy"; 
    private static final String ENTITY_KIND = "Dummy"; 

    private static Schema getSchema() { 
     return ProtobufData.get().getSchema(Entity.class); 
    } 

    private static final Logger LOG = LoggerFactory.getLogger(GCDSEntitiesToAvroSSCCEPipeline.class); 
    public static void main(String[] args) { 
     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create(); 
     Pipeline p = Pipeline.create(options); 

     DatastoreV1.Query.Builder q = DatastoreV1.Query.newBuilder() 
       .addKind(DatastoreV1.KindExpression.newBuilder().setName(ENTITY_KIND)); 

     p.apply(Read.named("DatastoreQuery").from(DatastoreIO.source() 
       .withDataset(options.as(DataflowPipelineOptions.class).getProject()) 
       .withQuery(q.build()))) 
      .apply(ParDo.named("ProtoBufToAvro").of(new ProtoBufToAvro())) 
      .setCoder(AvroCoder.of(getSchema())) 
      .apply(AvroIO.Write.named("WriteToAvro") 
        .to(GCS_TARGET_URI) 
        .withSchema(getSchema()) 
        .withSuffix(".avro")); 
     p.run(); 

    } 

    private static class ProtoBufToAvro extends DoFn<Entity, GenericRecord> { 
     private static final long serialVersionUID = 1L; 

     @Override 
     public void processElement(ProcessContext c) throws Exception { 
      Schema schema = getSchema(); 
      ProtobufDatumWriter<Entity> pbWriter = new ProtobufDatumWriter<>(Entity.class); 
      DataFileWriter<Entity> dataFileWriter = new DataFileWriter<>(pbWriter); 
      ByteArrayOutputStream bos = new ByteArrayOutputStream(); 
      dataFileWriter.create(schema, bos); 
      dataFileWriter.append(c.element()); 
      dataFileWriter.close(); 

      DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema); 
      DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(
        new SeekableByteArrayInput(bos.toByteArray()), datumReader); 

      c.output(dataFileReader.next()); 

     } 
    } 
} 

Die Pipeline läuft gut, aber wenn ich versuche, die resultierende Avro-Datei in groß zu laden Abfrage ich die folgende Fehlermeldung erhalten:

bq load --project_id=roodev001 --source_format=AVRO dummy.dummy_1 gs://roodev001.appspot.com/datastore/dummy-00000-of-00001.avro 
Waiting on bqjob_r5c9b81a49572a53b_00000154951eb523_1 ... (0s) Current status: DONE 
BigQuery error in load operation: Error processing job 'roodev001:bqjob_r5c9b81a49572a53b_00000154951eb523_1': The Apache Avro library failed to parse file 
gs://roodev001.appspot.com/datastore/dummy-00000-of-00001.avro. 

Allerdings, wenn ich die resultierende Avro-Datei mit Avro-Tool laden, ist alles in Ordnung:

avro-tools tojson datastore-dummy-00000-of-00001.avro | head 
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). 
log4j:WARN Please initialize the log4j system properly. 
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 
{"key":{"com.google.api.services.datastore.DatastoreV1$.Key":{"partition_id":{"com.google.api.services.datastore.DatastoreV1$.PartitionId":{"dataset_id":"s~roodev001","namespace":""}},"path_element":[{"kind":"Dummy","id":4503905778008064,"name":""}]}},"property":[{"name":"number","value":{"boolean_value":false,"integer_value":879,"double_value":0.0,"timestamp_microseconds_value":0,"key_value":null,"blob_key_value":"","string_value":"","blob_value":"","entity_value":null,"list_value":[],"meaning":0,"indexed":true}}]} 
... 

benutzte ich diesen Code auf den Datenspeicher mit Dummy-Daten zu füllen, bevor die Datenfluss-Pipeline ausgeführt wird:

package com.example.datastore; 

import com.google.gcloud.AuthCredentials; 
import com.google.gcloud.datastore.*; 

import java.io.IOException; 

public static void main(String[] args) throws IOException { 

    Datastore datastore = DatastoreOptions.builder() 
      .projectId("myProjectId") 
      .authCredentials(AuthCredentials.createApplicationDefaults()) 
      .build().service(); 

    KeyFactory dummyKeyFactory = datastore.newKeyFactory().kind("Dummy"); 


    Batch batch = datastore.newBatch(); 
    int batchCount = 0; 
    for (int i = 0; i < 4000; i++){ 
     IncompleteKey key = dummyKeyFactory.newKey(); 
     System.out.println("adding entity " + i); 
     batch.add(Entity.builder(key).set("number", i).build()); 
     batchCount++; 
     if (batchCount > 99) { 
      batch.submit(); 
      batch = datastore.newBatch(); 
      batchCount = 0; 
     } 
    } 

    System.out.println("done"); 

} 

Warum ist meine avro Dateien BigQuery Ablehnung?

+1

Welcher Codec wird in der AVRO-Datei für die Blockkompression verwendet? BigQuery unterstützt momentan nur DEFLATE. Siehe https://cloud.google.com/bigquery/data-formats#avro_format. –

+0

@MichaelSheldon Soweit ich feststellen kann, wird in der Datei keine Blockkompression verwendet. Der Befehl "getmeta" der Avro-Tools gibt keinen Wert für 'avro.codec' aus. –

+0

Heres ein Link zu der Ausgabe Avro-Datei von Dummy-Einheiten: https://drive.google.com/open?id=0B4dY1dqkTY14VmZnc0pCUjNrMEE –

Antwort

3

BigQuery verwendet die C++ - Avro-Bibliothek, und anscheinend mag es nicht das "$" im Namespace. Hier ist die Fehlermeldung:

ungültiger Namespace: com.google.api.services.datastore.DatastoreV1 $

Wir arbeiten diese Avro Fehlermeldungen, um den Endbenutzer auf zu bekommen.

+0

Von dem, was ich weiß, auch $ -Zeichen Entfernen nicht Die neueste ist Ich habe gehört, hilft das Problem für Doppel in langen Sein Standard war und dies bewirkt, dass in C++ Avro Bibliothek –

+0

@HuaZhang Ich werde sehen, ob ich filtern kaskadierende Fehler, Dollar raus und sehen ob es hilft. Ich weiß, dass Avro für BigQuery in der Betaversion ist. Wird dieses Problem gelöst, bevor Avro für BigQuery allgemein verfügbar ist? Dies scheint das vorgelagerte Problem zu sein https://issues.apache.org/jira/browse/AVRO-1478 –

+0

@MikhailBerlyant Ja, die C++ Avro-Bibliothek hat mehr Einschränkungen als die Java-Bibliothek.Bevor Sie die Pipe erstellen, um die Fehlermeldungen der Avro-Bibliothek anzuzeigen, testen Sie die Datei mit der C++ Avro-Bibliothek, um sicherzustellen, dass sie verarbeitet werden kann. –