Ich habe ein Kafka-Cluster-Setup mit drei Knoten. Ich verwende Sturm, um Nachrichten von Kafka zu lesen. Jedes Thema in meinem System hat 7 Partitionen.Offset fehlt in den Kafka-Protokollen - Simple Consumer kann nicht fortfahren
Jetzt habe ich ein seltsames Problem konfrontiert. Bis vor 3 Tagen hat alles gut funktioniert. Jetzt scheint es jedoch, dass meine Sturm Topologie nicht in der Lage ist, speziell von 2 Partitionen zu lesen - # 1 und # 4.
Ich habe versucht, das Problem zu bohren, und fand, dass in meinem kafka Protokolle, für beide Partitionen, eine also nach 5.964.511 fehlt versetzt ist, versetzt nächste ist 5.964.513 und nicht 5964512.
wegen fehlender Versatz , Simple Consumer kann nicht mit den nächsten Offsets fortfahren. Mache ich etwas falsch oder ist es ein bekannter Fehler?
Was könnte möglicherweise der Grund für ein solches Verhalten sein?
ich folgenden Code verwenden Fenster der gültigen Offsets zu lesen:
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 100));
OffsetRequest request = new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion() , clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
long[] validOffsets = response.offsets(topic, partition);
for (long validOffset : validOffsets) {
System.out.println(validOffset + " : ");
}
long largestOffset = validOffsets[0];
long smallestOffset = validOffsets[validOffsets.length - 1];
System.out.println(smallestOffset + " : " + largestOffset);
return largestOffset;
}
Das gibt mir folgende Ausgabe:
4529948 : 6000878
die Offset I Bereitstellung bin gut im Offset-Bereich also.