2016-07-19 24 views
0

Problem: Objekt nicht serialisierbarSpark Avro zu Parquet Writer

Können Sie bitte sehen, wie das Problem zu beheben. in der Lage, es richtig zu lesen wie das Drucken richtig. aber während die Aufzeichnungen schriftlich Parkett

Objekt nicht immer serializable

verursacht durch: java.io.NotSerializableException: parquet.avro.AvroParquetWriter Serialisierung Stack: - Objekt nicht serializable (Klasse: parquet.avro .AvroParquetWriter, Wert: [email protected])

Bitte lesen sie und lassen sie mich wissen, was der beste Weg, es zu tun.

Code: coverting Avro Datensatz Parkett

val records = sc.newAPIHadoopRDD(conf.getConfiguration, 
    classOf[AvroKeyInputFormat[GenericRecord]], 
    classOf[AvroKey[GenericRecord]], //Transforms the PairRDD to RDD 
    classOf[NullWritable]).map(x => x._1.datum) 

    // Build a schema 
    val schema = SchemaBuilder 
    .record("x").namespace("x") 
    .fields 
    .name("x").`type`().stringType().noDefault() 
    .endRecord 

val parquetWriter = new AvroParquetWriter[GenericRecord](new Path(outPath), schema) 

val parquet = new GenericRecordBuilder(schema) 

records.foreach { keyVal => 
    val x = keyVal._1.datum().get("xyz") -- Field 
    parquet.set("x", x) 
     .build 
     parquetWriter.write(schema.build()) 
    } 

Antwort

0

Ich bin nicht sicher, warum Sie den Ansatz nehmen Sie sind. Aber ich würde einen anderen Ansatz empfehlen. Wenn du die avro-Datei in eine rdd bekommst, sieht es so aus. Und Sie können ein Schema erstellen, dann die RDD in einen Datenrahmen konvertieren und dann den Datenrahmen als Parkett ausschreiben.

var avroDF = sqlContext.createDataFrame(avroRDD,avroSchema) 
avroDF 
    .write 
    .mode(SaveMode.Overwrite) 
    .parquet("parquet directory to write file") 
+0

Danke für den Ansatz zu lesen. aber Problem ist, das ist verschachtelte Struktur von Array, Liste, Karte. Sehr große verschachtelte Avro. Um also zu flattern, müssen wir alle Elemente durchlaufen und alles Notwendige bekommen. – Ankur

+0

wäre es schön, wenn Sie eine dieser Antworten aufwerten und akzeptieren würden. Ich habe jede Frage beantwortet, die du gestellt hast. @ Ankur – mark

0

Für einige meiner komplexen JSON, die komplizierte Strukturen und Arrays hat, verwende ich Hive ql Seitenansicht explodieren. Hier ist ein Beispiel für komplexe json, die abgeflacht ist. Es beginnt als 10 Zeilen und für einige Spuren kann ich 60 Zeilen bekommen und einige bekomme ich weniger als 5. Es hängt nur davon ab, wie es explodiert.

val tenj = sqlContext.read.json("file:///home/marksmith/hive/Tenfile.json") 

scala> tenj.printSchema 
root 

|-- DDIVersion: string (nullable = true) 
|-- EndTimestamp: string (nullable = true) 
|-- Stalls: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- Stall: long (nullable = true) 
| | |-- StallType: string (nullable = true) 
| | |-- TraceTypes: struct (nullable = true) 
| | | |-- ActiveTicket: struct (nullable = true) 
| | | | |-- Category: string (nullable = true) 
| | | | |-- Traces: array (nullable = true) 
| | | | | |-- element: struct (containsNull = true) 
| | | | | | |-- EndTime: string (nullable = true) 
| | | | | | |-- ID: string (nullable = true) 
| | | | | | |-- Source: string (nullable = true) 
| | | | | | |-- StartPayload: struct (nullable = true) 
| | | | | | | |-- SubticketID: string (nullable = true) 
| | | | | | | |-- TicketID: string (nullable = true) 
| | | | | | | |-- TicketState: long (nullable = true) 
| | | | | | |-- StartTime: string (nullable = true) 

tenj.registerTempTable("ddis") 


val sat = sqlContext.sql(
    "select DDIVersion, StallsExp.stall, StallsExp.StallType, at.EndTime, at.ID, 
     at.Source, at.StartPayload.SubTicketId, at.StartPayload.TicketID, 
     at.StartPayload.TicketState, at.StartTime 
    from ddis 
     lateral view explode(Stalls) st as StallsExp 
     lateral view explode(StallsExp.TraceTypes.ActiveTicket.Traces) at1 as at") 
sat: org.apache.spark.sql.DataFrame = [DDIVersion: string, stall: bigint, StallType: string, EndTime: string, ID: string, Source: string, SubTicketId: string, TicketID: string, TicketState: bigint, StartTime: string] 

sat.count 
res22: Long = 10 

sat.show 
+----------+-----+---------+--------------------+---+------+-----------+--------+-----------+--------------------+ 
|DDIVersion|stall|StallType|    EndTime| ID|Source|SubTicketId|TicketID|TicketState|   StartTime| 
+----------+-----+---------+--------------------+---+------+-----------+--------+-----------+--------------------+ 
| 5.3.1.11| 15| POPS4|2016-06-08T20:07:...| | STALL|   0|  777|   1|2016-06-08T20:07:...| 
| 5.3.1.11| 14| POPS4|2016-06-08T20:07:...| | STALL|   0|  384|   1|2016-06-08T20:06:...| 
| 5.3.1.11| 13| POPS4|2016-06-08T20:07:...| | STALL|   0| 135792|   1|2016-06-08T20:06:...| 
| 5.0.0.28| 26| POPS4|2016-06-08T20:06:...| | STALL|   0|  774|   2|2016-06-08T20:03:...| 
+0

Danke Mark. Können Sie bitte die Möglichkeit geben, verschachtelte avro zu lesen und einige spezifische Spalten zu holen und sie auf das Parkett-Format zu werfen ... – Ankur

1

Sie könnten hier starten in avro in einen Datenrahmen https://github.com/databricks/spark-avro

// import needed for the .avro method to be added 
import com.databricks.spark.avro._ 

val sqlContext = new SQLContext(sc) 

// The Avro records get converted to Spark typesca 
val df = sqlContext.read.avro("src/test/resources/episodes.avro") 

df.registerTempTable("tempTable") 
val sat = sqlContext.sql(//use lateral view explode) 
sat.write.parquet("/tmp/output") 
+0

Was ist der Zweck von '// laterale Ansicht explodieren'? Warum ist das nötig? –

+0

Sie haben eine Spalte, die ein Array von drei Dingen ist. Wenn Sie die seitliche Ansicht verwenden, können Sie diese Reihe abflachen, aber es werden drei Reihen sein. Alle Spalten sind identisch mit Ausnahme der Spalte, die ein Array war. Das wird drei verschiedene Werte haben. – mark

+0

Warum nicht "explodieren" alleine (ohne seitliche Ansicht)? Ich weiß, dass ich es mit 'explode' allein machen kann und habe mich gefragt, warum ich eine Seitenansicht verwenden soll. Sie können auch 'Dataset.flatMap' verwenden. –