Das Problem hier bezieht sich auf die Nicht-Serialisierbarkeit der im Auftrag verwendeten Klasse avro.Schema. Die Ausnahme wird ausgelöst, wenn Sie versuchen, das Schema-Objekt aus dem Code innerhalb der Map-Funktion zu referenzieren.
Zum Beispiel, wenn Sie versuchen, wie folgt zu tun, werden Sie den „Task nicht serializable“ get Ausnahme:
val schema = new Schema.Parser().parse(new File(jsonSchema))
...
rdd.map(t => {
// reference to the schema object declared outside
val record = new GenericData.Record(schema)
})
Sie alles machen können, indem nur die Schaffung eine neue Instanz des Schemas, zu arbeiten innerhalb des Funktionsbausteins:
val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.map(t => {
// create a new Schema object
val innserSchema = new Schema.Parser().parse(new File(jsonSchema))
val record = new GenericData.Record(innserSchema)
...
})
Da Sie das avro Schema Parsen für jeden Datensatz nicht wie würden Sie behandeln, eine bessere Lösung wird das Schema auf Partitionsebene zu analysieren. Die folgende funktioniert auch:
val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.mapPartitions(tuples => {
// create a new Schema object
val innserSchema = new Schema.Parser().parse(new File(jsonSchema))
tuples.map(t => {
val record = new GenericData.Record(innserSchema)
...
// this closure will be bundled together with the outer one
// (no serialization issues)
})
})
Der obige Code funktioniert, solange Sie einen tragbaren Verweis auf die jsonSchema Datei zur Verfügung stellen, da die Kartenfunktion von mehreren Remote-Vollstrecker ausgeführt werden wird. Es kann ein Verweis auf eine Datei in HDFS sein oder es kann zusammen mit der Anwendung in der JAR gepackt werden (Sie werden die Klassenladefunktionen verwenden, um im letzteren Fall den Inhalt zu erhalten).
Für diejenigen, die mit Funken Avro versuchen, zu verwenden, feststellen, dass es noch einige ungelöste Probleme Kompilation sind und Sie haben die folgende Import auf Maven POM verwenden:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.7.7</version>
<classifier>hadoop2</classifier>
<dependency>
Notiere die "hadoop2"
Klassifikator. Sie können das Problem unter https://issues.apache.org/jira/browse/SPARK-3039 verfolgen.
Könnten Sie bitte die Exception Stack Trace bereitstellen? Spark, Hadoop und Avro Versionsnummern könnten ebenfalls nützlich sein. – Wildfire
Bitte vergib mir meine Naivität. Darf ich fragen, was ist hier los? Sieht so aus, als wäre es ein Map Reduce Job? Wenn wir Funken verwenden, um zu schreiben, warum brauchen wir einen Map Reduce Job? –