Ich versuche, Google Cloud-Datenspeicherdaten in Avro-Dateien in Google Cloud-Speicher zu exportieren und dann diese Dateien in BigQuery zu laden.Warum kann BigQuery eine Avro-Datei, die von avro-tools akzeptiert wird, nicht analysieren?
Erstens weiß ich, dass Big Query Datenspeicher-Backups lädt. Dies hat mehrere Nachteile, die ich vermeiden möchte:
- Backup tool is closed source
- Backup-Tool Format undokumentiert ist.
- Backup-Tool-Format kann nicht direkt von Dataflow gelesen werden
- Backup scheduling for appengine ist in (scheinbar ewig) alpha.
- Es ist möglich, implement your own backup handler in appengine, aber es ist Feuer und vergessen. Sie wissen nicht, wann genau das Backup beendet wurde oder wie der Dateiname lautet.
Mit der Motivation für dieses Experiment geklärt, hier ist mein Datenfluss Pipeline der Daten zu Avro-Format zu exportieren:
package com.example.dataflow;
import com.google.api.services.datastore.DatastoreV1;
import com.google.api.services.datastore.DatastoreV1.Entity;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.io.AvroIO;
import com.google.cloud.dataflow.sdk.io.DatastoreIO;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.protobuf.ProtobufData;
import org.apache.avro.protobuf.ProtobufDatumWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
public class GCDSEntitiesToAvroSSCCEPipeline {
private static final String GCS_TARGET_URI = "gs://myBucket/datastore/dummy";
private static final String ENTITY_KIND = "Dummy";
private static Schema getSchema() {
return ProtobufData.get().getSchema(Entity.class);
}
private static final Logger LOG = LoggerFactory.getLogger(GCDSEntitiesToAvroSSCCEPipeline.class);
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
DatastoreV1.Query.Builder q = DatastoreV1.Query.newBuilder()
.addKind(DatastoreV1.KindExpression.newBuilder().setName(ENTITY_KIND));
p.apply(Read.named("DatastoreQuery").from(DatastoreIO.source()
.withDataset(options.as(DataflowPipelineOptions.class).getProject())
.withQuery(q.build())))
.apply(ParDo.named("ProtoBufToAvro").of(new ProtoBufToAvro()))
.setCoder(AvroCoder.of(getSchema()))
.apply(AvroIO.Write.named("WriteToAvro")
.to(GCS_TARGET_URI)
.withSchema(getSchema())
.withSuffix(".avro"));
p.run();
}
private static class ProtoBufToAvro extends DoFn<Entity, GenericRecord> {
private static final long serialVersionUID = 1L;
@Override
public void processElement(ProcessContext c) throws Exception {
Schema schema = getSchema();
ProtobufDatumWriter<Entity> pbWriter = new ProtobufDatumWriter<>(Entity.class);
DataFileWriter<Entity> dataFileWriter = new DataFileWriter<>(pbWriter);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
dataFileWriter.create(schema, bos);
dataFileWriter.append(c.element());
dataFileWriter.close();
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(
new SeekableByteArrayInput(bos.toByteArray()), datumReader);
c.output(dataFileReader.next());
}
}
}
Die Pipeline läuft gut, aber wenn ich versuche, die resultierende Avro-Datei in groß zu laden Abfrage ich die folgende Fehlermeldung erhalten:
bq load --project_id=roodev001 --source_format=AVRO dummy.dummy_1 gs://roodev001.appspot.com/datastore/dummy-00000-of-00001.avro
Waiting on bqjob_r5c9b81a49572a53b_00000154951eb523_1 ... (0s) Current status: DONE
BigQuery error in load operation: Error processing job 'roodev001:bqjob_r5c9b81a49572a53b_00000154951eb523_1': The Apache Avro library failed to parse file
gs://roodev001.appspot.com/datastore/dummy-00000-of-00001.avro.
Allerdings, wenn ich die resultierende Avro-Datei mit Avro-Tool laden, ist alles in Ordnung:
avro-tools tojson datastore-dummy-00000-of-00001.avro | head
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{"key":{"com.google.api.services.datastore.DatastoreV1$.Key":{"partition_id":{"com.google.api.services.datastore.DatastoreV1$.PartitionId":{"dataset_id":"s~roodev001","namespace":""}},"path_element":[{"kind":"Dummy","id":4503905778008064,"name":""}]}},"property":[{"name":"number","value":{"boolean_value":false,"integer_value":879,"double_value":0.0,"timestamp_microseconds_value":0,"key_value":null,"blob_key_value":"","string_value":"","blob_value":"","entity_value":null,"list_value":[],"meaning":0,"indexed":true}}]}
...
benutzte ich diesen Code auf den Datenspeicher mit Dummy-Daten zu füllen, bevor die Datenfluss-Pipeline ausgeführt wird:
package com.example.datastore;
import com.google.gcloud.AuthCredentials;
import com.google.gcloud.datastore.*;
import java.io.IOException;
public static void main(String[] args) throws IOException {
Datastore datastore = DatastoreOptions.builder()
.projectId("myProjectId")
.authCredentials(AuthCredentials.createApplicationDefaults())
.build().service();
KeyFactory dummyKeyFactory = datastore.newKeyFactory().kind("Dummy");
Batch batch = datastore.newBatch();
int batchCount = 0;
for (int i = 0; i < 4000; i++){
IncompleteKey key = dummyKeyFactory.newKey();
System.out.println("adding entity " + i);
batch.add(Entity.builder(key).set("number", i).build());
batchCount++;
if (batchCount > 99) {
batch.submit();
batch = datastore.newBatch();
batchCount = 0;
}
}
System.out.println("done");
}
Warum ist meine avro Dateien BigQuery Ablehnung?
Welcher Codec wird in der AVRO-Datei für die Blockkompression verwendet? BigQuery unterstützt momentan nur DEFLATE. Siehe https://cloud.google.com/bigquery/data-formats#avro_format. –
@MichaelSheldon Soweit ich feststellen kann, wird in der Datei keine Blockkompression verwendet. Der Befehl "getmeta" der Avro-Tools gibt keinen Wert für 'avro.codec' aus. –
Heres ein Link zu der Ausgabe Avro-Datei von Dummy-Einheiten: https://drive.google.com/open?id=0B4dY1dqkTY14VmZnc0pCUjNrMEE –