Ich versuche Stateful Spark-Streaming-Berechnungen über (gefälschte) Apache-Webserver-Logs von Kafka lesen. Das Ziel ist es, den Web-Verkehr ähnlich wie this blog postSpark Streaming GroupByKey und updateStateByKey Implementierung
zu "sessionize" Der einzige Unterschied ist, dass ich jede Seite der IP-Hits, statt der gesamten Sitzung "sessionize" will. Ich konnte dieses Lesen aus einer Datei mit gefälschtem Web-Traffic mit Spark im Batch-Modus durchführen, aber jetzt möchte ich es in einem Streaming-Kontext tun.
Log-Dateien werden von Kafka und analysiert in K/V
Paare (String, (String, Long, Long))
oder
(IP, (requestPage, time, time))
lesen.
Ich rufe dann groupByKey()
auf dieser K/V pair
. Im Batch-Modus erzeugen würde dies ein:
(String, CollectionBuffer((String, Long, Long), ...)
oder
(IP, CollectionBuffer((requestPage, time, time), ...)
In einem Streaming, produziert sie ein:
(String, ArrayBuffer((String, Long, Long), ...)
wie so:
(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
jedoch als das nächste Microbatch (DStream) kommt an, diese Information wird verworfen.
Schließlich was ich will ist für diese ArrayBuffer
zu füllen im Laufe der Zeit als eine bestimmte IP weiterhin zu interagieren und einige Berechnungen auf seine Daten zu "sessionize" die Seitenzeit.
Ich glaube, dass der Betreiber dies geschieht ist "updateStateByKey
." Ich habe einige Probleme mit diesem Operator (ich bin neu bei beiden Spark & Scala);
jede Hilfe wird geschätzt.
Bisher:
val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
def updateGroupByKey(
a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
b: Option[(String, ArrayBuffer[(String, Long, Long)])]
): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
}