2016-07-20 9 views
0

Ich habe einen kleinen Scala-Code, der auf Spark-Shell funktioniert, aber nicht in Eclipse mit Scala-Plugin. Ich kann hdfs zugreifen Plugin versucht, eine andere Datei zu schreiben und es funktionierte ..Code funktioniert in Spark-Shell nicht in Eclipse

FirstSpark.scala

package bigdata.spark 
import org.apache.spark.SparkConf 
import java. io. _ 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 

object FirstSpark { 

    def main(args: Array[String])={ 
    val conf = new SparkConf().setMaster("local").setAppName("FirstSparkProgram") 
    val sparkcontext = new SparkContext(conf) 
    val textFile =sparkcontext.textFile("hdfs://pranay:8020/spark/linkage") 
    val m = new Methods() 
    val q =textFile.filter(x => !m.isHeader(x)).map(x=> m.parse(x)) 
    q.saveAsTextFile("hdfs://pranay:8020/output") } 
} 

Methods.scala

package bigdata.spark 
import java.util.function.ToDoubleFunction 

class Methods { 
def isHeader(s:String):Boolean={ 
    s.contains("id_1") 
} 
def parse(line:String) ={ 
    val pieces = line.split(',') 
    val id1=pieces(0).toInt 
    val id2=pieces(1).toInt 
    val matches=pieces(11).toBoolean 
    val mapArray=pieces.slice(2, 11).map(toDouble) 
    MatchData(id1,id2,mapArray,matches) 
    } 
def toDouble(s: String) = { 
    if ("?".equals(s)) Double.NaN else s.toDouble 
} 
} 
case class MatchData(id1: Int, id2: Int, 
scores: Array[Double], matched: Boolean) 

Fehlermeldung:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:2032) 
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:335) 
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:334) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310) 

Can jemand bitte helfen Sie mir mit diesem

Antwort

0

Versuchen Sie, class Methods { .. } zu object Methods { .. } zu ändern.

Ich denke, das Problem ist um val q =textFile.filter(x => !m.isHeader(x)).map(x=> m.parse(x)). Wenn Spark die Funktionen filter und map sieht, versucht es, die an sie übergebenen Funktionen zu serialisieren (x => !m.isHeader(x) und x=> m.parse(x)), so dass es die Aufgabe der Ausführung an alle Executoren absetzen kann (dies ist die genannte Task). Dazu muss jedoch m serialisiert werden, da dieses Objekt innerhalb der Funktion referenziert wird (es ist in der Schließung der beiden anonymen Methoden) - aber dies kann nicht geschehen, da nicht serialisierbar ist. Sie könnten extends Serializable zu der Methods Klasse hinzufügen, aber in diesem Fall ist eine object geeigneter (und ist bereits serialisierbar).

+0

Danke für das Alec. Es funktionierte. – Pranay

+0

@Pranay Froh, dass ich geholfen habe! Selbst wenn Sie 'x =>! (New Methods()) verwendet hätten, hätten Sie immer noch das selbe Problem, da Spark versuchen würde, alle Methoden zu serialisieren, nur um die Definition zu erhalten von 'isHeader'. – Alec