Antwort

6

Ja, Sie können MessageAndMetadata Version von createDirectStream verwenden, mit der Sie auf message metadata zugreifen können.

Hier finden Sie ein Beispiel, das Dstream von tuple3 zurückgibt.

val ssc = new StreamingContext(sparkConf, Seconds(10)) 

val kafkaParams = Map[String, String]("metadata.broker.list" -> (kafkaBroker)) 
var fromOffsets = Map[TopicAndPartition, Long]() 
val topicAndPartition: TopicAndPartition = new TopicAndPartition(kafkaTopic.trim, 0) 
val topicAndPartition1: TopicAndPartition = new TopicAndPartition(kafkaTopic1.trim, 0) 
fromOffsets += (topicAndPartition -> inputOffset) 
fromOffsets += (topicAndPartition1 -> inputOffset1) 

val messagesDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple3[String, Long, String]](ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => { 
    (mmd.topic ,mmd.offset, mmd.message().toString) 
    }) 

Im obigen Beispiel tuple3._1 haben topic, tuple3._2offset haben und tuple3._3message haben.

Hoffe, das hilft!

+0

Wenn ich auf diese Weise richtig bin, kann ich von einem bestimmten Offset lesen. Ich frage mich immer noch, ob es eine einfache Möglichkeit gibt, den Start-Offset jeder Nachricht innerhalb einer Partition zu berechnen. Was ich brauche ist, den Offset für jede Nachricht zu speichern und dann diesen Code zu verwenden, um eine bestimmte Nachricht zu lesen. Vielen Dank! –

+0

Ja, Sie hatten Recht, aber mit dem obigen Code erhalten Sie auch einen Offset für jede Nachricht in 'messagesDStream'. Ich meine 'createDirectStream' gibt dir' Dstream' von 'Tuple3' und in jedem Tupel erhältst du' themename' und 'message' und den dazugehörigen' offset'. – avr

+0

Hallo, es tut mir leid für die späte Antwort .. Es funktioniert. Ich gehe jedoch davon aus, dass "fromOffset" der Start-Offset ist, von dem aus die Partition gescannt wird. Vielen Dank Avr –