Ich muss Tweets aus einem Kafka Topic konsumieren und gleich in HBase aufnehmen. Das Folgende ist der Code, den ich geschrieben habe, aber das funktioniert nicht richtig.Scala - Tweets abonnieren - Kafka Topic und Ingest in HBase
Der Hauptcode ruft nicht die Methode "convert" auf, und daher werden keine Datensätze in HBase-Tabelle aufgenommen. Kann mir bitte jemand helfen?
tweetskafkaStream.foreachRDD(rdd => {
println("Inside For Each RDD")
rdd.foreachPartition(record => {
println("Inside For Each Partition")
val data = record.map(r => (r._1, r._2)).map(convert)
})
})
def convert(t: (String, String)) = {
println("in convert")
//println("first param value ", t._1)
//println("second param value ", t._2)
val hConf = HBaseConfiguration.create()
hConf.set(TableOutputFormat.OUTPUT_TABLE,hbaseTableName)
hConf.set("hbase.zookeeper.quorum", "192.168.XXX.XXX:2181")
hConf.set("hbase.master", "192.168.XXX.XXX:16000")
hConf.set("hbase.rootdir","hdfs://192.168.XXX.XXX:9000/hbase")
val today = Calendar.getInstance.getTime
val printformat = new SimpleDateFormat("yyyyMMddHHmmss")
val id = printformat.format(today)
val p = new Put(Bytes.toBytes(id))
p.add(Bytes.toBytes("data"), Bytes.toBytes("tweet_text"),(t._2).getBytes())
(id, p)
val mytable = new HTable(hConf,hbaseTableName)
mytable.put(p)
}
Ich mag nicht die aktuelle Datumzeit als Schlüssel (t._1) verwenden und damit das Verfahren in meinem convert zu konstruieren.
Dank
Bala
Sind Sie sicher, dass Sie den Hochtöner richtig gelesen haben? Siehst du überhaupt Tweets? –
Ja, ich kann die eingehenden Tweets sehen. –