Ich versuche Low-Level-Consumer-Java-API verwenden, um Offsets manuell zu verwalten, mit dem neuesten kafka_2.10-0.8.2.1. Um zu überprüfen, ob die von Kafka eingegebenen/gelesenen Offsets korrekt sind, verwende ich das Tool kafka.tools.ConsumerOffsetChecker.Kafka Java API Offset Operationen Klärung
Hier ist ein Beispiel für die Ausgabe für mein Thema/Verbrauchergruppe:
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group Topic Pid Offset logSize Lag Owner
elastic_search_group my_log_topic 0 5 29 24 none
Hier ist meine Interpretation des Ergebnisses:
Offset = 5 -> Dies ist der aktuelle Versatz meine 'elastic_search_group' Verbraucher
lOGSIZE = 29 -> dies ist die neueste Offset - die der nächsten Nachricht versetzt, die zu diesem Thema/partition
kommenLag = 24 -> 29-5 - wie viele Nachrichten von meinem 'elastic_search_group' Verbraucher
Pid noch nicht verarbeitet - Partitions-ID
Q1: Ist das richtig?
Jetzt möchte ich die gleichen Informationen von meinem Java-Verbraucher erhalten. Hier fand ich, dass ich zwei verschiedene APIs verwenden musste:
kafka.javaapi. OffsetRequest zu frühesten und neuesten Offsets erhalten, aber kafka.javaapi. OffsetFetchRequest, um den aktuellen Offset zu erhalten.
Um Frühest (oder Latest) versetzt ich tun:
TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
long[] offsets = response.offsets(topic, partition);
long myEarliestOffset = offsets[0];
// OR for Latest: long myLatestOffset = offsets[0];
Und der Strom zu bekommen Offset ich eine ganz andere API verwenden:
short versionID = 0;
int correlationId = 0;
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition);
topicPartitionList.add(myTopicAndPartition);
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId);
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq);
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset();
Q2: Ist es richtig? Warum gibt es zwei verschiedene APIs, um sehr ähnliche Informationen zu erhalten?
Q3: ist es wichtig, welche VersionId und correlationId ich hier verwende? Ich denke, VersionId sollte 0 für pre-0.8.2.1 kafka sein, und 1 für 0.8.2.1 und später - aber es scheint, als ob es auch mit 0 für 0.8.2.1 funktioniert - siehe unten?
Also, für den das Beispiel Zustand des Themas oben, und die obigen Ausgabe des ConsumerOffsetChecker, hier ist das, was ich von meinem Java-Code erhalten:
currentOffset = 5; frühesteOffset = 29; nextOffset = 29
'currentOffset' scheint in Ordnung zu sein, 'nextOffset' ist auch korrekt, aber die 'frühesteOffset'? Ich würde erwarten, dass es mindestens "5" ist?
Q4: Wie konnte es passieren, dass der frühesteOffset höher ist als der aktuelleOffset? Mein einziger Verdacht ist, dass Nachrichten aus dem Thema aufgrund von Aufbewahrungsrichtlinien möglicherweise gelöscht wurden .... In allen anderen Fällen hätte dies passieren können?
Danke, @ Shades88! Nach einigem Test, für # 4 - kam ich zu der gleichen Schlussfolgerung, dass diese Situation passieren würde, wenn Protokolle aufgrund von Aufbewahrungsrichtlinien gelöscht wurden. Also habe ich die Handhabung dieses Eckfalls in meine Verbraucherlogik aufgenommen - validiere, dass der aktuelle Versatz> = frühester Versatz ist und setze ihn auf EarliestOffset, wenn nicht. Vielen Dank! – Marina
In Bezug auf 'versionId', wenn Sie '0' angeben, werden Offsets in Zookeeper gespeichert und wenn Sie '1' verwenden, werden Offsets in einem speziellen Kafka-Thema gespeichert. –
Eine nützliche Seite http://grokbase.com/t/kafka/users/154g34g133/simpleconsumer-getoffsetsbefore-problem –