2016-04-19 7 views

Antwort

3

Flink hat keine eingebaute Funktion, um den Durchschnitt auf einer WindowStream zu berechnen. Dazu müssen Sie eine benutzerdefinierte WindowFunction implementieren.

Der effizienteste Weg ist, eine ReduceFunction zu implementieren, die die Zählung und Summe aus dem Wert berechnen, den Sie im Durchschnitt und einem nachfolgenden WindowFunction möchte, dass das Ergebnis der ReduceFunction und berechnet den Durchschnitt dauert. Die Verwendung eines ReduceFunction ist effizienter, da Flink es direkt auf eingehende Werte anwendet. Daher aggregiert es Werte im laufenden Betrieb und sammelt sie nicht im Fenster. Dies reduziert den Speicherbedarf eines Fensters erheblich.

Da die Ausgabe eines ReduceFunction denselben Typ wie seine Eingabe hat, müssen Sie ein Feld für die Zählung hinzufügen, bevor Sie die ReduceFunction anwenden.

etwas wie die folgenden sollte es tun:

val valueStream: DataStream[(String, Double)] = ??? 

val r: DataStream[(String, Double)] = valueStream 
    // append a 1L for counting 
    .map(x => (x._1, x._2, 1l)) 
    // key and window stream 
    .keyBy(0).timeWindow(Time.minutes(5)) 
    .apply(
    // ReduceFunction (compute sum and count) 
    (x: (String, Double, Long), y: (String, Double, Long)) => 
     (x._1, x._2 + y._2, x._3 + y._3), 
    // WindowFunction 
    (key, window: TimeWindow, input: Iterable[(String, Double, Long)], out: Collector[(String, Double)]) => { 
     // get first (and only) value 
     val x: (String, Double, Long) = input.toIterator.next 
     // compute average as sum/count 
     out.collect(x._1, x._2/x._3) 
    } 
) 
+0

Es gibt keine Beispiele oder verfügbar Blog-Beiträge für solche Aufgaben in Flink tun, ich weiß nicht, wie kann ich 'ReduceFunction' implementieren und' WindowFunction' . Können Sie einen Beispielcode teilen? –

+0

OK, ich habe ein Beispiel hinzugefügt. –

+0

Danke, es funktioniert nach ein wenig Feinabstimmung. Trotzdem bin ich noch sehr neu zu flink und hätte mir nie vorstellen können, das selbst zu schreiben. Kannst du mir ein paar Referenzen vorschlagen, von denen ich in Flink erfahren kann? –