2013-12-16 8 views
16

Ich bin in Spark, ich habe eine RDD von einer Avro-Datei. Ich möchte nun einige Transformationen auf diesem RDD tun und es wieder als Avro-Datei speichern:Spark: Schreiben in Avro-Datei

val job = new Job(new Configuration()) 
AvroJob.setOutputKeySchema(job, getOutputSchema(inputSchema)) 

rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)), elem._2)) 
    .saveAsNewAPIHadoopFile(outputPath, 
    classOf[AvroKey[GenericRecord]], 
    classOf[org.apache.hadoop.io.NullWritable], 
    classOf[AvroKeyOutputFormat[GenericRecord]], 
    job.getConfiguration) 

Wenn dieser Funke läuft beklagt, dass Schema $ recordSchema nicht serialisierbar ist.

Wenn ich den .map-Aufruf auskommentiere (und nur rdd.saveAsNewAPIHadoopFile), ist der Aufruf erfolgreich.

Was mache ich hier falsch?

Irgendeine Idee?

+0

Könnten Sie bitte die Exception Stack Trace bereitstellen? Spark, Hadoop und Avro Versionsnummern könnten ebenfalls nützlich sein. – Wildfire

+0

Bitte vergib mir meine Naivität. Darf ich fragen, was ist hier los? Sieht so aus, als wäre es ein Map Reduce Job? Wenn wir Funken verwenden, um zu schreiben, warum brauchen wir einen Map Reduce Job? –

Antwort

2

Der von Spark verwendete Standard-Serializer ist die Java-Serialisierung. Also wird es für alle Java-Typen versuchen, mit der Java-Serialisierung zu serialisieren. AvroKey ist nicht serialisierbar, Sie erhalten also Fehler.

Sie können KryoSerializer oder Plugin in Ihrer benutzerdefinierten Serialisierung (wie Avro) verwenden. Sie können hier mehr über die Serialisierung lesen. http://spark-project.org/docs/latest/tuning.html

Sie können Ihr Objekt auch mit etwas umwickeln, das externalisierbar ist. Sehen Sie sich zum Beispiel das SparkFlumeEvent an, das AvroFlumeEvent hier umschließt: https://github.com/apache/spark/blob/master/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala

5

Das Problem hier bezieht sich auf die Nicht-Serialisierbarkeit der im Auftrag verwendeten Klasse avro.Schema. Die Ausnahme wird ausgelöst, wenn Sie versuchen, das Schema-Objekt aus dem Code innerhalb der Map-Funktion zu referenzieren.

Zum Beispiel, wenn Sie versuchen, wie folgt zu tun, werden Sie den „Task nicht serializable“ get Ausnahme:

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
... 
rdd.map(t => { 
    // reference to the schema object declared outside 
    val record = new GenericData.Record(schema) 
}) 

Sie alles machen können, indem nur die Schaffung eine neue Instanz des Schemas, zu arbeiten innerhalb des Funktionsbausteins:

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
// The schema above should not be used in closures, it's for other purposes 
... 
rdd.map(t => { 
    // create a new Schema object 
    val innserSchema = new Schema.Parser().parse(new File(jsonSchema)) 
    val record = new GenericData.Record(innserSchema) 
    ... 
}) 

Da Sie das avro Schema Parsen für jeden Datensatz nicht wie würden Sie behandeln, eine bessere Lösung wird das Schema auf Partitionsebene zu analysieren. Die folgende funktioniert auch:

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
// The schema above should not be used in closures, it's for other purposes 
... 
rdd.mapPartitions(tuples => { 
    // create a new Schema object 
    val innserSchema = new Schema.Parser().parse(new File(jsonSchema)) 

    tuples.map(t => { 
    val record = new GenericData.Record(innserSchema) 
    ... 
    // this closure will be bundled together with the outer one 
    // (no serialization issues) 
    }) 
}) 

Der obige Code funktioniert, solange Sie einen tragbaren Verweis auf die jsonSchema Datei zur Verfügung stellen, da die Kartenfunktion von mehreren Remote-Vollstrecker ausgeführt werden wird. Es kann ein Verweis auf eine Datei in HDFS sein oder es kann zusammen mit der Anwendung in der JAR gepackt werden (Sie werden die Klassenladefunktionen verwenden, um im letzteren Fall den Inhalt zu erhalten).

Für diejenigen, die mit Funken Avro versuchen, zu verwenden, feststellen, dass es noch einige ungelöste Probleme Kompilation sind und Sie haben die folgende Import auf Maven POM verwenden:

<dependency> 
    <groupId>org.apache.avro</groupId> 
    <artifactId>avro-mapred</artifactId> 
    <version>1.7.7</version> 
    <classifier>hadoop2</classifier> 
<dependency> 

Notiere die "hadoop2" Klassifikator. Sie können das Problem unter https://issues.apache.org/jira/browse/SPARK-3039 verfolgen.

+0

Diese Methode funktioniert einwandfrei, wenn in unserer Kartenfunktion keine externen Abhängigkeiten vorhanden sind. Gibt es eine Möglichkeit, das Schema serialisierbar zu machen? – COSTA