2016-04-19 5 views
0

Ich verwende die Flink-Funktion TimeWindow, um einige Berechnungen durchzuführen. Ich erstelle eine 5 Minuten Window. Ich möchte jedoch nur eine Stunde Window für das erste Mal erstellen. Die nächsten Windows, die ich brauche, sind 5 Minuten.Benutzerdefiniertes Windows-Laden in Flink

So, dass für die erste Stunde Daten gesammelt werden und meine Operation darauf durchgeführt wird. Sobald dies erledigt ist, wird alle fünf Minuten die gleiche Operation ausgeführt.

Ich denke, das kann mit einem trigger implementiert werden, aber ich bin mir nicht sicher, welche trigger ich verwenden sollte und wie.

UPDATE: Ich glaube nicht einmal, triggers hilfreich ist, von dem, was ich bekommen kann, definieren sie nur die Zeit/Zahl pro window Auslösung nicht, wenn die ersten window ausgelöst werden soll.

+0

Ich bezweifle, dass dies mit der aktuellen DataStream API möglich ist. Wenn Sie ein Fenster definieren, ist die Definition für alle Instanzen, die zur Laufzeit erstellt werden, "gleich". Die einzige Möglichkeit wäre, einen benutzerdefinierten Operator zu definieren und über '.transform (...)' zu Ihrem Programm hinzuzufügen - Aber das scheint ziemlich umständlich zu sein, richtig gemacht zu werden. –

Antwort

2

Dies ist nicht trivial zu implementieren.

Gegeben eine KeyedStream müssen Sie eine GlobalWindow und eine benutzerdefinierte Stateful Trigger verwenden, die sich "merkt", ob sie zum ersten Mal gefeuert hat oder nicht.

val stream: DataStream[(String, Int)] = ??? 
val result = stream 
    .keyBy(0) 
    .window(GlobalWindows.create()) 
    .trigger(new YourTrigger()) 
    .apply(new YourWindowFunction()) 

Details zu GlobalWindow und Trigger sind im Flink Window documentation.