2016-03-27 1 views
0

Ich bin neu in Spark und HBase, aber ich muss die beiden miteinander verbinden, ich habe versucht, die Bibliothek Funken-HBase-Connector aber mit Funken-Submit funktioniert es nicht, obwohl kein Fehler ist gezeigt. Ich habe hier und anderswo nach einem ähnlichen Problem oder einem Tutorial gesucht, konnte aber keins finden. Kann mir jemand erklären, wie man aus Spark-Streaming in HBase schreibt oder ein Tutorial oder ein Buch empfiehlt? Vielen Dank im VorausVerknüpfung von Spark-Streaming zu HBase

+0

Diese Frage ist Wegthema auf vielen Ebenen. Lesen Sie weiter, wie Sie Fragen zu SO stellen und welche Frage hier beantwortet wird. – eliasah

Antwort

1

Was war schließlich arbeitete:

val hconf = HBaseConfiguration.create() 
val hTable = new HTable(hconf, "mytab") 
val thePut = new Put(Bytes.toBytes(row)) 
thePut.add(Bytes.toBytes("colfamily"), Bytes.toBytes("c1"), Bytes.toBytes(value) 
hTable.put(thePut) 
0

ist der entsprechende Code ...

 LOG.info("************ SparkStreamingKafka.processKafka start"); 

    // Create the spark application and set the name to MQTT 
    SparkConf sparkConf = new SparkConf().setAppName("KAFKA"); 

    // Create the spark streaming context with a 'numSeconds' second batch size 
    jssc = new JavaStreamingContext(sparkConf, Durations.seconds(numSeconds)); 
    jssc.checkpoint(checkpointDirectory); 

    LOG.info("zookeeper:" + zookeeper); 
    LOG.info("group:" + group); 
    LOG.info("numThreads:" + numThreads); 
    LOG.info("numSeconds:" + numSeconds); 


    Map<String, Integer> topicMap = new HashMap<>(); 
    for (String topic: topics) { 
     LOG.info("topic:" + topic); 
     topicMap.put(topic, numThreads); 
    } 

    LOG.info("************ SparkStreamingKafka.processKafka about to read the MQTTUtils.createStream"); 
    //2. KafkaUtils to collect Kafka messages 
    JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper, group, topicMap); 

    //Convert each tuple into a single string. We want the second tuple 
    JavaDStream<String> lines = messages.map(new TupleFunction()); 

    LOG.info("************ SparkStreamingKafka.processKafka about to do foreachRDD"); 
    //process the messages on the queue and save them to the database 
    lines.foreachRDD(new SaveRDDWithVTI()); 


    LOG.info("************ SparkStreamingKafka.processKafka prior to context.strt"); 
    // Start the context 
    jssc.start(); 
    jssc.awaitTermination();