2016-07-19 37 views
0

ich mit Scala/Funken ein seltsames Problem bin vor (1.5) und Zeppelin:Aufgabe nicht serializable während benutzerdefinierten Datenrahmen Klasse Spark mit Scala

Wenn ich den folgenden Scala/Spark-Code ausführen, wird es richtig laufen:

// TEST NO PROBLEM SERIALIZATION 
val rdd = sc.parallelize(Seq(1, 2, 3)) 
val testList = List[String]("a", "b") 

rdd.map{a => 
    val aa = testList(0) 
    None} 

jedoch nach einem benutzerdefinierten Datenrahmen Typen deklarierte als here

//DATAFRAME EXTENSION 
import org.apache.spark.sql.DataFrame 

object ExtraDataFrameOperations { 
    implicit class DFWithExtraOperations(df : DataFrame) { 

    //drop several columns 
    def drop(colToDrop:Seq[String]):DataFrame = { 
     var df_temp = df 
     colToDrop.foreach{ case (f: String) => 
      df_temp = df_temp.drop(f)//can be improved with Spark 2.0 
     } 
     df_temp 
    } 
    } 
} 

und verwendet es zum Beispiel wie folgt vorgeschlagen:

//READ ALL THE FILES INTO different DF and save into map 
import ExtraDataFrameOperations._ 
val filename = "myInput.csv" 

val delimiter = "," 

val colToIgnore = Seq("c_9", "c_10") 

val inputICFfolder = "hdfs:///group/project/TestSpark/" 

val df = sqlContext.read 
      .format("com.databricks.spark.csv") 
      .option("header", "true") // Use first line of all files as header 
      .option("inferSchema", "false") // Automatically infer data types? => no cause we need to merge all df, with potential null values => keep string only 
      .option("delimiter", delimiter) 
      .option("charset", "UTF-8") 
      .load(inputICFfolder + filename) 
      .drop(colToIgnore)//call the customize dataframe 

Dies wird erfolgreich ausgeführt.

Nun, wenn ich erneut aus dem folgenden Code (wie oben)

// TEST NO PROBLEM SERIALIZATION 
val rdd = sc.parallelize(Seq(1, 2, 3)) 
val testList = List[String]("a", "b") 
rdd.map{a => 
    val aa = testList(0) 
    None} 

erhalte ich die Fehlermeldung:

rdd: org.apache.spark.rdd.RDD [Int] = ParallelCollectionRDD [8] um parallelisieren bei: 32 testList: Liste [String] = Liste (a, b) org.apache.spark.SparkException: Task nicht serialisierbar um org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 304) bei org.apache.spark.util.ClosureCleaner $ .org $ Apache $ funken $ util $ ClosureCleaner $$ sauber (ClosureCleaner.scala: 294) um org.apache.spark.util.ClosureCleaner $ .clean (ClosureCleaner.scala : 122) bei org.apache.spark.SparkContext.clean (SparkContext.scala: 2032) um org.apache.spark.rdd.RDD $$ anonfun $ map $ 1.apply (RDD.scala: 314) .. verursacht durch: java.io.NotSerializableException: $ iwC $$ iwC $$ iwC $$ iwC $$$ iwC $$$ iwC $$$ iwC $$$ iwC $$$ iwC $ ExtraDataFrameOperations $ Serialisierungsstapel: - Objekt nicht serialisierbar (Klasse: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$$ iwC $$$ iwC $$$ iwC $$$ iwC $ ExtraDataFrameOperations $, Wert: $ iwC $$ iwC $$ $ iwC $$$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ ExtraDataFrameOperatio ns $ @ 6c7e70e) - Feld (Klasse: $ iwC $$ iwC $$ iwC $$ iwC $$ $ iwC $$ $ iwC $$ $ iwC $$ $ iwC $$ iwC, Name: ExtraDataFrameOperations $ module, typ: Klasse $ iwC $$ iwC $$ iwC $$ iwC $$$ iwC $$$ iwC $$$ iwC $$$ iwC $$$ iwC $ ExtraDataFrameOperations $) - Objekt (Klasse $ iwC $$ iwC $$ iwC $$$ iwC $$$ iwC $$$ iwC $$ iwC $$ iwC $$$ iwC, $ iwC $$ iwC $$ iwC $$ iwC $$$ iwC $$$ iwC $$ iwC $$$ iwC $$$ iwC @ 4c6d0802) - Feld (Klasse: $ iwC $$$ iwC $$ iwC $$ iwC $$ iwC $$$ iwC $$$ iwC $$$ iwC, Name: $ iw, typ: Klasse $ iwC $$ iwC $$$ iwC $$$ iwC $$$ iwC $$$ iwC $$$ iwC $$ iwc $$ IWC) ...

ich verstehe nicht:

  • Warum ist dieser Fehler aufgetreten, während keine Operation für den Datenframe ausgeführt wurde?
  • Warum "ExtraDataFrameOperations" ist nicht serialisierbar, während es zuvor erfolgreich verwendet wurde ??

UPDATE:

Der Versuch, mit

@inline val testList = List[String]("a", "b") 

hilft nicht.

Antwort

0

Es scheint, als ob Spark versucht, den gesamten Bereich um testList zu serialisieren. Versuchen Sie, Daten @inline val testList = List[String]("a", "b") inline oder verwenden Sie ein anderes Objekt, wo Sie Funktion/Daten speichern, die Sie an Treiber übergeben.

+0

Leider _ @ inline_ hilft nicht – user2573552

+0

Und Speichern von Funktion/Daten in anderen Objekt passt nicht wirklich mit der Strategie, um Dataframe-Objekt anzupassen – user2573552