2015-05-20 6 views
9

Ich versuche Low-Level-Consumer-Java-API verwenden, um Offsets manuell zu verwalten, mit dem neuesten kafka_2.10-0.8.2.1. Um zu überprüfen, ob die von Kafka eingegebenen/gelesenen Offsets korrekt sind, verwende ich das Tool kafka.tools.ConsumerOffsetChecker.Kafka Java API Offset Operationen Klärung

Hier ist ein Beispiel für die Ausgabe für mein Thema/Verbrauchergruppe:

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic 
Group           Topic                          Pid Offset          logSize         Lag             Owner 
elastic_search_group my_log_topic              0   5               29              24              none 

Hier ist meine Interpretation des Ergebnisses:

Offset = 5 -> Dies ist der aktuelle Versatz meine 'elastic_search_group' Verbraucher

lOGSIZE = 29 -> dies ist die neueste Offset - die der nächsten Nachricht versetzt, die zu diesem Thema/partition

kommen

Lag = 24 -> 29-5 - wie viele Nachrichten von meinem 'elastic_search_group' Verbraucher

Pid noch nicht verarbeitet - Partitions-ID

Q1: Ist das richtig?

Jetzt möchte ich die gleichen Informationen von meinem Java-Verbraucher erhalten. Hier fand ich, dass ich zwei verschiedene APIs verwenden musste:

kafka.javaapi. OffsetRequest zu frühesten und neuesten Offsets erhalten, aber kafka.javaapi. OffsetFetchRequest, um den aktuellen Offset zu erhalten.

Um Frühest (oder Latest) versetzt ich tun:

TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition); 
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); 
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1)); 
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1)); 
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); 
OffsetResponse response = simpleConsumer.getOffsetsBefore(request); 
long[] offsets = response.offsets(topic, partition); 
long myEarliestOffset = offsets[0]; 
// OR for Latest: long myLatestOffset = offsets[0]; 

Und der Strom zu bekommen Offset ich eine ganz andere API verwenden:

short versionID = 0; 
int correlationId = 0; 
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();  
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition); 
topicPartitionList.add(myTopicAndPartition); 
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId); 
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq); 
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset(); 

Q2: Ist es richtig? Warum gibt es zwei verschiedene APIs, um sehr ähnliche Informationen zu erhalten?

Q3: ist es wichtig, welche VersionId und correlationId ich hier verwende? Ich denke, VersionId sollte 0 für pre-0.8.2.1 kafka sein, und 1 für 0.8.2.1 und später - aber es scheint, als ob es auch mit 0 für 0.8.2.1 funktioniert - siehe unten?

Also, für den das Beispiel Zustand des Themas oben, und die obigen Ausgabe des ConsumerOffsetChecker, hier ist das, was ich von meinem Java-Code erhalten:

currentOffset = 5; frühesteOffset = 29; nextOffset = 29

'currentOffset' scheint in Ordnung zu sein, 'nextOffset' ist auch korrekt, aber die 'frühesteOffset'? Ich würde erwarten, dass es mindestens "5" ist?

Q4: Wie konnte es passieren, dass der frühesteOffset höher ist als der aktuelleOffset? Mein einziger Verdacht ist, dass Nachrichten aus dem Thema aufgrund von Aufbewahrungsrichtlinien möglicherweise gelöscht wurden .... In allen anderen Fällen hätte dies passieren können?

Antwort

10

Ich war auf der Suche nach Möglichkeiten, Lag in Partitionen zu finden. Und das beinhaltet dieselben Schritte, die Sie unternommen haben. Soweit ich gelernt habe, kann ich Ihnen Antworten geben.

  1. logSize zeigt direkt an, wie viele Nachrichten in dieser bestimmten Partition gesammelt wurden. Oder es gibt den maximalen Offset der Nachrichten in dieser Partition an. Offset ist der Offset der letzten erfolgreich konsumierten Nachricht. Lag ist also nur der Unterschied zwischen Log-Größe und Offset.
  2. Ja, es ist korrekt. Bisher sind dies die einzigen zwei Möglichkeiten, den aktuellen Offset und den frühesten oder letzten Offset zu finden.
  3. Ich weiß nicht, warum es notwendig ist, versionId anzugeben. Sie können kafka.api.OffsetRequest.CurrentVersion() verwenden, um VersionId zu erhalten. So Hardcoding kann vermieden werden. Sie können sicher annehmen, dass correlationId 0 ist.
  4. Das ist seltsam. Wenn ich EarliestTime() verwende, bekomme ich frühestes Offset als 0, selbst wenn mein aktueller Offset viel weiter fortgeschritten ist. Es bedeutet, es ist der Beginn der Partitionierung. Wenn also einige Nachrichten in einer zukünftigen Zeit abgelaufen sind, ist dieser früheste Versatz eine Zahl ungleich Null. Nun, wenn Nachrichten aufgrund von Aufbewahrungsrichtlinien gelöscht wurden, sollte die Verzögerung geändert worden sein. Ich bin unsicher über dieses Verhalten. Ein Weg, um sicher zu sein, wäre, Verbraucher laufen zu lassen, nachdem man solches Lesen bemerkt und seine Protokolle eincheckt. Es sollte Linien wie diese zeigen.

    2015-06-09 18:49:15 :: DEBUG :: PartitionTopicInfo: 52 :: zurücksetzen Verbrauch Offset von Anforderungen: 2: abgerufen Offset = 405952: verbraucht Offset = 335372 bis 335372 2015-06-09 18 : 49: 15 :: :: DEBUG PartitionTopicInfo: 52 :: Reset Verbrauch von Anfragen offset: 2: geholten offset = 405952: konsumierte offset = 335373 zu 335373

anzumerken, dass in oben Protokollzeilen, geholten Reste Offset gleicher und verbrauchter Offset erhöht sich. Schließlich wäre es in

2015-06-09 18:49:16 Ende :: DEBUG :: PartitionTopicInfo: 52 :: Reset consume von Anfragen Offset: 2: hergeholt offset = 405952: konsumierte offset = 405.952-405.952

Dann würde dies bedeuten, dass aufgrund der Log-Retention-Richtlinie Offset von 335372 bis 405952 abgelaufen sind

+1

Danke, @ Shades88! Nach einigem Test, für # 4 - kam ich zu der gleichen Schlussfolgerung, dass diese Situation passieren würde, wenn Protokolle aufgrund von Aufbewahrungsrichtlinien gelöscht wurden. Also habe ich die Handhabung dieses Eckfalls in meine Verbraucherlogik aufgenommen - validiere, dass der aktuelle Versatz> = frühester Versatz ist und setze ihn auf EarliestOffset, wenn nicht. Vielen Dank! – Marina

+0

In Bezug auf 'versionId', wenn Sie '0' angeben, werden Offsets in Zookeeper gespeichert und wenn Sie '1' verwenden, werden Offsets in einem speziellen Kafka-Thema gespeichert. –

+0

Eine nützliche Seite http://grokbase.com/t/kafka/users/154g34g133/simpleconsumer-getoffsetsbefore-problem –