Ich habe ein atypisches Problem. Wenn ich versuche, die empfangene rdd von kafka zu verarbeiten, bekomme ich eine Ausnahme (java.lang.NullPointerException), wenn ich versuche, auf sparkContext zuzugreifen. RDDProcessor ist serializableSpark Streaming verlieren SparkContext
def convertArrayToDataFrame(byteArray: Array[Byte], sqlContext: SQLContext) = {
val stringFromByte = b2s(byteArray)
val rdd = sqlContext.sparkContext.parallelize(stringFromByte.split("\n"))
val rows = rdd.map(_.split(";")).map(attributes => Row.fromSeq(attributes.toSeq))
val dateframe = sqlContext.createDataFrame(rows,RDDComponents.schema)
dateframe
}
Das Problem beginnt diese:
val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
receiver.foreachRDD { rdd =>
log.info("Received RDD attempt")
if (!rdd.isEmpty()) {
rdd.foreach(a => rddProcessor.processingRDD(a, sqlContext))
}
Jedoch, wenn ich nur die erste rdd Verarbeitung, tritt das Problem nicht
val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
receiver.foreachRDD { rdd =>
log.info("Received RDD attempt")
if (!rdd.isEmpty()) {
rddProcessor.processingRDD(rdd.first(), sqlContext)
}
Ich weiß nicht wirklich, warum es so problematisch. Wenn jemand Tipps haben will ich
@EDIT dankbar sein, dass ich Streaming
val sparkConf = new SparkConf().setAppName("KafkaConsumer")
val ssc = new StreamingContext(sparkConf, Milliseconds(batchDuration))
Können Sie Code zur Verfügung stellen, wo Sie Ihren ssc: SparkContext definieren? – ponkin
ok, habe ich dies zu post – kasiula03