2016-08-04 27 views
1

Für den folgenden Code laufen sowohl stream1 als auch stream2 einzeln und ich kann die Ausgabe sehen, aber der verbundene Stream protokolliert überhaupt nichts. Ich habe das Gefühl, dass es etwas mit dem Join-Fenster zu tun hat, aber die Daten von beiden Streams kommen fast zur selben Zeit herein.Es kann kein verknüpfter Kafka-Stream zur Ausführung oder Ausgabe von irgendetwas gefunden werden.

val stream = builder.stream(stringSerde, byteArraySerde, "topic") 

val stream1 = stream 
    .filter((key, value) => somefilter(key, value)) 
    .through(stringSerde, byteArraySerde, "topic1") 

val stream2 = stream 
    .filter((key, value) => someotherfilter(key, value)) 
    .through(stringSerde, byteArraySerde, "topic2") 

val joinedStream = stream1 
    .join(stream2, (value1: Array[Byte], value2: Array[Byte]) => { 
    println("wont print anything") 
    return somerandomdata 
    }, 
    JoinWindows.of("othertopic").within(10000L), 
    stringSerde, byteArraySerde, byteArraySerde) 
+1

Ein Join-Fenster über den eingebetteten Datensatz Zeitstempel berechnet wird (dh Metadaten, die in enthalten ist jeder Datensatz zusätzlich zu Schlüssel und Wert). Es würde helfen, wenn Sie diese Zeitstempel zum Debuggen drucken. Um auf sie zuzugreifen, müssen Sie process() verwenden - das angegebene 'context'-Objekt enthält den Zeitstempel des aktuell verarbeiteten Datensatzes (dh der Kontext wird für jeden verarbeiteten Datensatz aktualisiert). –

Antwort