2016-04-06 3 views
2

Ich habe ein paar Probleme beim Testen der neuen Flink 1.0.0-Funktionen. Ich habe mit CEP tüftelte und ich habe es noch nicht geschafft, eine einfache Demo-Code auszuführen:"Eingabefehler: Tupeltyp erwartet" beim Versuch, auf PatternStream zu wählen

val pattern : Pattern[TrafficEvent, _] = Pattern.begin[TrafficEvent]("start") 
val patternStream = CEP.pattern(stream.javaStream, pattern); 

class MyPatternSelectFunction extends PatternSelectFunction[TrafficEvent, TrafficEvent] { 
    override def select(pattern : java.util.Map[String, TrafficEvent]) : TrafficEvent ={ 
     pattern.get("start") 
    } 
} 
val alerts = patternStream.select(new MyPatternSelectFunction()) 

Der Code kompiliert gut, und Maven zeigen keine Warnungen. TrafficEvent ist eine Klasse mit wenigen einfachen Feldern und stream ist ein Scala DataStream dieser Klasse. Der Fehler wird angezeigt, wenn der Code auf Flink ausgeführt wird. Es läuft für eine Sekunde, und dann der Code beendet mit dieser Fehlermeldung:

Das Programm mit folgenden Ausnahme beendet:

Input mismatch: Tuple type expected. 
      org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:878) 
      org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:302) 
      org.apache.flink.cep.PatternStream.select(PatternStream.java:64) 
      com.demo.DemoTraffic$.main(DemoTraffic.scala:311) 

Ich habe versucht, die Funktionalität zu Java zu bewegen, durch einen statischen Aufbau Klasse wie folgt (Vielleicht gibt es einige seltsame Fragen, die die API von Scala Aufruf):

public static DataStream<DemoTraffic.trafficEvent> getStreamByPattern(DataStream<DemoTraffic.trafficEvent> stream) { 
    Pattern<DemoTraffic.trafficEvent, ?> pattern = Pattern.<DemoTraffic.trafficEvent>begin("start"); 
    PatternStream<DemoTraffic.trafficEvent> patternStream = CEP.pattern(stream, pattern); 
    DataStream<DemoTraffic.trafficEvent> rvalue = patternStream.select(new PatternSelectFunction<DemoTraffic.trafficEvent, DemoTraffic.trafficEvent>() { 
    @Override 
    public DemoTraffic.trafficEvent select(Map<String, DemoTraffic.trafficEvent> pattern) throws Exception { 
     return pattern.get("start"); 
    } 
    }); 
    return rvalue; 
} 

Aber das Ergebnis ist genau das gleiche, und es führt den gleichen Fehler in der PatternStream.select Linie. Irgendwelche Hinweise auf was kann ich versuchen oder was mache ich falsch? Wie Sie sehen können, ist das Muster ziemlich dumm, und es ist nur für Testzwecke. Es akzeptiert nur alle Ereignisse und gibt dieses Ereignis als Antwort zurück. Flink ist 1.0.0, mit Scala 2.10 Version.

Dank

+0

Was ist die Definition von 'TrafficEvent'? –

+0

Können Sie Ihr Beispiel mit dem neuesten SNAPSHOT versuchen? Vielleicht hat es etwas mit dem kürzlich festgelegten https://issues.apache.org/jira/browse/FLINK-3563 zu tun. – twalthr

Antwort

1

Ich gehe davon aus, dass TrafficEvent eine Klasse Scala Fall ist. Die CEP-Bibliothek wurde für die Java-API von Flink geschrieben und unterstützt daher noch keine Scala-Fallklassen.

Als Workaround könnten Sie Ihre Fallklasse in eine normale Scala-Klasse übersetzen.

Es gibt auch eine JIRA ticket, die die Entwicklung der CEP Scala API verfolgt.

+0

Die Problemumgehung funktioniert auf Flink 1.0.1. Vielen Dank! – midnight1247