2016-04-20 6 views
3

Ich habe einfache Spark-Streaming-Anwendung, die Daten aus Kafka liest und senden Sie diese Daten nach der Transformation auf einem HTTP-Endpunkt (oder ein anderes Kafka - für diese Frage lassen Sie uns http betrachten). Ich überweise Aufträge mit job-server.Pause Spark Streaming Job

Ich beginne gerade den Verbrauch von Quelle kafka mit "auto.offset.reset" = "kleinste" und Intervall = 3s. Im glücklichen Fall sieht alles gut aus. Hier ein Auszug:

kafkaInputDStream.foreachRDD(rdd => { 
    rdd.foreach(item => { 
    //This will throw exception if http endpoint isn't reachable 
     httpProcessor.process(item._1, item._2) 
    }) 
}) 

Da "auto.offset.reset" = "kleinste", diese Prozesse zu 200K Nachrichten in einem Job. Wenn ich den HTTP-Server-Mid-Job stoppe (ein Problem in POSTing simuliert) und httpProcessor.process die Ausnahme auslöst, schlägt dieser Job fehl und alles, was nicht verarbeitet wird, ist verloren. Ich sehe, dass es danach alle 3 Sekunden pollt.

Also meine Frage ist: Wenn in der nächsten 3 Sekunden Job

  1. meine Vermutung ist richtig, dass, wenn es X-Nachrichten bekommt und nur Y, bevor man einen Fehler verarbeitet werden kann, Rest X-Y wird nicht bearbeitet?
  2. Gibt es eine Möglichkeit, den Stream/Verbrauch von Kafka anzuhalten? Zum Beispiel, wenn es ein intermittierendes Netzwerkproblem gibt und höchstwahrscheinlich alle verbrauchten Nachrichten in dieser Zeit verloren gehen. Etwas, das immer wieder versucht (möglicherweise exponentielles Zurücksetzen) und wann auch immer der http-Endpunkt UP ist, fange wieder an zu konsumieren.

Dank

+1

Wenn bei der Verarbeitung einer bestimmten Jobstadiumsstufe Netzwerkprobleme auftreten, können Sie die Ausnahmebedingung weitergeben (werfen), damit der gesamte Job fehlschlägt, und den gesamten fehlgeschlagenen Batch erneut wiedergeben. Dies hat einige Gemeinkosten und funktioniert nur, wenn Ihre DAG referenziell transparent ist. –

Antwort

0

denke ich, Frühling Wolke Strom Ihr Problem lösen könnte. Kafka ist Quelle. Spark Streaming ist ein Prozessor. Http ist sinken. Nur wenn eine Eingabe von Kafka vorliegt, wird Spark Streaming verarbeitet. Sie müssen die Eingabe von Kafka nicht stoppen oder fortsetzen. Hoffe es hilft.

+0

Ich verstehe nicht - Sie müssen die Eingabe von Kafka nicht stoppen/fortsetzen? Was ist, wenn Http Sink nicht reagiert ~ vielleicht ist es ausgefallen oder ein temporärer Netzwerkausfall? –

+0

Quellprozessor und Senke funktionieren wie Linux PIPELINE. Es ist in Ordnung, ohne zu sinken, und Prozessor würde gut funktionieren. Sie können mehr aus [Feder-Wolke-Stream] (http://docs.spring.io/spring-cloud-stream/docs/1.0.0.RC2/reference/htmlsingle/index.html) und [Feder-Wolke -Datenfluss] (http://docs.spring.io/spring-cloud-dataflow/docs/1.0.0.M2/reference/html/) –

+0

Wenn Http sinken ist und Sie keine Daten verlieren möchten Von Spark Streaming können Sie Kafka oder Redis zwischen Spark Streaming und Http Sink hinzufügen. Wenn Http Sink oben ist, erhält es Daten von Kafka. –

2

Ja, Ihre Annahme ist richtig, dass, wenn Ihre Partition ausfällt, die verbleibenden Ereignisse für den Moment nicht verarbeitet werden.

Es gibt jedoch einige Parameter, die Sie einstellen müssen, um das gewünschte Verhalten zu erzielen (wenn Sie DirectKafkaInputDStream verwenden).

Beginnen wir mit "auto.offset.reset" = "kleinste": Dieser Parameter sagt Kafka, von Anfang an zu beginnen, wenn es kein gespeichertes Commit für die aktuelle Gruppe gibt. Da Sie erwähnt haben, dass Ihre RDD nach dem Start eine Menge Nachrichten enthält, gehe ich davon aus, dass Sie Ihre Nachrichten nicht korrekt übertragen. Wenn Sie erwarten, genau einmal Semantik, sollten Sie auf jeden Fall in Betracht ziehen, Ihre Offsets zu verfolgen, da der DirectKafkaStreamInput dies explizit nicht verfolgt.

Startverschiebungen im Voraus festgelegt werden, und diese DSTREAM ist für die Begehung von Versetzungen nicht verantwortlich, so dass Sie genau einmal

Comment in the DirectKafkaInputSream Branch 1.6

, dass Ihre Nachricht erneut verarbeitet werden derzeit die steuern können, Bei jedem Neustart des Streaming-Jobs.

Wenn Sie Ihre verarbeiteten Offsets festschreiben und beim Start in den InputDStream übergeben, wird der Listener mit dem letzten festgeschriebenen Offset fortgesetzt.

In Bezug auf Rückstau, die DirectKafkaInputDStream nutzt bereits ein RateController, die schätzt, wie viele Ereignisse in einer Charge verarbeitet werden sollen.

es nutzen zu können, müssen Sie Rückstau aktivieren:

"spark.streaming.backpressure.enabled": true 

Sie können auch die „spark.streaming.kafka.maxRatePerPartition“ begrenzen eine Obergrenze für die Chargengröße hinzuzufügen.

Wenn Sie den Gegendruck selbst regeln möchten (und vielleicht den Verbraucher für eine Weile komplett stoppen), sollten Sie einige Methoden von StreamingListener implementieren und in Ihrer Arbeit verwenden. Sie können z.B. Entscheiden Sie sich nach jedem abgeschlossenen Batch, um den Streaming-Job zu stoppen oder nicht mit dem StreamingListener.

+0

Alle guten Punkte. Sie haben vergessen, auch CheckPointing zu erwähnen, das ich getestet habe, und funktioniert wirklich gut, wenn Sie den Treiber für ein Upgrade etc. neu starten möchten. Ich war mehr besorgt über zeitweilige Ausfälle, wie Netzwerkfehler für 2 Minuten ~ 10K möglicherweise fehlgeschlagene Nachrichten. Wenn ich irgendwo den Versatz erfasse, sagen Zookeeper/Cassandra, wird es schwierig, Nachrichten wiederzugeben, wenn sich das Netzwerk erholt und der Stream die Verarbeitung der Nachrichten wieder aufnimmt. Müssen Sie darüber ein wenig mehr nachdenken, danke für die Antwort. Wenn es keine bessere Antwort gibt, vergebe ich dir Kopfgeldpunkte. –

+1

@KP Wenn Sie bereits teilweise durch Gegendruck isoliert sind und nur intermittierende Fehler behandeln möchten, scheitern Sie nicht. Da jede Partition sequentiell behandelt wird, kann die "try recoverWithDelay recoverWithDelay ... fail" -Kette für jedes Element ausreichen, um den Stream für eine kurze Zeitspanne effektiv zu "pausieren". – zero323