Ich versuche, den Offset für eine bestimmte Nachricht in Kafka mithilfe von Spark Direct Stream zu erhalten und zu speichern. Mit Blick auf die Spark-Dokumentation erhalten Sie einfach die Bereichsoffsets für jede Partition, aber ich muss den Startoffset für jede Nachricht eines Themas nach einem vollständigen Scan der Warteschlange speichern.Ist es möglich, in Kafka + SparkStreaming einen spezifischen Nachrichten-Offset zu erhalten?
5
A
Antwort
6
Ja, Sie können MessageAndMetadata Version von createDirectStream
verwenden, mit der Sie auf message metadata
zugreifen können.
Hier finden Sie ein Beispiel, das Dstream von tuple3
zurückgibt.
val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaParams = Map[String, String]("metadata.broker.list" -> (kafkaBroker))
var fromOffsets = Map[TopicAndPartition, Long]()
val topicAndPartition: TopicAndPartition = new TopicAndPartition(kafkaTopic.trim, 0)
val topicAndPartition1: TopicAndPartition = new TopicAndPartition(kafkaTopic1.trim, 0)
fromOffsets += (topicAndPartition -> inputOffset)
fromOffsets += (topicAndPartition1 -> inputOffset1)
val messagesDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple3[String, Long, String]](ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => {
(mmd.topic ,mmd.offset, mmd.message().toString)
})
Im obigen Beispiel tuple3._1
haben topic
, tuple3._2
offset
haben und tuple3._3
message
haben.
Hoffe, das hilft!
Wenn ich auf diese Weise richtig bin, kann ich von einem bestimmten Offset lesen. Ich frage mich immer noch, ob es eine einfache Möglichkeit gibt, den Start-Offset jeder Nachricht innerhalb einer Partition zu berechnen. Was ich brauche ist, den Offset für jede Nachricht zu speichern und dann diesen Code zu verwenden, um eine bestimmte Nachricht zu lesen. Vielen Dank! –
Ja, Sie hatten Recht, aber mit dem obigen Code erhalten Sie auch einen Offset für jede Nachricht in 'messagesDStream'. Ich meine 'createDirectStream' gibt dir' Dstream' von 'Tuple3' und in jedem Tupel erhältst du' themename' und 'message' und den dazugehörigen' offset'. – avr
Hallo, es tut mir leid für die späte Antwort .. Es funktioniert. Ich gehe jedoch davon aus, dass "fromOffset" der Start-Offset ist, von dem aus die Partition gescannt wird. Vielen Dank Avr –