2016-06-21 7 views
4

Ich benutze Flink DataStream API, wo es Racks gibt & Ich möchte "Durchschnitt" der Temperaturgruppe nach Rack-IDs berechnen. Meine Fensterdauer beträgt 40 Sekunden. & mein Fenster gleitet alle 10 Sekunden ... Es folgt mein Code, wo ich summe der Temperaturen alle 10 Sekunden für jede RackID berechne, aber jetzt möchte ich Durchschnitt berechnen Temperaturen ::Durchschnitt berechnen mit Flink DataStream für eine Fensterdauer

static Properties properties=new Properties(); 
    public static Properties getProperties() 
    { 
     properties.setProperty("bootstrap.servers", "54.164.200.104:9092"); 
     properties.setProperty("zookeeper.connect", "54.164.200.104:2181"); 
     //properties.setProperty("deserializer.class", "kafka.serializer.StringEncoder"); 
     //properties.setProperty("group.id", "akshay"); 
     properties.setProperty("auto.offset.reset", "earliest"); 
     return properties; 
    } 

@SuppressWarnings("rawtypes") 
public static void main(String[] args) throws Exception 
{ 
    StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
    Properties props=Program.getProperties(); 
    DataStream<TemperatureEvent> dstream=env.addSource(new FlinkKafkaConsumer09<TemperatureEvent>("TemperatureEvent", new TemperatureEventSchema(), props)).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()); 
    DataStream<TemperatureEvent> ds1=dstream.keyBy("rackId").timeWindow(Time.seconds(40), Time.seconds(10)).sum("temperature"); 
    env.execute("Temperature Consumer"); 
} 

Wie kann ich Durchschnittstemperatur für das obige Beispiel berechnen?

Antwort

3

Soweit ich das beurteilen kann, müssen Sie die durchschnittliche Funktion selbst schreiben. Sie können ein Beispiel finden Sie hier:

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java

In Ihrem Fall würden Sie wahrscheinlich .sum("temperature");

mit so etwas wie .apply(new Avg()); ersetzen und die Mittelklasse implementieren:

public class Avg implements WindowFunction<TemperatureEvent, TemperatureEvent, Long, org.apache.flink.streaming.api.windowing.windows.Window> { 

    @Override 
    public void apply(Long key, Window window, Iterable<TemperatureEvent> values, Collector<TemperatureEvent> out) { 
    long sum = 0L; 
    int count = 0; 
    for (TemperatureEvent value : values) { 
     sum += value.getTemperature(); 
     count ++; 
    } 

    TemperatureEvent result = values.iterator().next(); 
    result.setTemperature(sum/count); 
    out.collect(result); 
    } 
} 

Hinweis: Wenn es eine Chance gibt, dass Ihre Funktion wird an ein leeres Fenster (z.B. durch Verwendung von benutzerdefinierten Triggern), müssen Sie vor dem Zugriff auf elements.head eine Überprüfung vornehmen.