Angenommen ich eine Datei des Formulars haben (ein Ereignis pro Zeile):Wie kann man mit Apache Flink nach Eigenschaften und Zeitfenster zählen?
Source,Timestamp aa,2014-05-02 22:12:11 bb,2014-05-02 22:22:11
Und ich möchte die Anzahl der Ereignisse nach Quelle mit einem kontinuierlichen Zeitfenster von 5 Minuten gruppiert zusammenzufassen. Wie würde ich das mit Flink machen?
Was ich jetzt habe, ist:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Event> stream = env.fromCollection(new EventFileReader(new File("path/to/file")), Event.class);
stream
.keyBy("getSource()")
.timeWindow(Time.minutes(5))
.sum("getTimestamp()");
env.execute();
public class Event {
private final String source;
private final long timestamp;
public Event(String source, long timestamp) {
this.source = source;
this.timestamp = timestamp;
}
public String getSource() {
return source;
}
public long getTimestamp() {
return timestamp;
}
}
ich zwei Dinge fehlen. Zuerst schlägt dies fehl und sagt, dass die Klasse Event
kein POJO ist. Zweitens kann ich die Anzahl der Ereignisse im Fenster nicht zählen. Im Moment verwende ich .sum("getTimestamp()")
, aber ich bin sicher, dass es nicht ist. Irgendwelche Gedanken?