2014-12-17 2 views
7

Ich versuche Stateful Spark-Streaming-Berechnungen über (gefälschte) Apache-Webserver-Logs von Kafka lesen. Das Ziel ist es, den Web-Verkehr ähnlich wie this blog postSpark Streaming GroupByKey und updateStateByKey Implementierung

zu "sessionize" Der einzige Unterschied ist, dass ich jede Seite der IP-Hits, statt der gesamten Sitzung "sessionize" will. Ich konnte dieses Lesen aus einer Datei mit gefälschtem Web-Traffic mit Spark im Batch-Modus durchführen, aber jetzt möchte ich es in einem Streaming-Kontext tun.

Log-Dateien werden von Kafka und analysiert in K/V Paare (String, (String, Long, Long)) oder

(IP, (requestPage, time, time)) lesen.

Ich rufe dann groupByKey() auf dieser K/V pair. Im Batch-Modus erzeugen würde dies ein:

(String, CollectionBuffer((String, Long, Long), ...) oder

(IP, CollectionBuffer((requestPage, time, time), ...)

In einem Streaming, produziert sie ein:

(String, ArrayBuffer((String, Long, Long), ...) wie so:

(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000))) 

jedoch als das nächste Microbatch (DStream) kommt an, diese Information wird verworfen.

Schließlich was ich will ist für diese ArrayBuffer zu füllen im Laufe der Zeit als eine bestimmte IP weiterhin zu interagieren und einige Berechnungen auf seine Daten zu "sessionize" die Seitenzeit.

Ich glaube, dass der Betreiber dies geschieht ist "updateStateByKey." Ich habe einige Probleme mit diesem Operator (ich bin neu bei beiden Spark & Scala);

jede Hilfe wird geschätzt.

Bisher:

val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey) 


    def updateGroupByKey(
          a: Seq[(String, ArrayBuffer[(String, Long, Long)])], 
          b: Option[(String, ArrayBuffer[(String, Long, Long)])] 
         ): Option[(String, ArrayBuffer[(String, Long, Long)])] = { 

    } 

Antwort

2

Gabor Antwort angefangen hat mir den richtigen Weg, aber hier ist eine Antwort, die die erwartete produziert Ausgabe.

Zuerst für den Ausgang Ich möchte:

(100.40.49.235,List((/,1418934075000,1418934075000), (/,1418934105000,1418934105000), (/contactus.html,1418934174000,1418934174000))) 

ich nicht groupByKey() brauchen. updateStateByKey akkumuliert die Werte bereits in einem Seq, so dass die Addition von groupByKey unnötig (und teuer) ist. Spark-Benutzer empfehlen dringend, groupByKey nicht zu verwenden. Hier

ist der Code, gearbeitet:

def updateValues(newValues: Seq[(String, Long, Long)], 
         currentValue: Option[Seq[ (String, Long, Long)]] 
        ): Option[Seq[(String, Long, Long)]] = { 

    Some(currentValue.getOrElse(Seq.empty) ++ newValues) 

    } 


val grouped = ipTimeStamp.updateStateByKey(updateValues) 

Hier updateStateByKey geben wird eine Funktion (update), die die Akkumulation von Werten über die Zeit (NewValues) sowie eine Option für den aktuellen Wert in dem Strom hat (aktueller Wert). Es gibt dann die Kombination von diesen zurück.getOrElse ist erforderlich, da currentValue gelegentlich leer sein kann. Kredit an https://twitter.com/granturing für den richtigen Code.

2

Ich glaube, Sie suchen nach etwas wie folgt aus:

def updateGroupByKey(
          newValues: Seq[(String, ArrayBuffer[(String, Long, Long)])], 
          currentValue: Option[(String, ArrayBuffer[(String, Long, Long)])] 
         ): Option[(String, ArrayBuffer[(String, Long, Long)])] = { 
    //Collect the values 
    val buffs: Seq[ArrayBuffer[(String, Long, Long)]] = (for (v <- newValues) yield v._2) 
    val buffs2 = if (currentValue.isEmpty) buffs else currentValue.get._2 :: buffs 
    //Convert state to buffer 
    if (buffs2.isEmpty) None else { 
     val key = if (currentValue.isEmpty) newValues(0)._1 else currentValue.get._1 
     Some((key, buffs2.foldLeft(new ArrayBuffer[(String, Long, Long)])((v, a) => v++a))) 
    } 
    }