Mit dem neuen Kafka Java consumer api führe ich einen einzigen Consumer aus, um Nachrichten zu konsumieren. Wenn alle verfügbaren Nachrichten verbraucht sind, töte ich sie mit kill -15
.Kafka neuer Konsument: (re) setze und setze Offsets mit assign, not subscribe
Jetzt möchte ich die Offsets zurücksetzen, um zu starten. Ich möchte vermeiden, nur eine andere Verbrauchergruppe zu verwenden. Was ich versuchte, ist die folgende Abfolge von Anrufen, wobei dieselbe Gruppe wie der Verbraucher verwendet wird, der gerade die Daten gelesen hat.
assign(topicPartition);
OffsetAndMetadata om = new OffsetAndMetadata(0);
commitSync(Collections.singletonMap(topicPartition, 0));
Ich dachte, ich diese Arbeit in einem Test bekommen hatte, aber jetzt bekomme ich immer nur:
ERROR internals.ConsumerCoordinator: Error UNKNOWN_MEMBER_ID occurred while committing offsets for group queue
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)
Ist es grundsätzlich falsch assign
mit commitSync
zu kombinieren, möglicherweise weil nur subscribe
und commitSync
gehen zusammen? Die Dokumente sagen nur, dass assign
nicht mit subscribe
geht, aber ich dachte, dies gilt nur in einem Verbraucherprozess. (Tatsächlich hatte ich sogar gehofft, den Offset-Reset-Verbraucher laufen zu lassen, während der andere Verbraucher auf ist, in der Hoffnung, dass der andere den Offset-Wechsel bemerkt und wieder von vorne beginnt. Aber das Herunterfahren ist auch in Ordnung.)
Irgendwelche Ideen?
Ich muss im normalen Betrieb verpflichten. Das Zurücksetzen ist eine Wartungsaufgabe, die in der Produktion und oft auch in der Entwicklung benötigt wird. – Harald