2016-07-06 15 views
1

Ich möchte mit FlinkCEP nur ein "faules" Spiel auf einem Muster machen. Wie kann ich das machen? z.B. Ich habe einen Eingabestrom ACABCABCB und ich möchte auf A gefolgt von C übereinstimmen, um nur 3 Übereinstimmungen und nicht 6 Übereinstimmungen zu erhalten.Ho kann ich ein faules Spiel mit Flink CEP machen

Ich habe das folgende Beispiel zur Veranschaulichung meines Problems erstellt.

val env = StreamExecutionEnvironment.createLocalEnvironment(1) 
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) 

case class MyEvent(id: Int, kind: String, value: String) 
case class MyAggregatedEvent(id: Int, concatenatedValue: String) 

val eventStream = env.fromElements(
    MyEvent(1, "A", "1"), MyEvent(1, "C", "1"), 
    MyEvent(1, "A", "2"), MyEvent(1, "B", "1"), MyEvent(1, "C", "2"), 
    MyEvent(1, "A", "3"), MyEvent(1, "D", "2"), MyEvent(1, "C", "3"), 
    MyEvent(1, "B", "3") 
) 

val pattern: Pattern[MyEvent, _] = Pattern 
    .begin[MyEvent]("pA").where(e => e.kind == "A") 
    .next("pC").where(e => e.kind == "C") 
    .within(Time.seconds(5)) 

val patternNextStream: PatternStream[MyEvent] = CEP.pattern(eventStream.keyBy(_.id), pattern) 

val outNextStream: DataStream[MyAggregatedEvent] = patternNextStream.flatSelect { 
    (pattern: scala.collection.mutable.Map[String, MyEvent], collector: Collector[MyAggregatedEvent]) => 
    val partA = pattern.get("pA").get 
    val partC = pattern.get("pC").get 

    collector.collect(MyAggregatedEvent(partA.id, partA.value + "=>" + partC.value)) 
} 
outNextStream.print() 

env.execute("Experiment") 

Das gibt mir die folgende Ausgabe:

MyAggregatedEvent (1,1 => 1)

Wenn ich ändern das Muster:

val pattern: Pattern[MyEvent, _] = Pattern 
    .begin[MyEvent]("pA").where(e => e.kind == "A") 
    .followedBy("pC").where(e => e.kind == "C") 
    .within(Time.seconds(5)) 

Dann Folgendes wird gedruckt:

MyAggregatedEvent (1,1 => 1)
MyAggregatedEvent (1,1 => 2)
MyAggregatedEvent (1,2 => 2)
MyAggregatedEvent (1,1 => 3)
MyAggregatedEvent (= 1,2 > 3)
MyAggregatedEvent (1,3 => 3)

Wie kann ich ein Muster erstellen, die nur einmal in jedem Fall übereinstimmen, so dass mein ausgegeben:

MyAggregatedEvent (1,1 => 1)
MyAggregatedEvent (1,2 => 2)
MyAggregatedEvent (1,3 => 3)

Antwort

1

Derzeit wird dies nicht von der Flink-CEP-Bibliothek unterstützt. Die passende Semantik kann noch nicht kontrolliert werden. Ich denke, es wäre gut, einen MATCH_ALL und einen passenden MATCH_FIRST Modus zu beginnen, um damit zu beginnen. Die MATCH_FIRST verwirft alle Zwischenzustände, sobald sie eine vollständig übereinstimmende Sequenz gesehen hat. Dies sollte Ihren Anwendungsfall abdecken.