1

Ich benutze ein Out-of-the-Box-EMR-Cluster mit Spark 1.6.0 und Zeppelin 0.5.6 auf AWS. Mein Ziel ist es, einen einfachen Spark-Streaming-Kontext zu initialisieren und auf einen internen Kinesis-Stream zu verweisen, nur als Proof-of-Concept. Allerdings, wenn ich es laufen erhalte ich:Spark Streaming 1.6.0 EMR mit Python: ClassNotFoundException: org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper

Py4JJavaError: An error occurred while calling o89.loadClass. : 
java.lang.ClassNotFoundException: org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 

Der Code I (über Zeppelin) renne einfach ist:

%pyspark 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream 

ssc = StreamingContext(sc, 1) 

appName = '{my-app-name}' 
streamName = '{my-stream-name}' 
endpointUrl = '{my-endpoint}' 
regionName = '{my-region}' 

lines = KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2) 

Als ich in diese lokal lief, stellte ich sicher, Funken- zu bauen Streaming-Kinesis-asl von Quelle und schließen diese Gläser in meiner Funke config:

spark.driver.extraClassPath /path/to/kinesis/asl/assembly/jars/* 

Allerdings kann ich scheinen dies, wenn auf EMR nicht zur Arbeit zu bekommen. Um sicher zu sein, habe ich es im folgenden ohne Erfolg miteingeschlossen:

spark.driver.extraClassPath 
spark.driver.extraLibraryPath 
spark.executor.extraClassPath 
spark.executor.extraLibraryPath 

Hat jemand in diese schon einmal hineingelaufen? Ich drucke die Spark-Konfiguration aus, wenn ich den Kontext neu starte, um zu bestätigen, dass diese Änderungen übernommen werden. Vielleicht muss dies auch auf den Slave-Knoten getan werden? Oder vielleicht eine andere Konfigurationsoption/Schlüssel insgesamt?

Antwort

2

Fügen Sie die Abhängigkeit zum Zeppelin-Kontext "z" hinzu. Hier ein Beispiel für das Hinzufügen des Sparksv-Pakets

%dep 
z.load("com.databricks:spark-csv_2.11:1.3.0") 
+0

Dies hat den Trick! Für alle, die weitere Informationen benötigen, finden Sie die vollständige Zeppelin-Dokumentation [hier] (https://zeppelin.incubator.apache.org/docs/0.5.5-incubating/interpreter/spark.html) –