Ich bin wirklich neu auf Spark und Scala, und ich verwende ReduceByKeyAndWindows, um Wörter in Kafka-Nachrichten zu zählen, weil ich Fensterfunktion verwenden muss.Neustart ReduceByKeyAndWindows
der Zweck meiner Anwendung wird eine Warnung gesendet, wenn "x" mal Nachrichten von Kafka entdeckt, die ein bestimmtes Wort in einer bestimmten Zeit enthält. Starten Sie dann von Anfang an neu.
Der Code unten erkennt das Wort, aber ich kann nicht machen, dass meine Anwendung neu gestartet wird. Ich denke, wenn möglich, starten Sie die Akkumulation von ReduceByKeyAndWindows oder eine andere Möglichkeit, dies zu tun.
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
object KafKaWordCount {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("KafKaWordCount")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("checkpoint")
val lines = ssc.socketTextStream("localhost", 9999) //using NETCAT for test
val wordCounts =
lines.map(x => (x, 1))
.reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(2), 2)
//if the value from the key (word) exceeds 10 , sent alert and Restart the values
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
das zweite Beispiel von Yuval Itzchakov Verwendung und die Anzahl der Reichweite von 10 bis 3 und Senden 7 Meldungen zu reduzieren. Wenn Sie mit Funken 1.6.0 und höher, können Sie die experimentellen DStream.mapWithState
verwenden zu halten
Die Ausgabe aus dem zweiten asnwer ist
Word: hello reached count: 1
Word: hello reached count: 2
//No print this message, its OK but the next word not start with 1
Word: hello reached count: 4
Word: hello reached count: 5
Word: hello reached count: 6
Word: hello reached count: 7
Die Ausgabe, die ich
Word: hello reached count: 1
Word: hello reached count: 2
Word: hello reached count: 1
Word: hello reached count: 2
Word: hello reached count: 1
Es ist ein bisschen schwer zu verstehen, was Sie wollen, aber ich habe den Eindruck, dass Sie tatsächlich eine allgemeinere Funktion wie 'updateStateByKey' benötigen. – zero323
danke, ich werde darüber lesen, Sie können mir ein Beispiel zeigen, wie ich die Funktion nutzen kann? nehme an, dass ich recibe dies: ** Hallo ** i dies akkumulieren ** (Hallo, 1) ** ** (hallo, 2) ** ** (hallo, 3) ** Und wenn , ** hallo ** übertrifft ** 5 ** ich werde den alarm senden und ** hallo ** muss ** 0 ** sein und neu beginnen .. Entschuldigung, ich bin auch neu auf dieser Seite. Danke noch einmal! –
Nichtsdestotrotz dieses hübsche Beispiel: http://StackOverflow.com/a/35565682/1560062 – zero323