2016-05-19 52 views
0

Ich habe folgendes pyspark Skript, das zu einem lokalen kafka Cluster verbinden annehmen:pyspark ist nicht in der Lage KafkaUtils.createDirectStream zu finden

from pyspark import SparkConf, SparkContext 

from operator import add 
import sys 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 
## Constants 
APP_NAME = "PythonStreamingDirectKafkaWordCount" 
##OTHER FUNCTIONS/CLASSES 

def main(): 
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount") 
    ssc = StreamingContext(sc, 2) 

    brokers, topic = sys.argv[1:] 
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) 
    lines = kvs.map(lambda x: x[1]) 
    counts = lines.flatMap(lambda line: line.split(" ")) \ 
     .map(lambda word: (word, 1)) \ 
     .reduceByKey(lambda a, b: a+b) 
    counts.pprint() 

    ssc.start() 
    ssc.awaitTermination() 
if __name__ == "__main__": 

    main() 

Als ich das laufen lasse, erhalte ich folgende Fehlermeldung:

File "/home/ubuntu/spark-1.3.0-bin-hadoop2.4/hello1.py", line 16, in main 
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) 
AttributeError: type object 'KafkaUtils' has no attribute 'createDirectStream' 

Was muss ich tun, um auf KafkaUtils.createDirectStream zugreifen zu können?

Antwort

1

Sie verwenden Spark 1.3.0 und die Python-Version createDirectStream wurde in Spark 1.4.0 eingeführt. Spark 1.3 bietet nur Scala- und Java-Implementierungen.

Wenn Sie den direkten Stream verwenden möchten, müssen Sie Ihre Spark-Installation aktualisieren.