Wir haben eine Stream-Daten, für die ich einige Stamminformationen in einer HBase-Tabelle haben. Für jede Zeile muss ich nach der HBase-Haupttabelle suchen und einige Profilinformationen erhalten. Mein Code ist etwas wie dasSpark-Streaming-Filter-Bedingung innerhalb foreach - NullPointerException
val con = new setContext(hadoopHome,sparkMaster)
val l_sparkcontext = con.getSparkContext
val l_hivecontext = con.getHiveContext
val topicname = "events"
val ssc = new StreamingContext(l_sparkcontext, Seconds(30))
val eventsStream = KafkaUtils.createStream(ssc,"xxx.xxx.142.xxx:2181","receive_rest_events",Map(topicname.toString -> 10))
println("Kafka Stream for receiving Events..")
val profile_data = l_hivecontext.sql("select gender, income, age, riid from hbase_customer_profile")
profile_data.foreach(println)
val tabBC = l_sparkcontext.broadcast(profile_data)
eventsStream.foreachRDD(rdd => {
rdd.foreach(record => {
val subs_profile_rows = tabBC.value
val Rows = record._2.split(rowDelim)
Rows.foreach(row => {
val values = row.split(colDelim)
val riid = values(1).toInt
val cond = "riid = " + riid
println("Condition : ", cond)
val enriched_events = subs_profile_rows.filter(cond)
}) // End of Rows
}) // End of RDD
}) // End of Events Stream
Leider traf ich immer auf NPE auf dem Filter. Ich hatte hier einige Fragen und Antworten verfolgt, um Werte über Arbeiterknoten zu übertragen, aber nichts hilft. Kann mir bitte jemand helfen.
Grüße
Bala
Überprüfen Sie, ob Sie einen Wert verwenden, der nicht serialisiert werden kann. – cchantep
Ich bin mir nicht sicher, ob profile_data innerhalb der foreach erstellt werden soll und das ist was nicht serialisierbar ist. –