2016-05-20 17 views
5

Ich benutze Apache Spark Streaming 1.6.1, um eine Java-Anwendung zu schreiben, die zwei Key/Value-Datenströme verbindet und die Ausgabe in HDFS schreibt. Die zwei Datenströme enthalten K/V-Zeichenfolgen und werden regelmäßig in Spark von HDFS mithilfe von TextFileStream() aufgenommen.Übertragen von Datenströmen über mehrere Stapelintervalle hinweg in Spark Streaming

Die beiden Datenströme sind nicht synchronisiert, was bedeutet, dass einige Schlüssel, die zum Zeitpunkt t0 in stream1 sind, zum Zeitpunkt t1 in stream2 erscheinen können, oder umgekehrt. Daher ist es mein Ziel, die beiden Streams zu verbinden und "übrig gebliebene" Schlüssel zu berechnen, die für die Join-Operation in den nächsten Batch-Intervallen in Betracht gezogen werden sollten.

Um dies besser zu verdeutlichen, schauen Sie sich den folgenden Algorithmus:

variables: 
stream1 = <String, String> input stream at time t1 
stream2 = <String, String> input stream at time t1 
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0 
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0 

operations at time t1: 
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2) 
write out_stream to HDFS 
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2) 
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2) 

Ich habe versucht, diesen Algorithmus mit Funken erfolglos Streaming zu implementieren. Zunächst erstelle ich zwei leere Ströme für übrig gebliebene Schlüssel auf diese Weise (dies ist nur ein Strom, aber der Code den zweiten Strom zu erzeugen ist ähnlich):

JavaRDD<String> empty_rdd = sc.emptyRDD(); //sc = Java Spark Context 
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>(); 
q.add(empty_rdd); 
JavaDStream<String> empty_dstream = jssc.queueStream(q); 
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String>() { 
           @Override 
           public scala.Tuple2<String, String> call(String s) { 
            return new scala.Tuple2(s, s); 
           } 
           }); 

Später, dieser leere Strom einheitlich ist (dh, union()) mit stream1 und schließlich, nach dem Join, füge ich die übrig gebliebenen Schlüssel von stream1 und call window() hinzu. Das Gleiche passiert mit Stream2.

Das Problem ist, dass die Operationen, die left_keys_s1 und left_keys_s2 generieren, Transformationen ohne Aktionen sind, was bedeutet, dass Spark keinen RDD-Flussgraphen erstellt und daher niemals ausgeführt wird. Was ich jetzt bekomme, ist ein Join, der nur die Datensätze ausgibt, deren Schlüssel sich im selben Zeitintervall in stream1 und stream2 befinden.

Haben Sie irgendwelche Vorschläge, dies korrekt mit Spark zu implementieren?

Danke, Marco

Antwort

1

Es sollte möglich sein, die neben carry-over-Wert von Charge zu Charge durch einen Verweis auf ein RDD zu halten, wo wir diese Werte gehalten werden.

Versuchen Sie nicht, Ströme mit der queueDStream zusammenzuführen, sondern eine veränderbare RDD-Referenz, die bei jedem Streaming-Intervall aktualisiert werden kann.

Dies ist ein Beispiel:

In diesem Streaming-Job, beginnen wir mit einer RDD 100 ganzen Zahlen CARRING. Jedes Intervall, 10 Zufallszahlen werden für diese ersten 100 ganzen Zahlen generiert und subtrahiert. Dieser Prozess wird fortgesetzt, bis die ursprüngliche RDD mit 100 Elementen leer ist. Dieses Beispiel zeigt, wie Elemente von einem Intervall zum nächsten übertragen werden.

import scala.util.Random 
    import org.apache.spark.streaming.dstream._ 

    val ssc = new StreamingContext(sparkContext, Seconds(2)) 

    var targetInts:RDD[Int] = sc.parallelize(0 until 100) 

    var loops = 0 

    // we create an rdd of functions that generate random data. 
    // evaluating this RDD at each interval will generate new random data points. 
    val randomDataRdd = sc.parallelize(1 to 10).map(_ =>() => Random.nextInt(100)) 

    val dstream = new ConstantInputDStream(ssc, randomDataRdd) 

    // create values from the random func rdd 

    dataDStream.foreachRDD{rdd => 
         loops += 1 
         targetInts = targetInts.subtract(rdd) 
         if (targetInts.isEmpty) {println(loops); ssc.stop(false)} 
         } 


    ssc.start() 

dieses Beispiel Laufen und loops gegen targetInts.count Plotten gibt die folgende Tabelle:

Removing 100 ints by generating random numbers

Ich hoffe, das Sie genug Anleitung gibt die komplette usecase zu implementieren.