2016-04-12 10 views
1

Mit dem neuen Kafka Java consumer api führe ich einen einzigen Consumer aus, um Nachrichten zu konsumieren. Wenn alle verfügbaren Nachrichten verbraucht sind, töte ich sie mit kill -15.Kafka neuer Konsument: (re) setze und setze Offsets mit assign, not subscribe

Jetzt möchte ich die Offsets zurücksetzen, um zu starten. Ich möchte vermeiden, nur eine andere Verbrauchergruppe zu verwenden. Was ich versuchte, ist die folgende Abfolge von Anrufen, wobei dieselbe Gruppe wie der Verbraucher verwendet wird, der gerade die Daten gelesen hat.

assign(topicPartition); 
OffsetAndMetadata om = new OffsetAndMetadata(0); 
commitSync(Collections.singletonMap(topicPartition, 0)); 

Ich dachte, ich diese Arbeit in einem Test bekommen hatte, aber jetzt bekomme ich immer nur:

ERROR internals.ConsumerCoordinator: Error UNKNOWN_MEMBER_ID occurred while committing offsets for group queue 
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance 
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552) 

Ist es grundsätzlich falsch assign mit commitSync zu kombinieren, möglicherweise weil nur subscribe und commitSync gehen zusammen? Die Dokumente sagen nur, dass assign nicht mit subscribe geht, aber ich dachte, dies gilt nur in einem Verbraucherprozess. (Tatsächlich hatte ich sogar gehofft, den Offset-Reset-Verbraucher laufen zu lassen, während der andere Verbraucher auf ist, in der Hoffnung, dass der andere den Offset-Wechsel bemerkt und wieder von vorne beginnt. Aber das Herunterfahren ist auch in Ordnung.)

Irgendwelche Ideen?

Antwort

0

Warum überhaupt Offsets ansetzen? Setzen Sie enable.auto.commit auf false in Properties und verpflichten Sie es nicht, wenn Sie alle Nachrichten beim Neustart erneut lesen.

Zurücksetzen versetzt Sie zum Beispiel verwenden können these methods:

public void seek(TopicPartition partition, long offset) 
public void seekToBeginning(TopicPartition... partitions) 
+0

Ich muss im normalen Betrieb verpflichten. Das Zurücksetzen ist eine Wartungsaufgabe, die in der Produktion und oft auch in der Entwicklung benötigt wird. – Harald

1

das Problem gefunden. Der in meiner Frage beschriebene Ansatz funktioniert gut, wenn wir die folgenden Bedingungen beachten:

a) Möglicherweise läuft kein anderer Verbraucher mit dem Ziel group.id. Selbst wenn ein Konsument nur andere Themen abonniert hat, verhindert dies, dass nach dem Aufruf von assign() statt subscribe() Themen-Offsets übergeben werden.

b) Nachdem der letzte andere Verbraucher gestoppt hat, dauert es 30 Sekunden (ich denke es ist group.max.session.timeout.ms), bevor der Vorgang erfolgreich sein kann. Die indikative Protokollmeldung von kafka ist

Group X generation Y is dead and removed 

Sobald dies im Protokoll angezeigt wird, die Sequenz

assign(topicPartition); 
OffsetAndMetadata om = new OffsetAndMetadata(0); 
commitSync(Collections.singletonMap(topicPartition, 0)); 

gelingen kann.