ich mit Scala/Funken ein seltsames Problem bin vor (1.5) und Zeppelin:Aufgabe nicht serializable während benutzerdefinierten Datenrahmen Klasse Spark mit Scala
Wenn ich den folgenden Scala/Spark-Code ausführen, wird es richtig laufen:
// TEST NO PROBLEM SERIALIZATION
val rdd = sc.parallelize(Seq(1, 2, 3))
val testList = List[String]("a", "b")
rdd.map{a =>
val aa = testList(0)
None}
jedoch nach einem benutzerdefinierten Datenrahmen Typen deklarierte als here
//DATAFRAME EXTENSION
import org.apache.spark.sql.DataFrame
object ExtraDataFrameOperations {
implicit class DFWithExtraOperations(df : DataFrame) {
//drop several columns
def drop(colToDrop:Seq[String]):DataFrame = {
var df_temp = df
colToDrop.foreach{ case (f: String) =>
df_temp = df_temp.drop(f)//can be improved with Spark 2.0
}
df_temp
}
}
}
und verwendet es zum Beispiel wie folgt vorgeschlagen:
//READ ALL THE FILES INTO different DF and save into map
import ExtraDataFrameOperations._
val filename = "myInput.csv"
val delimiter = ","
val colToIgnore = Seq("c_9", "c_10")
val inputICFfolder = "hdfs:///group/project/TestSpark/"
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "false") // Automatically infer data types? => no cause we need to merge all df, with potential null values => keep string only
.option("delimiter", delimiter)
.option("charset", "UTF-8")
.load(inputICFfolder + filename)
.drop(colToIgnore)//call the customize dataframe
Dies wird erfolgreich ausgeführt.
Nun, wenn ich erneut aus dem folgenden Code (wie oben)
// TEST NO PROBLEM SERIALIZATION
val rdd = sc.parallelize(Seq(1, 2, 3))
val testList = List[String]("a", "b")
rdd.map{a =>
val aa = testList(0)
None}
erhalte ich die Fehlermeldung:
rdd: org.apache.spark.rdd.RDD [Int] = ParallelCollectionRDD [8] um parallelisieren bei: 32 testList: Liste [String] = Liste (a, b) org.apache.spark.SparkException: Task nicht serialisierbar um org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 304) bei org.apache.spark.util.ClosureCleaner $ .org $ Apache $ funken $ util $ ClosureCleaner $$ sauber (ClosureCleaner.scala: 294) um org.apache.spark.util.ClosureCleaner $ .clean (ClosureCleaner.scala : 122) bei org.apache.spark.SparkContext.clean (SparkContext.scala: 2032) um org.apache.spark.rdd.RDD $$ anonfun $ map $ 1.apply (RDD.scala: 314) .. verursacht durch: java.io.NotSerializableException: $ iwC $$ iwC $$ iwC $$ iwC $$$ iwC $$$ iwC $$$ iwC $$$ iwC $$$ iwC $ ExtraDataFrameOperations $ Serialisierungsstapel: - Objekt nicht serialisierbar (Klasse: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$$ iwC $$$ iwC $$$ iwC $$$ iwC $ ExtraDataFrameOperations $, Wert: $ iwC $$ iwC $$ $ iwC $$$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ ExtraDataFrameOperatio ns $ @ 6c7e70e) - Feld (Klasse: $ iwC $$ iwC $$ iwC $$ iwC $$ $ iwC $$ $ iwC $$ $ iwC $$ $ iwC $$ iwC, Name: ExtraDataFrameOperations $ module, typ: Klasse $ iwC $$ iwC $$ iwC $$ iwC $$$ iwC $$$ iwC $$$ iwC $$$ iwC $$$ iwC $ ExtraDataFrameOperations $) - Objekt (Klasse $ iwC $$ iwC $$ iwC $$$ iwC $$$ iwC $$$ iwC $$ iwC $$ iwC $$$ iwC, $ iwC $$ iwC $$ iwC $$ iwC $$$ iwC $$$ iwC $$ iwC $$$ iwC $$$ iwC @ 4c6d0802) - Feld (Klasse: $ iwC $$$ iwC $$ iwC $$ iwC $$ iwC $$$ iwC $$$ iwC $$$ iwC, Name: $ iw, typ: Klasse $ iwC $$ iwC $$$ iwC $$$ iwC $$$ iwC $$$ iwC $$$ iwC $$ iwc $$ IWC) ...
ich verstehe nicht:
- Warum ist dieser Fehler aufgetreten, während keine Operation für den Datenframe ausgeführt wurde?
- Warum "ExtraDataFrameOperations" ist nicht serialisierbar, während es zuvor erfolgreich verwendet wurde ??
UPDATE:
Der Versuch, mit
@inline val testList = List[String]("a", "b")
hilft nicht.
Leider _ @ inline_ hilft nicht – user2573552
Und Speichern von Funktion/Daten in anderen Objekt passt nicht wirklich mit der Strategie, um Dataframe-Objekt anzupassen – user2573552