2016-04-21 6 views
1

Angenommen ich eine Datei des Formulars haben (ein Ereignis pro Zeile):Wie kann man mit Apache Flink nach Eigenschaften und Zeitfenster zählen?

Source,Timestamp aa,2014-05-02 22:12:11 bb,2014-05-02 22:22:11

Und ich möchte die Anzahl der Ereignisse nach Quelle mit einem kontinuierlichen Zeitfenster von 5 Minuten gruppiert zusammenzufassen. Wie würde ich das mit Flink machen?

Was ich jetzt habe, ist:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
    DataStreamSource<Event> stream = env.fromCollection(new EventFileReader(new File("path/to/file")), Event.class); 

    stream 
     .keyBy("getSource()") 
     .timeWindow(Time.minutes(5)) 
     .sum("getTimestamp()");  

    env.execute(); 

public class Event { 
    private final String source; 
    private final long timestamp; 

    public Event(String source, long timestamp) { 
     this.source = source; 
     this.timestamp = timestamp; 
    } 

    public String getSource() { 
     return source; 
    } 

    public long getTimestamp() { 
     return timestamp; 
    } 
} 

ich zwei Dinge fehlen. Zuerst schlägt dies fehl und sagt, dass die Klasse Event kein POJO ist. Zweitens kann ich die Anzahl der Ereignisse im Fenster nicht zählen. Im Moment verwende ich .sum("getTimestamp()"), aber ich bin sicher, dass es nicht ist. Irgendwelche Gedanken?

Antwort

1

Ich würde empfehlen, die fold Funktion zu verwenden, um die Fensteraggregation zu tun. Das folgende Code-Snippet sollte den Job erledigen:

public class Job { 
    public static void main(String[] args) throws Exception { 
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
     DataStream<Event> stream = env.fromElements(new Event("a", 1), new Event("b", 2), new Event("a", 2)).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Event>() { 
      @Nullable 
      @Override 
      public Watermark checkAndGetNextWatermark(Event event, long l) { 
       return new Watermark(l); 
      } 

      @Override 
      public long extractTimestamp(Event event, long l) { 
       return event.getTimestamp(); 
      } 
     }); 

     DataStream<Tuple2<String, Integer>> count = stream.keyBy(new KeySelector<Event, String>() { 
       @Override 
       public String getKey(Event event) throws Exception { 
        return event.getSource(); 
       } 
      }) 
      .timeWindow(Time.minutes(5)) 
      .fold(Tuple2.of("", 0), new FoldFunction<Event, Tuple2<String, Integer>>() { 
       @Override 
       public Tuple2<String, Integer> fold(Tuple2<String, Integer> acc, Event o) throws Exception { 
        return Tuple2.of(o.getSource(), acc.f1 + 1); 
       } 
      }); 

     count.print(); 

     env.execute(); 
    } 

    public static class Event { 
     private final String source; 
     private final long timestamp; 

     public Event(String source, long timestamp) { 
      this.source = source; 
      this.timestamp = timestamp; 
     } 

     public String getSource() { 
      return source; 
     } 

     public long getTimestamp() { 
      return timestamp; 
     } 
    } 
}