2016-06-09 4 views
1

Ich bin neu in der Flink Streaming API und möchte die folgende einfache Aufgabe (IMO) abschließen. Ich habe zwei Streams und möchte sie mit zählbasierten Fenstern verbinden. Der Code, den ich bisher habe, ist der folgende:Verbinden von zwei Streams mit einem zählerbasierten Fenster

public class BaselineCategoryEquiJoin { 

private static final String recordFile = "some_file.txt"; 

private static class ParseRecordFunction implements MapFunction<String, Tuple2<String[], MyRecord>> { 
    public Tuple2<String[], MyRecord> map(String s) throws Exception { 
     MyRecord myRecord = parse(s); 
     return new Tuple2<String[], myRecord>(myRecord.attributes, myRecord); 
    } 
} 

public static void main(String[] args) throws Exception { 
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment(); 
    ExecutionConfig config = environment.getConfig(); 
    config.setParallelism(8); 
    DataStream<Tuple2<String[], MyRecord>> dataStream = environment.readTextFile(recordFile) 
      .map(new ParseRecordFunction()); 
    DataStream<Tuple2<String[], MyRecord>> dataStream1 = environment.readTextFile(recordFile) 
      .map(new ParseRecordFunction()); 
    DataStreamSink<Tuple2<String[], String[]>> joinedStream = dataStream1 
      .join(dataStream) 
      .where(new KeySelector<Tuple2<String[],MyRecord>, String[]>() { 
       public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception { 
        return recordTuple2.f0; 
       } 
      }).equalTo(new KeySelector<Tuple2<String[], MyRecord>, String[]>() { 
       public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception { 
        return recordTuple2.f0; 
       } 
      }).window(TumblingProcessingTimeWindows.of(Time.seconds(1))) 
      .apply(new JoinFunction<Tuple2<String[],MyRecord>, Tuple2<String[],MyRecord>, Tuple2<String[], String[]>>() { 
       public Tuple2<String[], String[]> join(Tuple2<String[], MyRecord> tuple1, Tuple2<String[], MyRecord> tuple2) throws Exception { 
        return new Tuple2<String[], String[]>(tuple1.f0, tuple1.f0); 
       } 
      }).print(); 
    environment.execute(); 
} 
} 

Mein Code funktioniert ohne Fehler, aber es produziert keine Ergebnisse. Tatsächlich wird der Aufruf der Methode apply nie aufgerufen (verifiziert, indem im Debug-Modus ein Haltepunkt hinzugefügt wird). Ich denke, der Hauptgrund für den vorherigen ist, dass meine Daten kein Zeitattribut haben. Daher wird Fensterung (materialisiert durch window) nicht richtig gemacht. Daher meine Frage ist, wie kann ich angeben, dass ich meine Verbindung auf der Grundlage von Count-Fenstern stattfinden soll. Zum Beispiel möchte ich, dass der Join alle 100 Tupel aus jedem Stream materialisiert. Ist das vorherige in Flink machbar? Wenn ja, was sollte ich in meinem Code ändern, um dies zu erreichen?

An dieser Stelle muss ich Ihnen mitteilen, dass ich versucht habe, die countWindow() Methode aufzurufen, aber aus irgendeinem Grund wird es nicht von Flink JoinedStreams angeboten.

Danke

Antwort

2

Count-basierte Joins nicht unterstützt werden. Sie könnten zählungsbasierte Fenster emulieren, indem Sie die Semantik "Ereigniszeit" verwenden und jedem Datensatz eine eindeutige SEQ-ID als Zeitstempel zuweisen. Somit wäre ein Zeitfenster von "5" effektiv ein Zählfenster von 5.

+0

Könnten Sie bitte einen Blick auf diese https://stackoverflow.com/questions/46282692/match-based-on- some-property-between-twide-data-streams-und-collect-all-based-on-m – Kumar