2016-07-22 16 views
0

Ich muss Tweets aus einem Kafka Topic konsumieren und gleich in HBase aufnehmen. Das Folgende ist der Code, den ich geschrieben habe, aber das funktioniert nicht richtig.Scala - Tweets abonnieren - Kafka Topic und Ingest in HBase

Der Hauptcode ruft nicht die Methode "convert" auf, und daher werden keine Datensätze in HBase-Tabelle aufgenommen. Kann mir bitte jemand helfen?

tweetskafkaStream.foreachRDD(rdd => { 
    println("Inside For Each RDD") 
    rdd.foreachPartition(record => { 
    println("Inside For Each Partition") 
    val data = record.map(r => (r._1, r._2)).map(convert) 
    }) 
    }) 

def convert(t: (String, String)) = { 
    println("in convert") 
    //println("first param value ", t._1) 
    //println("second param value ", t._2) 

    val hConf = HBaseConfiguration.create() 
    hConf.set(TableOutputFormat.OUTPUT_TABLE,hbaseTableName) 
    hConf.set("hbase.zookeeper.quorum", "192.168.XXX.XXX:2181") 
    hConf.set("hbase.master", "192.168.XXX.XXX:16000") 
    hConf.set("hbase.rootdir","hdfs://192.168.XXX.XXX:9000/hbase") 
    val today = Calendar.getInstance.getTime 
    val printformat = new SimpleDateFormat("yyyyMMddHHmmss") 

    val id = printformat.format(today) 
    val p = new Put(Bytes.toBytes(id)) 

    p.add(Bytes.toBytes("data"), Bytes.toBytes("tweet_text"),(t._2).getBytes()) 
    (id, p) 

    val mytable = new HTable(hConf,hbaseTableName) 
    mytable.put(p) 
} 

Ich mag nicht die aktuelle Datumzeit als Schlüssel (t._1) verwenden und damit das Verfahren in meinem convert zu konstruieren.

Dank

Bala

+0

Sind Sie sicher, dass Sie den Hochtöner richtig gelesen haben? Siehst du überhaupt Tweets? –

+0

Ja, ich kann die eingehenden Tweets sehen. –

Antwort

0

Statt foreachPartition, habe ich es foreach. Das hat gut funktioniert.