2016-07-28 23 views
0

Ich möchte Apache Flink für Folgendes verwenden. Ich habe einen Hauptstrom, der mit den Daten eines anderen Streams angereichert werden muss. Dieser Hauptstrom hat Elemente mit den Attributen "site" und "timestamp". Der andere Stream (nennen wir ihn Countrystream) hat die Attribute "Site" und "Country". Der Landesstream sollte das neueste Land, das für eine Website verwendet wird, verfolgen. Wenn zum Beispiel ("klm.com", "netherlands") zuerst angekommen ist und einige Zeit später das Tupel ("klm.com", "france") angekommen ist, dann sollte "klm.com" auf "frankreich" zeigen (da dies der letzte war). Also sollte es einen Zustand aufrechterhalten. Angenommen, ein Tupel ("klm.com", 100) hat den Hauptstrom erreicht. Dies sollte nun auf ("klm.com", 100, "france") angereichert werden. Wenn eine Site nicht im Landstrom gefunden wird, sollte sie mit "?" Angereichert werden. So zum Beispiel ("stackoverflow.com", 150, "?"). Wie kann ich das schaffen?Einen Stream mit einem anderen Stream anreichern

Antwort

0

Ich fand eine Lösung (es dauerte einige Zeit). Ist das effizient? Kann es verbessert werden? Bedeutet das, dass ich keine Kontrollpunkte für meinen iterativen Stream haben kann?

val env = StreamExecutionEnvironment.getExecutionEnvironment 

val mainStream = env.fromElements("a", "a", "b", "a", "a", "b", "b", "a", "c", "b", "a", "c") 
val infoStream = env.fromElements((1, "a", "It is F"), (2, "b", "It is B"), (3, "c", "It is C"), (4, "a", "Whoops, it is A")) 
     .iterate(
      iteration => { 
       (iteration, iteration) 
      } 
     ) 

mainStream 
    .coGroup(infoStream) 
     .where[String]((x: String) => x) 
     .equalTo(_._2) 
     .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) { 
      (first: Iterator[String], second: Iterator[(Int, String, String)], out: Collector[(String, String)]) => { 
       first.foreach((key: String) => { 
         val matchingRecords = second 
          .filter(_._2 == key) 
         if (matchingRecords.nonEmpty) { 
          val matchingRecord = matchingRecords.maxBy(_._1) 
          out.collect((matchingRecord._2, matchingRecord._3)) 
         } 
        } 
       ) 
      } 
     } 
    .print() 

env.execute("proof_of_concept")