Ich versuche, eine große CSV an Kafka zu senden. Die Grundstruktur besteht darin, eine Zeile der CSV-Datei zu lesen und sie mit der Kopfzeile zu komprimieren.Senden von großen CSV an Kafka mit Python Spark
a = dict(zip(header, line.split(",")
Diese dann in ein Json mit umgewandelt wird:
message = json.dumps(a)
dann verwende ich kafka-Python-Bibliothek, die Nachricht zu senden
from kafka import SimpleProducer, KafkaClient
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
producer.send_messages("topic", message)
Mit PYSPARK ich einfach eine RDD erstellt haben von Nachrichten aus der CSV-Datei
sc = SparkContext()
text = sc.textFile("file.csv")
header = text.first().split(',')
def remove_header(itr_index, itr):
return iter(list(itr)[1:]) if itr_index == 0 else itr
noHeader = text.mapPartitionsWithIndex(remove_header)
messageRDD = noHeader.map(lambda x: json.dumps(dict(zip(header, x.split(","))
Jetzt möchte ich diese Nachrichten senden: Ich habe eine Funktion
def sendkafka(message):
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
return producer.send_messages('topic',message)
Dann definiere ich eine neue RDD erstellen, um die Nachrichten zu senden
sentRDD = messageRDD.map(lambda x: kafkasend(x))
ich dann rufen sentRDD.count()
Welche beginnt zu wühlen und Nachrichten zu senden
Leider ist dies sehr langsam. Es sendet 1000 Nachrichten pro Sekunde. Dies ist in einem Cluster von 10 Knoten mit jeweils 4 CPUs und 8 GB Speicher.
Im Vergleich dauert das Erstellen der Nachrichten ca. 7 Sekunden auf einer 10 Millionen Zeile csv. ~ ungefähr 2gb
Ich denke, das Problem ist, dass ich einen Kafka-Produzenten innerhalb der Funktion instanziiere. Allerdings, wenn ich nicht spreche dann beklagt sich, dass der Produzent nicht existiert, obwohl ich versucht habe, es global zu definieren.
Vielleicht kann jemand etwas Licht darauf werfen, wie dieses Problem angegangen werden kann.
Danke,
Dank. mit einem einzigen Produzenten außerhalb des Funkens mit einem Async könnte ich 8000 pro Sekunde bekommen. Also habe ich etwas optimiert. Ich entdeckte, dass ich 15 Partitionen für diese csv hatte, also gab ich dem Job 15 Kerne.Ich spielte dann mit den asynchronen Optionen, bis die Stapelgröße 20000 war. Das gab mir einen maximalen Durchsatz von 225 Tausend pro Sekunde. Mit etwas Tuning habe ich also einen vernünftigen Preis bekommen. Das sind 45 Sekunden, um eine 10 Millionen Zeile csv zu streamen. –
@PhineasDashevsky, wäre es sehr hilfreich, wenn Sie den Code für Ihre endgültige Lösung teilen könnten. – Picarus
https://iabdb.me/2015/09/09/kafka-on-the-shore-my-experiences-benchmarking-apache-kafka-part-i/ In diesem Artikel habe ich den Code und eine längere Beschreibung wie es geht. –