Ich habe einfache Spark-Streaming-Anwendung, die Daten aus Kafka liest und senden Sie diese Daten nach der Transformation auf einem HTTP-Endpunkt (oder ein anderes Kafka - für diese Frage lassen Sie uns http betrachten). Ich überweise Aufträge mit job-server.Pause Spark Streaming Job
Ich beginne gerade den Verbrauch von Quelle kafka mit "auto.offset.reset" = "kleinste" und Intervall = 3s. Im glücklichen Fall sieht alles gut aus. Hier ein Auszug:
kafkaInputDStream.foreachRDD(rdd => {
rdd.foreach(item => {
//This will throw exception if http endpoint isn't reachable
httpProcessor.process(item._1, item._2)
})
})
Da "auto.offset.reset" = "kleinste", diese Prozesse zu 200K Nachrichten in einem Job. Wenn ich den HTTP-Server-Mid-Job stoppe (ein Problem in POSTing simuliert) und httpProcessor.process die Ausnahme auslöst, schlägt dieser Job fehl und alles, was nicht verarbeitet wird, ist verloren. Ich sehe, dass es danach alle 3 Sekunden pollt.
Also meine Frage ist: Wenn in der nächsten 3 Sekunden Job
- meine Vermutung ist richtig, dass, wenn es X-Nachrichten bekommt und nur Y, bevor man einen Fehler verarbeitet werden kann, Rest X-Y wird nicht bearbeitet?
- Gibt es eine Möglichkeit, den Stream/Verbrauch von Kafka anzuhalten? Zum Beispiel, wenn es ein intermittierendes Netzwerkproblem gibt und höchstwahrscheinlich alle verbrauchten Nachrichten in dieser Zeit verloren gehen. Etwas, das immer wieder versucht (möglicherweise exponentielles Zurücksetzen) und wann auch immer der http-Endpunkt UP ist, fange wieder an zu konsumieren.
Dank
Wenn bei der Verarbeitung einer bestimmten Jobstadiumsstufe Netzwerkprobleme auftreten, können Sie die Ausnahmebedingung weitergeben (werfen), damit der gesamte Job fehlschlägt, und den gesamten fehlgeschlagenen Batch erneut wiedergeben. Dies hat einige Gemeinkosten und funktioniert nur, wenn Ihre DAG referenziell transparent ist. –