2014-11-20 5 views
7

Ich erstelle eine Anwendung, mit der Abonnements für kafka-Themen dynamisch hinzugefügt und entfernt werden können. Wenn ein Themenabonnement hinzugefügt wird, möchte ich jede Stunde einen Stapeljob ausführen, der alle neuen Nachrichten empfängt und in einen anderen Datenspeicher verschiebt.Kafka - Einfachste Möglichkeit, den neuesten Offset zu erhalten

Was ich verstehen möchte ist, wie man den aktuellen Offset eines Themas bekommt. Sobald eine Subskription hinzugefügt wird, möchte ich, dass der nächste Batchjob alle Nachrichten seit dem ungefähren Zeitpunkt des Abonnements erhält.

Als Beispiel, stellen Sie sich vor, ich habe ein Thema namens "TopicA", die ständig Nachrichten empfängt. Wenn ich um 19.15 Uhr ein Abonnement hinzufüge, wenn der Stapeljob um 20 Uhr läuft, möchte ich, dass alle Nachrichten seit 19.15 Uhr zusammengelegt werden. Ich bin froh, dass die Zeit ungefähr ist - 7.10, 7.20 etc. 5 oder 10 Minuten auf beiden Seiten machen mir keine Sorgen.

Meine beabsichtigte Lösung ist also, den aktuellen Offset eines Themas in dem Moment zu erhalten, in dem ein Abonnement hinzugefügt wird. Ich habe mir den einfachen Verbraucher angeschaut, möchte mich aber nicht in alle Aspekte des Cluster-Managements für diesen grundlegenden Anwendungsfall einmischen.

Ich habe auch auf den High-Level-Verbraucher geschaut. Ich könnte so etwas wie diese:

consumer.createMessageStreamsByFilter(new Whitelist(topicName)).head.head.offset 

Was mich bei diesem Ansatz ist der Aufruf von „Kopf“ ist eigentlich ein Strom. Ich glaube also, es wird das Warten auf die nächste Nachricht blockieren. Das Blockieren ist problematisch, da es dazu führen kann, dass andere Abonnements in die Warteschlange gestellt werden, bis die nächste Nachricht eintrifft.

Ich bin glücklich, etwas Zeit mit der Implementierung des letzteren Ansatzes zu verbringen, aber wenn es einen einfacheren Weg gibt, den ich nicht fehleranfälligen simultanen Code schreiben muss, dann würde ich lieber keine Zeit verschwenden.

Ich brauche auch eine Möglichkeit, alle Protokolle seit diesem Offset zu bekommen.

Antwort

2

Jede Antwort auf eine Abrufanforderung gibt eine "HighWaterMark" zurück, die den letzten Offset im Protokoll der aktuell verwendeten Partition darstellt. In der Theorie könnten Sie also die früheste Nachricht oder tatsächlich jede Nachricht (vorausgesetzt, es existiert eine) für ein bestimmtes Thema holen und die HighWaterMark aus der Antwort ziehen. Es gibt weitere Einzelheiten über die Highwatermark hier: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse

Natürlich die HighWaterMarkOffset aus der Antwort auf Ihrem Client-depends ziehen zu können Daten, die Zurverfügungstellung durch eigenen Kafka-API.

+0

Dies wäre die obere Grenze für eine bestimmte Partition. Ich denke, er fragt nach den Informationen der "letzten Nachricht" {partitionId, offsetId}. – arviman

+1

Ich denke, es gibt keine globale "letzte Nachricht". Kafka würde nicht skalieren, wenn es einen globalen Synchronisationsmechanismus hätte ... –