Wie zu verwenden KafkaUtils.createDirectStream
mit den Offsets für eine bestimmte in Pyspark?Wie erstellt man InputDStream mit Offsets in PySpark (mit KafkaUtils.createDirectStream)?
7
A
Antwort
7
Wenn Sie eine RDD aus Datensätzen in einem Kafka-Thema erstellen möchten, verwenden Sie eine statische Gruppe von Tupeln.
verfügbar machen alle Importe
from pyspark.streaming.kafka import KafkaUtils, OffsetRange
Dann erstellen Sie ein Wörterbuch von Kafka Brokers
kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
Sie dann Ihre Offsets
start = 0
until = 10
partition = 0
topic = 'topic'
offset = OffsetRange(topic,partition,start,until)
offsets = [offset]
Objekt erstellen
Schließlich erstellen Sie die RDD:
kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams,offsets)
erstellen Strom mit Offsets Sie Folgendes tun müssen:
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext
Dann erstellen Sie Ihre sparkstreaming Zusammenhang mit Ihrem sparkcontext
ssc = StreamingContext(sc, 1)
Als nächstes werden wir einrichten alle unsere Parameter
kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
start = 0
partition = 0
topic = 'topic'
Dann erstellen wir unsere fromOffset Wörterbuch
topicPartion = TopicAndPartition(topic,partition)
fromOffset = {topicPartion: long(start)}
//notice that we must cast the int to long
Endlich haben wir den Stream-
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],kafkaParams,
fromOffsets=fromOffset)
1
Sie tun können, zu erstellen:
from pyspark.streaming.kafka import TopicAndPartition
topic = "test"
brokers = "localhost:9092"
partition = 0
start = 0
topicpartion = TopicAndPartition(topic, partition)
fromoffset = {topicpartion: int(start)}
kafkaDStream = KafkaUtils.createDirectStream(spark_streaming,[topic], \
{"metadata.broker.list": brokers}, fromOffsets = fromoffset)
Hinweis: Funken 2.2.0, Python 3.6
aber ich erhalte den Fehler "TypeError: unhashable type: 'TopicAndPartition'" – pangpang
Dies ist veraltet für K afka 0.8 und Spark 2.0+ :( – rjurney