Das folgende Java-Programm wurde geschrieben, um mit Apache Funke zu experimentieren.Aufgabe nicht Serialisierung Ausnahme beim Ausführen von Apache Funke Job
Das Programm versucht, eine Liste positiver und negativer Wörter aus einer entsprechenden Datei zu lesen, diese mit der Hauptdatei zu vergleichen und die Ergebnisse entsprechend zu filtern.
import java.io.Serializable;
import java.io.FileNotFoundException;
import java.io.File;
import java.util.*;
import java.util.Iterator;
import java.util.List;
import java.util.List;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
public class SimpleApp implements Serializable{
public static void main(String[] args) {
String logFile = "/tmp/master.txt"; // Should be some file on your system
String positive = "/tmp/positive.txt"; // Should be some file on your system
String negative = "/tmp/negative.txt"; // Should be some file on your system
JavaSparkContext sc = new JavaSparkContext("local[4]", "Twitter Analyzer", "/home/welcome/Downloads/spark-1.1.0/", new String[]{"target/scala-2.10/Simple-assembly-0.1.0.jar"});
JavaRDD<String> positiveComments = sc.textFile(logFile).cache();
List<String> positiveList = GetSentiments(positive);
List<String> negativeList= GetSentiments(negative);
final Iterator<String> iterator = positiveList.iterator();
int i = 0;
while (iterator.hasNext())
{
JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>()
{
public Boolean call(String s)
{
return s.contains(iterator.next());
}
});
numAs.saveAsTextFile("/tmp/output/"+ i);
i++;
}
}
public static List<String> GetSentiments(String fileName) {
List<String> input = new ArrayList<String>();
try
{
Scanner sc = new Scanner(new File(fileName));
while (sc.hasNextLine()) {
input.add(sc.nextLine());
}
}
catch (FileNotFoundException e){
// do stuff here..
}
return input;
}
}
Der folgende Fehler wird
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78)
at SimpleApp.main(SimpleApp.java:37)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: java.util.ArrayList$Itr
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 12 more
Alle Zeiger während der Ausführung von Funken Job, geworfen ??