Ich bin neu in Spark und HBase, aber ich muss die beiden miteinander verbinden, ich habe versucht, die Bibliothek Funken-HBase-Connector aber mit Funken-Submit funktioniert es nicht, obwohl kein Fehler ist gezeigt. Ich habe hier und anderswo nach einem ähnlichen Problem oder einem Tutorial gesucht, konnte aber keins finden. Kann mir jemand erklären, wie man aus Spark-Streaming in HBase schreibt oder ein Tutorial oder ein Buch empfiehlt? Vielen Dank im VorausVerknüpfung von Spark-Streaming zu HBase
0
A
Antwort
1
Was war schließlich arbeitete:
val hconf = HBaseConfiguration.create()
val hTable = new HTable(hconf, "mytab")
val thePut = new Put(Bytes.toBytes(row))
thePut.add(Bytes.toBytes("colfamily"), Bytes.toBytes("c1"), Bytes.toBytes(value)
hTable.put(thePut)
0
Hier einige Beispiel-Code ist mit Splice-Maschine (Open Source) zum Speichern von Daten in HBase über Spark-Streaming und Kafka ...
Wir kämpften uns auch durch und wissen, dass es ein wenig entmutigend sein kann. Hier
+0
Bitte fügen Sie die relevanten Teile in die Antwort ein, da dieser Link möglicherweise wegfällt. – Beryllium
0
ist der entsprechende Code ...
LOG.info("************ SparkStreamingKafka.processKafka start");
// Create the spark application and set the name to MQTT
SparkConf sparkConf = new SparkConf().setAppName("KAFKA");
// Create the spark streaming context with a 'numSeconds' second batch size
jssc = new JavaStreamingContext(sparkConf, Durations.seconds(numSeconds));
jssc.checkpoint(checkpointDirectory);
LOG.info("zookeeper:" + zookeeper);
LOG.info("group:" + group);
LOG.info("numThreads:" + numThreads);
LOG.info("numSeconds:" + numSeconds);
Map<String, Integer> topicMap = new HashMap<>();
for (String topic: topics) {
LOG.info("topic:" + topic);
topicMap.put(topic, numThreads);
}
LOG.info("************ SparkStreamingKafka.processKafka about to read the MQTTUtils.createStream");
//2. KafkaUtils to collect Kafka messages
JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper, group, topicMap);
//Convert each tuple into a single string. We want the second tuple
JavaDStream<String> lines = messages.map(new TupleFunction());
LOG.info("************ SparkStreamingKafka.processKafka about to do foreachRDD");
//process the messages on the queue and save them to the database
lines.foreachRDD(new SaveRDDWithVTI());
LOG.info("************ SparkStreamingKafka.processKafka prior to context.strt");
// Start the context
jssc.start();
jssc.awaitTermination();
Diese Frage ist Wegthema auf vielen Ebenen. Lesen Sie weiter, wie Sie Fragen zu SO stellen und welche Frage hier beantwortet wird. – eliasah