2016-08-04 51 views
1

Ich möchte Ereignisse mit derselben Eigenschaft erkennen. Angenommen, ich habe einen einfachen Fall Klasse:Apache Flink + CEP - Erkenne gleiche Ereignisse

case class Record(name: String, value: Int) 

Angenommen, es ist der folgende Strom:

Record("A", 1) 
Record("B", 2) 
Record("A", 3) 
Record("C", 4) 

Dann würde Ich mag die doppelte „A“ -Eintrag zu erkennen. Ist das möglich? Ich habe jetzt das:

val start: Pattern[Record, _] = myStream 
.begin("first") 
.followedBy("second").where(previous_record.name == _.name) 
+0

Zur Erkennung auf gleiche Eigenschaft Sie benötigen großen Speicherplatz, da die Eigenschaft kann unbegrenzt sein, für jede Eigenschaft gesehen, die Sie speichern müssen. Wenn die Eigenschaftenmenge begrenzt ist, können Sie Filter anwenden. – ravthiru

Antwort

1

Ich glaube, Sie definieren, was Ereigniserkennung ist, haben Sie versucht, dies:

val start: Pattern[Record, _] = myStream 
    .begin("first").where(name == "A") 
    .followedBy("second").where(name == "A") 

aktualisieren: Zum Beispiel:

val patternIG: Pattern[(String,String), _] = Pattern.begin[String,String)]("start").where(_.name == "Ignition").where(_.ac == "ON").next("end").where(_.name == "Door").where(_.ac == "ON") 
val patternStream: PatternStream[(String,String)] = CEP.pattern(mystream, patternIG) 
def selectFn(pattern : mutable.Map[String,(String,String)]): String = { 
val startEvent = pattern.get("start").get 
val endEvent = pattern.get("end").get 
    "ALERT Door Open" 
} 
val patternStreamSelected = patternStream.select(selectFn(_)).print() 
+0

Fast. Problem hier ist, dass das "A" im vorherigen Datensatz definiert ist, also ist es variabel. –