Ziel: Lesen Sie Kafka mit Spark-Streaming und speichern Sie Daten in Cassandra Von: Java Spark Cassandra-Anschluss 1.6 Dateneingabe: einfache JSON-Linie Objekt {"ID": "1", "field1": " Wert1}java Spark Streaming nach Cassandra
Ive eine Java-Klasse von kafka durch Funken-Streaming, die Verarbeitung der Daten lesen und speichern es in cassandra zu lesen
hier ist der Hauptcode.
**JavaPairReceiverInputDStream**<String, String> messages =
KafkaUtils.createStream(ssc,
targetKafkaServerPort, targetTopic, topicMap);
**JavaDStream** list = messages.map(new Function<Tuple2<String,String>,List<Object>>(){
public List<Object> call( Tuple2<String,String> tuple2){
List<Object> **list**=new ArrayList<Object>();
Gson gson = new Gson();
MyClass myclass = gson.fromJson(tuple2._2(), MyClass.class);
myclass.setNewData("new_data");
String jsonInString = gson.toJson(myclass);
list.add(jsonInString);
return list;
}
});
die nächste Code ist falsch:
**javaFunctions**(list)
.writerBuilder("schema", "table", mapToRow(JavaDStream.class))
.saveToCassandra();
Da „javaFunctions“ Methode erwartet ein JavaRDD Objekt und „Liste“ eine JavaDStream ist ...
Id brauchen JavaDStream zu JavaRDD werfen, aber ich nicht den richtigen Weg finden. ..
Irgendwelche Hilfe?