Ich habe die Dokumentation für spring-cloud-stream 1.0.0.RELEASE ein bisschen durchgelesen und finde keine Dokumentation über Fehlerbehandlung.spring-cloud-stream kafka fehlerbehandlung
Basierend auf Beobachtungen mit Kafka 0.9, wenn mein Consumer eine RuntimeException auslöst, sehe ich 3 Wiederholungen. Nach den drei Wiederholungen, sehe ich in den Protokollen:
2016-05-17 09:35:59.216 ERROR 8983 --- [ kafka-binder-] o.s.i.k.listener.LoggingErrorHandler : Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3731457175, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=130 cap=130]), KafkaMessageMetadata [offset=2, nextOffset=3, Partition[topic='reservation', id=1]]
org.springframework.messaging.MessagingException: Exception thrown while invoking demo.sink.ReservationConsumer#handleReservation[1 args]; nested exception is java.lang.RuntimeException: no message
an dieser Stelle, versetzt die Verbraucher Lags von 1, und wenn ich den Verbraucher neu starten, wird die Meldung erneut 3-mal wiederholt. Wenn ich dann jedoch eine weitere Nachricht an die gleiche Partition sende, so dass der Consumer keine Ausnahme auslöst, wird der Consumer-Offset aktualisiert, und die ursprüngliche Nachricht, für die wir eine Ausnahme ausgelöst haben, wird nach einem Neustart nicht mehr wiederholt.
Ist dies irgendwo dokumentiert, dass ich nicht gefunden habe? Ist die Fehlerbehandlung binder-spezifisch, oder abstrahiert s-c-s, dass dies für alle Binder konsistent ist? Ich vermute, dass dies eine ungeplante Konsequenz davon ist, wie die Verbraucher-Offsets mit dem Kafka-Binder aktualisiert werden. Ich sehe, dass eine enableDlq kafka Consumer-Eigenschaft hinzugefügt wurde, und ich bin dabei, damit zu testen, aber ich bin mir nicht sicher, wie wir mit toten Buchstaben in kafka umgehen können. Ich bin vertraut mit Dead-Letter-Warteschlangen in Rabbitmq, aber mit Rabbitmq können wir das Rabbitmq Shovel-Plugin verwenden, um dlq-Nachrichten erneut zu veröffentlichen und erneut zu versuchen, um Fälle abzudecken, in denen der Fehler auf einen vorübergehenden Dienstausfall zurückzuführen war. Mir ist keine ähnliche Funktionalität bekannt, die für kafka zur Verfügung steht, ohne selbst ein ähnliches Programm zu schreiben.
AKTUALISIEREN: Das Testen mit aktivierter Eigenschaft enableDlq kafka consumer zeigt das gleiche Consumer-Offset-Problem mit Fehlerbehandlung. Wenn der Consumer eine RuntimeException auslöst, sehe ich 3 Wiederholungen, danach wird die Fehlermeldung nicht protokolliert, und ich sehe eine Meldung wie unter error.<destination>.<group>
veröffentlicht, aber der Consumer-Offset wird nicht aktualisiert und bleibt um 1. Wenn ich den Consumer neu starte, es versucht, dieselbe fehlgeschlagene Nachricht von der ursprünglichen Themenpartition erneut zu verarbeiten, wiederholt dreimal und fügt die gleiche Nachricht erneut in das Feld error.<destination>.<group>
ein (doppelte DLQ-Nachricht). Wenn ich eine weitere Nachricht in derselben Topic-Partition veröffentliche, für die der Consumer keine RuntimeException auslöst, wird der Offset aktualisiert, und die ursprüngliche fehlgeschlagene Nachricht wird beim Neustart nicht mehr wiederholt.
Ich denke, der Verbraucher sollte den Verbraucher Offset in kafka aktualisieren, wenn der Verbraucher einen Fehler auslöst, unabhängig davon, ob enableDlq wahr ist oder nicht. Das würde es zumindest konsistent machen, dass eine Nachricht, die bei allen Wiederholungsversuchen fehlgeschlagen ist, entweder verworfen wird (wenn enableDlq falsch ist) oder in der dlq veröffentlicht wird und nie erneut versucht wird (wenn enableDlq wahr ist).
danke für die Bestätigung. https://github.com/spring-cloud/spring-cloud-stream/issues/542 – gadams00