2015-10-21 7 views

Antwort

7

Wenn Sie eine RDD aus Datensätzen in einem Kafka-Thema erstellen möchten, verwenden Sie eine statische Gruppe von Tupeln.

verfügbar machen alle Importe

from pyspark.streaming.kafka import KafkaUtils, OffsetRange 

Dann erstellen Sie ein Wörterbuch von Kafka Brokers

kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"} 

Sie dann Ihre Offsets

start = 0 
until = 10 
partition = 0 
topic = 'topic'  
offset = OffsetRange(topic,partition,start,until) 
offsets = [offset] 
Objekt erstellen

Schließlich erstellen Sie die RDD:

kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams,offsets) 

erstellen Strom mit Offsets Sie Folgendes tun müssen:

from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition 
from pyspark.streaming import StreamingContext 

Dann erstellen Sie Ihre sparkstreaming Zusammenhang mit Ihrem sparkcontext

ssc = StreamingContext(sc, 1) 

Als nächstes werden wir einrichten alle unsere Parameter

kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"} 
start = 0 
partition = 0 
topic = 'topic'  

Dann erstellen wir unsere fromOffset Wörterbuch

topicPartion = TopicAndPartition(topic,partition) 
fromOffset = {topicPartion: long(start)} 
//notice that we must cast the int to long 

Endlich haben wir den Stream-

directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],kafkaParams, 
fromOffsets=fromOffset) 
+0

aber ich erhalte den Fehler "TypeError: unhashable type: 'TopicAndPartition'" – pangpang

+1

Dies ist veraltet für K afka 0.8 und Spark 2.0+ :( – rjurney

1

Sie tun können, zu erstellen:

from pyspark.streaming.kafka import TopicAndPartition 
topic = "test" 
brokers = "localhost:9092" 
partition = 0 
start = 0 
topicpartion = TopicAndPartition(topic, partition) 
fromoffset = {topicpartion: int(start)} 
kafkaDStream = KafkaUtils.createDirectStream(spark_streaming,[topic], \ 
     {"metadata.broker.list": brokers}, fromOffsets = fromoffset) 

Hinweis: Funken 2.2.0, Python 3.6