3

Ich arbeite an einem Forschungsprojekt, bei dem ich eine komplette Datenanalyse-Pipeline auf der Google Cloud Platform installiert habe. Wir schätzen eindeutige Besucher pro URL in Echtzeit mithilfe von HyperLogLog on Spark. Ich habe Dataproc zum Einrichten des Spark-Clusters verwendet. Ein Ziel dieser Arbeit ist es, den Durchsatz der Architektur in Abhängigkeit von der Clustergröße zu messen. Der Spark-Cluster hat drei Knoten (minimale Konfiguration)Durchsatz für Kafka, Spark, Elasticsearch Stack unter GCP/Dataproc

Ein Datenstrom wird mit eigenen Datengeneratoren in Java simuliert, wo ich die Kafka Producer API verwendet habe. Die Architektur sieht wie folgt aus:

Datengeneratoren -> Kafka -> Spark Streaming -> Elasticsearch.

Das Problem ist: Wie ich die Anzahl der produzierten Ereignisse pro Sekunde auf meinen Datengeneratoren erhöht und es geht über ~ 1000 Ereignisse/s die Einführungsrate in meinem Spark-Job plötzlich zusammenbricht und beginnen, viel zu variieren.

Wie Sie auf dem Screenshot von der Spark Web-Benutzeroberfläche sehen können, bleiben die Verarbeitungszeiten und die Verzögerungen bei der Planung konstant kurz, während die Eingabegeschwindigkeit sinkt.

Screenshot from Spark Web UI

Getestet habe ich es mit einem kompletten einfachen Funkenjob, die nur eine einfache Zuordnung der Fall ist, Ursachen auszuschließen wie langsame Elasticsearch schreibt oder Probleme mit der Arbeit selbst. Kafka scheint auch alle Ereignisse korrekt zu empfangen und zu senden.

Außerdem experimentierte ich mit dem Spark-Konfigurationsparameter: spark.streaming.kafka.maxRatePerPartition und spark.streaming.receiver.maxRate mit dem gleichen Ergebnis.

Hat jemand ein paar Ideen was schief geht hier? Es scheint wirklich der Spark Job oder Dataproc zu sein ... aber ich bin mir nicht sicher. Alle CPU- und Speicherauslastungen scheinen in Ordnung zu sein.

EDIT: Derzeit habe ich zwei kafka Partitionen zu diesem Thema (auf einer Maschine platziert). Aber ich denke Kafka sollte sogar mit nur einer Partition mehr als 1500 Events/s machen. Das Problem war auch zu Beginn meiner Experimente mit einer Partition. Ich benutze direkte Annäherung ohne Empfänger, also liest Spark mit zwei Arbeiterknoten concurrretly vom Thema.

EDIT 2: Ich fand heraus, was diesen schlechten Durchsatz verursacht. Ich habe vergessen, eine Komponente in meiner Architektur zu erwähnen. Ich verwende einen zentralen Flume-Agenten, um alle Ereignisse von meinen Simulator-Instanzen über log4j über netcat zu protokollieren. Dieses Gerinne ist die Ursache für das Leistungsproblem! Ich habe die log4j-Konfiguration geändert, um asynchrone Logger (https://logging.apache.org/log4j/2.x/manual/async.html) über Disruptor zu verwenden. Ich skalierte den Flume-Agenten auf mehr CPU-Kerne und RAM und änderte den Kanal in einen Dateikanal. Aber es hat immer noch eine schlechte Leistung. Kein Effekt ... irgendwelche anderen Ideen, wie Flume Leistung zu tunen ist?

+0

Wie viele Kafka-Partitionen sind in Ihrem Thema? Welche Spark-Streaming-Kafka-Integration verwenden Sie (direkt oder Empfänger)? Hatten Sie die Möglichkeit, benutzerdefinierte Spark-Streaming-Empfänger zu verwenden, um Kafka <-> Probleme mit der Funkenintegration auszuschließen? Hatten Sie eine Chance, das Schreiben in ES durch etwas anderes zu ersetzen (z. B. eine hdfs-Datei, bigtable write, pubsub publish, log-Anweisung), um alle Funken-<-> ES-Integrationsprobleme auszuschließen? –

+0

Derzeit habe ich zwei Kafka-Partitionen zu diesem Thema (auf einem Rechner). Aber ich denke Kafka sollte sogar mit nur einer Partition mehr als 1500 Events/s machen. Das Problem war auch zu Beginn meiner Experimente mit einer Partition. Ich benutze direkte Annäherung ohne Empfänger, also liest Spark mit zwei Arbeiterknoten concurrretly vom Thema. Und wie ich im ersten Post gesagt habe, habe ich den Teil des Jobs entfernt, in dem die Ergebnisse in Elasticsearch geschrieben sind, damit wir ein ES-Problem ausschließen können. – JayKay

Antwort

0

Schwer zu sagen angesichts der spärlichen Menge an Informationen. Ich würde ein Speicherproblem vermuten - irgendwann können die Server sogar anfangen zu tauschen. Überprüfen Sie daher die JVM-Speicherauslastungen und die Auslagerungsaktivität auf allen Servern. Elasticsearch sollte ~ 15.000 Datensätze/Sekunde mit wenig Feinabstimmung verarbeiten können. Überprüfen Sie den freien und festgeschriebenen Arbeitsspeicher auf den Servern.

0

Wie bereits erwähnt, sind CPU- und RAM-Auslastung völlig in Ordnung. Ich habe ein "magisches Limit" herausgefunden, es scheint genau 1500 Ereignisse pro Sekunde zu sein. Wenn ich diese Grenze überschreite, beginnt die Eingangsrate sofort zu wobbeln.

Die mysteriöse Sache ist, dass Bearbeitungszeiten und Planungsverzögerungen konstant bleiben. So kann man Gegendruckeffekte ausschließen, oder?

Das einzige, was ich erraten kann, ist ein technisches Limit mit GCP/Dataproc ... Ich habe keine Hinweise auf die Google-Dokumentation gefunden.

Einige andere Ideen?