2016-08-05 37 views
0

Ich habe ein atypisches Problem. Wenn ich versuche, die empfangene rdd von kafka zu verarbeiten, bekomme ich eine Ausnahme (java.lang.NullPointerException), wenn ich versuche, auf sparkContext zuzugreifen. RDDProcessor ist serializableSpark Streaming verlieren SparkContext

def convertArrayToDataFrame(byteArray: Array[Byte], sqlContext: SQLContext) = { 
val stringFromByte = b2s(byteArray) 
val rdd = sqlContext.sparkContext.parallelize(stringFromByte.split("\n")) 
val rows = rdd.map(_.split(";")).map(attributes => Row.fromSeq(attributes.toSeq)) 
val dateframe = sqlContext.createDataFrame(rows,RDDComponents.schema) 
dateframe 
} 

Das Problem beginnt diese:

val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER) 
val sqlContext = new SQLContext(ssc.sparkContext) 
receiver.foreachRDD { rdd => 
    log.info("Received RDD attempt") 
    if (!rdd.isEmpty()) { 
     rdd.foreach(a => rddProcessor.processingRDD(a, sqlContext)) 
    } 

Jedoch, wenn ich nur die erste rdd Verarbeitung, tritt das Problem nicht

val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER) 
val sqlContext = new SQLContext(ssc.sparkContext) 
receiver.foreachRDD { rdd => 
    log.info("Received RDD attempt") 
    if (!rdd.isEmpty()) { 
    rddProcessor.processingRDD(rdd.first(), sqlContext) 
    } 

Ich weiß nicht wirklich, warum es so problematisch. Wenn jemand Tipps haben will ich

@EDIT dankbar sein, dass ich Streaming

val sparkConf = new SparkConf().setAppName("KafkaConsumer") 
val ssc = new StreamingContext(sparkConf, Milliseconds(batchDuration)) 
+0

Können Sie Code zur Verfügung stellen, wo Sie Ihren ssc: SparkContext definieren? – ponkin

+0

ok, habe ich dies zu post – kasiula03

Antwort

0

Nun definieren, ist SparkContext nicht serialisierbar, und es in SqlContext über SparkSession verfügbar ist, wo es als @transient markiert ist. Wenn Sie also processingRDD nicht so schreiben können, dass es niemals SparkContext verwendet, können Sie es nicht in einem Lambda verwenden, das serialisiert werden muss, wie foreach oder maps Argument (aber nicht foreachRDD!).

+0

Ich verstehe, aber warum es für eine rdd Arbeit? Wenn das Objekt serialisiert wird, werden auch alle Funktionen serialisiert? – kasiula03

+0

'foreachRDD' sendet sein Argument nicht an andere Knoten, es wird vollständig auf dem Treiberknoten ausgeführt (und' rdd.first() 'sendet das Element von einem anderen Knoten zum Treiberknoten). 'rdd.foreach' muss sein Argument an jeden Knoten senden, um es dort auszuführen. –

+0

Wie konnte ich dieses Problem überspringen, wenn ich diese rdd mit sparkContext verarbeiten muss? rdd.colect() funktioniert, aber es ist nicht korrekt – kasiula03