Ich benutze Apache Spark Streaming 1.6.1, um eine Java-Anwendung zu schreiben, die zwei Key/Value-Datenströme verbindet und die Ausgabe in HDFS schreibt. Die zwei Datenströme enthalten K/V-Zeichenfolgen und werden regelmäßig in Spark von HDFS mithilfe von TextFileStream() aufgenommen.Übertragen von Datenströmen über mehrere Stapelintervalle hinweg in Spark Streaming
Die beiden Datenströme sind nicht synchronisiert, was bedeutet, dass einige Schlüssel, die zum Zeitpunkt t0 in stream1 sind, zum Zeitpunkt t1 in stream2 erscheinen können, oder umgekehrt. Daher ist es mein Ziel, die beiden Streams zu verbinden und "übrig gebliebene" Schlüssel zu berechnen, die für die Join-Operation in den nächsten Batch-Intervallen in Betracht gezogen werden sollten.
Um dies besser zu verdeutlichen, schauen Sie sich den folgenden Algorithmus:
variables:
stream1 = <String, String> input stream at time t1
stream2 = <String, String> input stream at time t1
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0
operations at time t1:
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
write out_stream to HDFS
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2)
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2)
Ich habe versucht, diesen Algorithmus mit Funken erfolglos Streaming zu implementieren. Zunächst erstelle ich zwei leere Ströme für übrig gebliebene Schlüssel auf diese Weise (dies ist nur ein Strom, aber der Code den zweiten Strom zu erzeugen ist ähnlich):
JavaRDD<String> empty_rdd = sc.emptyRDD(); //sc = Java Spark Context
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>();
q.add(empty_rdd);
JavaDStream<String> empty_dstream = jssc.queueStream(q);
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String>() {
@Override
public scala.Tuple2<String, String> call(String s) {
return new scala.Tuple2(s, s);
}
});
Später, dieser leere Strom einheitlich ist (dh, union()) mit stream1 und schließlich, nach dem Join, füge ich die übrig gebliebenen Schlüssel von stream1 und call window() hinzu. Das Gleiche passiert mit Stream2.
Das Problem ist, dass die Operationen, die left_keys_s1 und left_keys_s2 generieren, Transformationen ohne Aktionen sind, was bedeutet, dass Spark keinen RDD-Flussgraphen erstellt und daher niemals ausgeführt wird. Was ich jetzt bekomme, ist ein Join, der nur die Datensätze ausgibt, deren Schlüssel sich im selben Zeitintervall in stream1 und stream2 befinden.
Haben Sie irgendwelche Vorschläge, dies korrekt mit Spark zu implementieren?
Danke, Marco