2016-04-15 14 views
4

Wir haben eine Spark-Streaming-Anwendung, liest es Daten aus einer Kafka-Warteschlange im Empfänger und macht einige Transformation und Ausgabe nach HDFS. Das Batch-Intervall ist 1min, wir haben bereits die Gegendruck und spark.streaming.receiver.maxRate Parameter abgestimmt, so dass es die meiste Zeit gut funktioniert.Spark Streaming Kafka Gegendruck

Aber wir haben immer noch ein Problem. Wenn HDFS vollständig heruntergefahren ist, bleibt der Stapeljob lange hängen (sagen wir mal, das HDFS arbeitet 4 Stunden lang nicht, und der Job bleibt 4 Stunden hängen), aber der Empfänger weiß nicht, dass der Job nicht beendet ist , so erhält es noch Daten für die nächsten 4 Stunden. Das verursacht eine OOM-Ausnahme, und die gesamte Anwendung ist ausgefallen, wir haben viele Daten verloren.

Also meine Frage ist: ist es möglich, den Empfänger wissen, dass der Auftrag nicht beendet wird, so wird es weniger (oder gar keine) Daten erhalten, und wenn der Auftrag beendet wird es mehr Daten erhalten, um aufzuholen . In der obigen Situation, wenn HDFS heruntergefahren ist, liest der Empfänger weniger Daten von Kafka und der in den nächsten 4 Stunden erzeugte Block ist wirklich klein, der Empfänger und die gesamte Anwendung sind nicht abgeschaltet, nachdem das HDFS in Ordnung ist, wird der Empfänger lesen mehr Daten und fangen an aufzuholen.

Antwort

6

Sie können den Gegendruck aktivieren, indem Sie die Eigenschaft spark.streaming.backpressure.enabled = true einstellen. Dies ändert dynamisch Ihre Batch-Größen und vermeidet Situationen, in denen Sie ein OOM aus der Warteschlangenerstellung erhalten. Es hat ein paar Parameter:

  • spark.streaming.backpressure.pid.proportional - Antwortsignal auf Fehler im letzten Chargengröße (Standard 1.0)
  • spark.streaming.backpressure.pid.integral - Antwortsignal auf akkumulierte Fehler - effektiv ein Dämpfer (Standard 0.2)
  • spark.streaming.backpressure.pid.derived - als Reaktion auf die Entwicklung des Fehlers (nützlich für schnell auf Veränderungen reagieren, Standard 0.0)
  • spark.streaming.backpressure.pid.minRate - der Mindestsatz, wie von Ihrer Batch-Frequenz implizierte, es Unterschreitung in hohen Durchsatz Arbeitsplätzen (Standard 100)

Die Standardwerte sind ziemlich gut zu reduzieren ändern, aber ich simulierte die Antwort des Algorithmus auf verschiedene Parameter here

+1

nicht den Link verpassen, schöne und detaillierte Analyse .. – raksja