Ich benutze ein Out-of-the-Box-EMR-Cluster mit Spark 1.6.0 und Zeppelin 0.5.6 auf AWS. Mein Ziel ist es, einen einfachen Spark-Streaming-Kontext zu initialisieren und auf einen internen Kinesis-Stream zu verweisen, nur als Proof-of-Concept. Allerdings, wenn ich es laufen erhalte ich:Spark Streaming 1.6.0 EMR mit Python: ClassNotFoundException: org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper
Py4JJavaError: An error occurred while calling o89.loadClass. :
java.lang.ClassNotFoundException: org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Der Code I (über Zeppelin) renne einfach ist:
%pyspark
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
ssc = StreamingContext(sc, 1)
appName = '{my-app-name}'
streamName = '{my-stream-name}'
endpointUrl = '{my-endpoint}'
regionName = '{my-region}'
lines = KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2)
Als ich in diese lokal lief, stellte ich sicher, Funken- zu bauen Streaming-Kinesis-asl von Quelle und schließen diese Gläser in meiner Funke config:
spark.driver.extraClassPath /path/to/kinesis/asl/assembly/jars/*
Allerdings kann ich scheinen dies, wenn auf EMR nicht zur Arbeit zu bekommen. Um sicher zu sein, habe ich es im folgenden ohne Erfolg miteingeschlossen:
spark.driver.extraClassPath
spark.driver.extraLibraryPath
spark.executor.extraClassPath
spark.executor.extraLibraryPath
Hat jemand in diese schon einmal hineingelaufen? Ich drucke die Spark-Konfiguration aus, wenn ich den Kontext neu starte, um zu bestätigen, dass diese Änderungen übernommen werden. Vielleicht muss dies auch auf den Slave-Knoten getan werden? Oder vielleicht eine andere Konfigurationsoption/Schlüssel insgesamt?
Dies hat den Trick! Für alle, die weitere Informationen benötigen, finden Sie die vollständige Zeppelin-Dokumentation [hier] (https://zeppelin.incubator.apache.org/docs/0.5.5-incubating/interpreter/spark.html) –