Flink hat keine eingebaute Funktion, um den Durchschnitt auf einer WindowStream
zu berechnen. Dazu müssen Sie eine benutzerdefinierte WindowFunction
implementieren.
Der effizienteste Weg ist, eine ReduceFunction
zu implementieren, die die Zählung und Summe aus dem Wert berechnen, den Sie im Durchschnitt und einem nachfolgenden WindowFunction
möchte, dass das Ergebnis der ReduceFunction
und berechnet den Durchschnitt dauert. Die Verwendung eines ReduceFunction
ist effizienter, da Flink es direkt auf eingehende Werte anwendet. Daher aggregiert es Werte im laufenden Betrieb und sammelt sie nicht im Fenster. Dies reduziert den Speicherbedarf eines Fensters erheblich.
Da die Ausgabe eines ReduceFunction
denselben Typ wie seine Eingabe hat, müssen Sie ein Feld für die Zählung hinzufügen, bevor Sie die ReduceFunction
anwenden.
etwas wie die folgenden sollte es tun:
val valueStream: DataStream[(String, Double)] = ???
val r: DataStream[(String, Double)] = valueStream
// append a 1L for counting
.map(x => (x._1, x._2, 1l))
// key and window stream
.keyBy(0).timeWindow(Time.minutes(5))
.apply(
// ReduceFunction (compute sum and count)
(x: (String, Double, Long), y: (String, Double, Long)) =>
(x._1, x._2 + y._2, x._3 + y._3),
// WindowFunction
(key, window: TimeWindow, input: Iterable[(String, Double, Long)], out: Collector[(String, Double)]) => {
// get first (and only) value
val x: (String, Double, Long) = input.toIterator.next
// compute average as sum/count
out.collect(x._1, x._2/x._3)
}
)
Es gibt keine Beispiele oder verfügbar Blog-Beiträge für solche Aufgaben in Flink tun, ich weiß nicht, wie kann ich 'ReduceFunction' implementieren und' WindowFunction' . Können Sie einen Beispielcode teilen? –
OK, ich habe ein Beispiel hinzugefügt. –
Danke, es funktioniert nach ein wenig Feinabstimmung. Trotzdem bin ich noch sehr neu zu flink und hätte mir nie vorstellen können, das selbst zu schreiben. Kannst du mir ein paar Referenzen vorschlagen, von denen ich in Flink erfahren kann? –