2016-04-28 8 views
0

Ich versuchte, den folloiwng einfachen Code in Zeppelin auszuführen:Zeppelin: muptiple SparkContexts Ausgabe

import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.{Logging, SparkConf, SparkContext} 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.dstream.DStream 

System.clearProperty("spark.driver.port") 
System.clearProperty("spark.hostPort") 

def maxWaitTimeMillis: Int = 20000 
def actuallyWait: Boolean = false 

val conf = new SparkConf().setMaster("local[2]").setAppName("Streaming test") 
var sc = new SparkContext(conf) 

def batchDuration: Duration = Seconds(1) 
val ssc = new StreamingContext(sc, batchDuration) 

Dies ist die Ausgabe in Zeppelin:

import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.{Logging, SparkConf, SparkContext} 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.dstream.DStream 
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD 
import org.apache.spark.mllib.regression.LabeledPoint 
calculateRMSE: (output: org.apache.spark.streaming.dstream.DStream[(Double, Double)], n: org.apache.spark.streaming.dstream.DStream[Long])Double 
res50: String = null 
res51: String = null 
maxWaitTimeMillis: Int 
actuallyWait: Boolean 
conf: org.apache.spark.SparkConf = [email protected] 
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at: 
org.apache.spark.SparkContext.<init>(SparkContext.scala:82) 
org.apache.zeppelin.spark.SparkInterpreter.createSparkContext(SparkInterpreter.java:356) 
org.apache.zeppelin.spark.SparkInterpreter.getSparkContext(SparkInterpreter.java:150) 
org.apache.zeppelin.spark.SparkInterpreter.open(SparkInterpreter.java:525) 
org.apache.zeppelin.interpreter.ClassloaderInterpreter.open(ClassloaderInterpreter.java:74) 
org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:68) 
org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:92) 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:345) 
org.apache.zeppelin.scheduler.Job.run(Job.java:176) 
org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:139) 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
java.util.concurrent.FutureTask.run(FutureTask.java:266) 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
java.lang.Thread.run(Thread.java:745) 
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2257) 
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2239) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2239) 
    at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2312) 
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:91) 

Warum muss es sagen, dass ich mehrere SparkContexts laufen ? Wenn ich die Zeile var sc = new SparkContext(conf) nicht hinzufüge, dann ist scnull, also wird es nicht erstellt.

+0

der SparkContext sollte automatisch mit dem Namen sc von Zeppelin erstellt werden .. Ich weiß in Ihrem Beitrag haben Sie gesagt, dass es null ist, aber es sollte nicht ... – mgaido

+0

@ mark91: ok, du hast Recht. Ich habe den Code überprüft und "sc" ist wirklich erstellt. Das Problem besteht nun beim Setzen des Checkpoint-Verzeichnisses. – Klue

+0

Welches ist dein Problem damit? Hast du versucht, ssc.checkpoint ("/ my_cwonderful_checkpoint_dir") zu machen? – mgaido

Antwort

1

Sie können nicht mehrere SparkContexte in Zeppelin verwenden. Es ist eine seiner Einschränkungen, da er tatsächlich einen Webhook zu einem SparkContext erstellt.

Wenn Sie Ihre SparkConf in Zeppelin einrichten möchten, ist es am einfachsten, diese Eigenschaften im Interpreter-Menü zu setzen und den Interpreter neu zu starten, um diese Konfiguration in Ihrem SparkContext zu übernehmen.

Jetzt können Sie Ihr Notebook zurück und testen Sie Ihr Code:

import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.{Logging, SparkConf, SparkContext} 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.dstream.DStream 

def maxWaitTimeMillis: Int = 20000 
def actuallyWait: Boolean = false 

def batchDuration: Duration = Seconds(1) 
val ssc = new StreamingContext(sc, batchDuration) 

Mehr zu diesem here.

+0

Danke. Es besagt, dass ich auch das Checkpoint-Verzeichnis setzen sollte. Es kann wie folgt durchgeführt werden: 'ssc.checkpoint (SparkCheckpointDir)', aber wie definiert man 'SparkCheckpointDir'? – Klue

+0

Sie können das Checkpoint-Verzeichnis auch im Interpreter-Menü festlegen. Sie sehen eine Funkeneigenschaften mit Name und Wert. – eliasah

+0

Kennen Sie ein Tutorial, das zeigt, wie das geht? Außerdem schätze ich es, wenn Sie erklären, wie Sie einen Prüfpunkt programmgesteuert setzen. Danke vielmals. – Klue