2016-07-01 11 views
1

Ich habe diesen Code unten:Aufzüge Json Extrahieren von JSON-Objekt

object Test { 
    def main(args: Array[String]) { 
      val sparkConf = new SparkConf().setAppName("Spark").setMaster("local[2]") 
      val sc = new SparkContext(sparkConf) 

      val ssc = new StreamingContext(sc, Seconds(3)) 
      val kafkaBrokers = Map("metadata.broker.list" -> "HostName:9092") 
      val offsetMap = Map(TopicAndPartition("topic_test", 0), 8) 
      val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaBrokers, offsetMap) 

var offsetArray = Array[OffsetRange]() 
       lines.transform {rdd => 
         offsetArray = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
         rdd 
       }.map { 
         _.message() 
       }.foreachRDD {rdd => 
        /* NEW CODE */ 
       } 
       ssc.start() 
       ssc.awaitTermination() 
    } 
} 

ich den neuen Code uder der Kommentar /* NEW CODE */ hinzugefügt haben. Meine Frage ist, dass die Zeilen val eine Sequenz von RDDs enthalten, die im Grunde alle 3 Sekunden den Kafka-Server bilden. Dann greife ich die Nachricht mit der Kartenfunktion.

Aber ich bin ein wenig verwirrt darüber, was die foreachRDD Funktion tut. Ist das iterieren über alle RDD's, die in der lines DStream sind (was ist, was ich versuche zu tun)? Die Sache ist die Parse-Funktion aus der lift-json-Bibliothek akzeptiert nur eine Zeichenfolge, so dass ich über alle der RDDs durchlaufen muss und übergeben Sie diese String-Wert an die Parse-Funktion, was ich versucht habe zu tun. Aus irgendeinem Grund wird nichts ausgedruckt.

Antwort

1

Wenn Sie Daten von einem bestimmten Offset lesen möchten, verwenden Sie die falsche Überladung.

Die Sie benötigen, ist dieses:

createDirectStream[K, 
        V, 
        KD <: Decoder[K], 
        VD <: Decoder[V], R] 
        (ssc: StreamingContext, 
        kafkaParams: Map[String, String], 
        fromOffsets: Map[TopicAndPartition, Long], 
        messageHandler: (MessageAndMetadata[K, V]) ⇒ R): InputDStream[R] 

Sie benötigen ein Map[TopicAndPartition, Long]:

val offsetMap = Map(TopicAndPartition("topic_test", 0), 8L) 

Und Sie brauchen eine Funktion zu übergeben, die eine MessageAndMetadata[K, V] empfängt und gibt den gewünschten Typ, zum Beispiel:

val extractKeyValue: MessageAndMetadata[String, String] => (String, String) = 
     msgAndMeta => (msgAndMeta.key(), msgAndMeta.message()) 

Und es verwenden:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] 
(ssc, kafkaBrokers, offsetMap, extractKeyValue) 
+0

Omg vielen Dank! Nur eine Frage. In dieser Zeile hier: 'val extractKeyValue: MessageAndMetadata [String, String] => (String, String) = msgAndMeta => (msgAndMeta.key(), msgAndMeta.message())' Was genau ist 'msgAndMeta.key(), msgAndMeta.message() 'kehrt wie im Tupel zurück? Ist es "(TOPIC_NAME, MESSAGE)"? – CapturedTree

+1

@ 1290 Wenn Sie einen Datensatz in Kafka einfügen, geben Sie einen Schlüssel und einen Wert an. Also ist 'key()' der Schlüssel und 'message()' ist der Wert. Diese Methode erstellt ein 'Tuple2' von ihnen. –

+0

Ohh ok, ich verstehe. Können Sie mir bitte meine Frage zum Workflow dieses Streaming-Jobs beantworten? Im Wesentlichen wird mein Spark-Streaming-Kontext Daten von den Kafka-Brokern streamen, die ich in meiner 'kafkaParams'-Map angegeben habe, beginnend bei der' 0th'-Partition und dem '8th'-Offset, wie in meiner' offsetMap' angegeben. So, jetzt ist meine Frage: Alle 3 Sekunden wird es den Code laufen lassen, den ich meiner Frage unter dem Kommentar 'NEUEN CODE BITTE HIER 'beigefügt habe? – CapturedTree