Ich arbeite an einem Forschungsprojekt, bei dem ich eine komplette Datenanalyse-Pipeline auf der Google Cloud Platform installiert habe. Wir schätzen eindeutige Besucher pro URL in Echtzeit mithilfe von HyperLogLog on Spark. Ich habe Dataproc zum Einrichten des Spark-Clusters verwendet. Ein Ziel dieser Arbeit ist es, den Durchsatz der Architektur in Abhängigkeit von der Clustergröße zu messen. Der Spark-Cluster hat drei Knoten (minimale Konfiguration)Durchsatz für Kafka, Spark, Elasticsearch Stack unter GCP/Dataproc
Ein Datenstrom wird mit eigenen Datengeneratoren in Java simuliert, wo ich die Kafka Producer API verwendet habe. Die Architektur sieht wie folgt aus:
Datengeneratoren -> Kafka -> Spark Streaming -> Elasticsearch.
Das Problem ist: Wie ich die Anzahl der produzierten Ereignisse pro Sekunde auf meinen Datengeneratoren erhöht und es geht über ~ 1000 Ereignisse/s die Einführungsrate in meinem Spark-Job plötzlich zusammenbricht und beginnen, viel zu variieren.
Wie Sie auf dem Screenshot von der Spark Web-Benutzeroberfläche sehen können, bleiben die Verarbeitungszeiten und die Verzögerungen bei der Planung konstant kurz, während die Eingabegeschwindigkeit sinkt.
Getestet habe ich es mit einem kompletten einfachen Funkenjob, die nur eine einfache Zuordnung der Fall ist, Ursachen auszuschließen wie langsame Elasticsearch schreibt oder Probleme mit der Arbeit selbst. Kafka scheint auch alle Ereignisse korrekt zu empfangen und zu senden.
Außerdem experimentierte ich mit dem Spark-Konfigurationsparameter: spark.streaming.kafka.maxRatePerPartition
und spark.streaming.receiver.maxRate
mit dem gleichen Ergebnis.
Hat jemand ein paar Ideen was schief geht hier? Es scheint wirklich der Spark Job oder Dataproc zu sein ... aber ich bin mir nicht sicher. Alle CPU- und Speicherauslastungen scheinen in Ordnung zu sein.
EDIT: Derzeit habe ich zwei kafka Partitionen zu diesem Thema (auf einer Maschine platziert). Aber ich denke Kafka sollte sogar mit nur einer Partition mehr als 1500 Events/s machen. Das Problem war auch zu Beginn meiner Experimente mit einer Partition. Ich benutze direkte Annäherung ohne Empfänger, also liest Spark mit zwei Arbeiterknoten concurrretly vom Thema.
EDIT 2: Ich fand heraus, was diesen schlechten Durchsatz verursacht. Ich habe vergessen, eine Komponente in meiner Architektur zu erwähnen. Ich verwende einen zentralen Flume-Agenten, um alle Ereignisse von meinen Simulator-Instanzen über log4j über netcat zu protokollieren. Dieses Gerinne ist die Ursache für das Leistungsproblem! Ich habe die log4j-Konfiguration geändert, um asynchrone Logger (https://logging.apache.org/log4j/2.x/manual/async.html) über Disruptor zu verwenden. Ich skalierte den Flume-Agenten auf mehr CPU-Kerne und RAM und änderte den Kanal in einen Dateikanal. Aber es hat immer noch eine schlechte Leistung. Kein Effekt ... irgendwelche anderen Ideen, wie Flume Leistung zu tunen ist?
Wie viele Kafka-Partitionen sind in Ihrem Thema? Welche Spark-Streaming-Kafka-Integration verwenden Sie (direkt oder Empfänger)? Hatten Sie die Möglichkeit, benutzerdefinierte Spark-Streaming-Empfänger zu verwenden, um Kafka <-> Probleme mit der Funkenintegration auszuschließen? Hatten Sie eine Chance, das Schreiben in ES durch etwas anderes zu ersetzen (z. B. eine hdfs-Datei, bigtable write, pubsub publish, log-Anweisung), um alle Funken-<-> ES-Integrationsprobleme auszuschließen? –
Derzeit habe ich zwei Kafka-Partitionen zu diesem Thema (auf einem Rechner). Aber ich denke Kafka sollte sogar mit nur einer Partition mehr als 1500 Events/s machen. Das Problem war auch zu Beginn meiner Experimente mit einer Partition. Ich benutze direkte Annäherung ohne Empfänger, also liest Spark mit zwei Arbeiterknoten concurrretly vom Thema. Und wie ich im ersten Post gesagt habe, habe ich den Teil des Jobs entfernt, in dem die Ergebnisse in Elasticsearch geschrieben sind, damit wir ein ES-Problem ausschließen können. – JayKay