2016-04-18 4 views
4

Ich möchte ein Zeitfenster von Streaming-Daten in Apache Flink machen. Meine Daten sieht etwas wie folgt aus:Wie führe ich timeWindow() für String DataStream in Flink aus?

1> {52,"mokshda",84.85} 
2> {1,"kavita",26.16} 
2> {131,"nidhi",178.9} 
3> {2,"poorvi",22.97} 
4> {115,"saheba",110.41} 

Alle 20 Sekunden, möchte ich die Summe der Marken (letzte Spalte, zB Mokshda Marken sind 84.85.) Aller Reihen. Die timeWindow() - Funktion arbeitet auf einem KeyedStream und daher muss ich keyBy() diesen DataStream. Ich kann es nach Rollennummer eingeben (die erste Spalte, z. B. 52 für Mokshda).

val windowedStream = stockStream 
         .keyBy(0) 
         .timeWindow(Time.seconds(20)) 
         .sum(2) 

Aber natürlich liest Flink meine Daten nicht als Liste. Es wird als String zu lesen und so bekomme ich die folgende Ausnahme:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: String 

Wie kann ich ein Zeitfenster auf String-Daten durchführen, oder wie kann ich diese Daten in Tuple konvertieren?

Antwort

5

Sie können eine DataStream[String] in eine DataStream[(Int, String, Double)] ein MapFunction[String, (Int, String, Double)] eingesetzt, die einen String in seine Bestandteile zerlegt konvertieren, wandelt die Datentypen und gibt einen Tuple.

Sie können auch einen timeWindowAll auf einen nicht codierten Datenstrom anwenden. Die Semantik ist jedoch natürlich anders und ein AllWindow kann nur mit der Parallelität 1 bearbeitet werden.