2016-08-02 27 views
0

Wir haben eine Stream-Daten, für die ich einige Stamminformationen in einer HBase-Tabelle haben. Für jede Zeile muss ich nach der HBase-Haupttabelle suchen und einige Profilinformationen erhalten. Mein Code ist etwas wie dasSpark-Streaming-Filter-Bedingung innerhalb foreach - NullPointerException

val con    = new setContext(hadoopHome,sparkMaster) 
val l_sparkcontext = con.getSparkContext 
val l_hivecontext = con.getHiveContext 

val topicname  = "events" 
val ssc    = new StreamingContext(l_sparkcontext, Seconds(30)) 
val eventsStream = KafkaUtils.createStream(ssc,"xxx.xxx.142.xxx:2181","receive_rest_events",Map(topicname.toString -> 10)) 
println("Kafka Stream for receiving Events..") 

val profile_data = l_hivecontext.sql("select gender, income, age, riid from hbase_customer_profile") 
profile_data.foreach(println) 
val tabBC = l_sparkcontext.broadcast(profile_data) 

eventsStream.foreachRDD(rdd => { 
    rdd.foreach(record => { 
    val subs_profile_rows = tabBC.value 
    val Rows = record._2.split(rowDelim) 
    Rows.foreach(row => { 
     val values = row.split(colDelim) 
     val riid = values(1).toInt 
     val cond = "riid = " + riid 
     println("Condition : ", cond) 
     val enriched_events = subs_profile_rows.filter(cond) 
    }) // End of Rows 
    }) // End of RDD 
}) // End of Events Stream 

Leider traf ich immer auf NPE auf dem Filter. Ich hatte hier einige Fragen und Antworten verfolgt, um Werte über Arbeiterknoten zu übertragen, aber nichts hilft. Kann mir bitte jemand helfen.

Grüße

Bala

+0

Überprüfen Sie, ob Sie einen Wert verwenden, der nicht serialisiert werden kann. – cchantep

+0

Ich bin mir nicht sicher, ob profile_data innerhalb der foreach erstellt werden soll und das ist was nicht serialisierbar ist. –

Antwort

0

Ihre Kontextverwendung sieht ein bisschen faul ... Für mich ist es wie du (ein Funke, eine für Funken Streaming) sind die Schaffung von zwei getrennten Kontexten aussieht und dann zu versuchen, teilen Sie eine Broadcast-Variable zwischen diesen Kontexten (das wird nicht funktionieren).

Wir haben einen Code, den wir darüber geschrieben haben, der ähnlich war. Hier sind die Videos, die zeigen, wie wir es in Splice Machine (Open Source) gemacht haben, falls Sie interessiert sind. Ich werde versuchen, den Code zu finden oder ihn von jemand anderem posten lassen.

http://community.splicemachine.com/splice-machine-tutorial-video-configuring-kafka-feed-splice-machine-part/

http://community.splicemachine.com/splice-machine-tutorial-video-configuring-kafka-feed-splice-machine-ii/

Viel Glück.

+0

Danke John. Ich werde in das Video schauen. Voraussetzung ist das Lesen von Profilinformationen aus der HBase-Tabelle für die Daten, die von DStream stammen. Ich habe forEachPartition auch (wird den geänderten Code als nächsten Kommentar), aber das gibt mir verschiedene Fehler. Ich werde auf den Code warten, wenn Sie es bitte bekommen können. Vielen Dank für die Hilfe –

+0

Aus Platzgründen muss ich meinen Code in 2 Posts teilen. - Startklasse setContext (argHadoopHome: String, argSparkMaster: String) { System.setProperty ("hadoop.home.dir", argHadoopHome) val conf = neu SparkConf(). SetMaster (argSparkMaster); conf.setAppName ("Evts"); Privat val l_valSparkContext = new SparkContext (conf) privaten val l_hiveContext = new HiveContext (l_valSparkContext) def getSparkContext = l_valSparkContext def getHiveContext = l_hiveContext def getconfContext = conf } –

+0

Objekt receiveEvents { def main (args: Array [String]): Einheit = { var rD = "\ r \ n" var cD = "" var sM = "spark: // nm2: 7077" var ip = "NM2: 2181" var hadoopHome = "/ home/.." val con = neuer setContext (ip, sM) val l_sparkcontext = con.getSparkContext val topicname = "evt" val ssc = new Streaming (l_sparkcontext, Sekunden (9)) val eventsStream \t = KafkaUtils.createStream (ssc "NM2: 2181", "RCV", Karte (topicname.toString -> 2)) val profile_data = w_hivecontext.sql ("select Geschlecht, Einkommen, Alter von hb_cust_pro") –