Ich benutze Spark-Streaming, um kontinuierlich Daten von Kafka zu lesen und einige Statistiken durchzuführen. Ich streame jede Sekunde.Wie findet man die Summe aller Werte in RDDs pro DStream?
So habe ich eine Sekunde Chargen (dstreams). Jede RDD in diesem dstream enthält ein JSON.
Dies ist, wie ich meine DSTREAM:
kafkaStream = KafkaUtils.createDirectStream(stream, ['livedata'], {"metadata.broker.list": 'localhost:9092'})
raw = kafkaStream.map(lambda kafkaS: kafkaS[1])
clean = raw.map(lambda xs:json.loads(xs))
EINER DER RDDs in meinem sauber DSTREAM wie folgt aussieht:
{u'epochseconds': 1458841451, u'protocol': 6, u'source_ip': u'192.168.1.124', \
u'destination_ip': u'149.154.167.120', u'datetime': u'2016-03-24 17:44:11', \
u'length': 1589, u'partitionkey': u'partitionkey', u'packetcount': 10,\
u'source_port': 43375, u'destination_port': 443}
Und ich habe wie 30-150 solche RDDs in jeder DStream.
Nun, was ich versuche zu tun, ist, die gesamte Summe der "Längen" oder sagen "Packetcounts" in jedem DStream. Das heißt,
rdd1.length + rdd2.length + ... + LastRDDInTheOneSecondBatch.length
Was ich versucht:
add=clean.map(lambda xs: (xs['length'],1)).reduceByKey(lambda a, b: a+b)
Was ich bekommen habe:
Frequenz statt Summe.
(17, 6)
(6, 24)
Was soll ich tun, um die Gesamtsumme anstelle der Häufigkeit der Schlüssel zu haben?
Arbeiten, danke! Nur eine zusätzliche Frage, ich möchte noch 2 weitere Parameter von clean in add hinzufügen, sagen wir ('partitionkey', 'timestamp') dies zusammen mit dem 'length' Parameter, der gerade berechnet wurde. Wie mache ich das? – HackCode