2013-02-18 14 views
29

Ich benutze zookeeeper, um Daten von kafka zu erhalten. Und hier bekomme ich immer Daten vom letzten Offsetpunkt. Gibt es eine Möglichkeit, den Zeitpunkt des Offsets anzugeben, um alte Daten zu erhalten?Wie bekomme ich Daten vom alten Offsetpunkt in Kafka?

Es gibt eine Option autooffset.reset. Es akzeptiert kleinste oder größte. Kann mir bitte jemand erklären, was am kleinsten und am größten ist. Kann autooffset.reset dabei helfen, Daten vom alten Offset-Punkt anstatt vom letzten Offset-Punkt zu bekommen?

Antwort

20

Die Verbraucher gehören immer zu einer Gruppe und für jede Partition.

von Anfang an zu holen, können Sie alle Daten mit dem Fortschritt assoziiert löschen wie Hussain

refered
ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}"); 

Sie können auch den Versatz der Partition angeben Sie wollen, wie in Kern/src/main/scala angegeben /kafka/tools/UpdateOffsetsInZK.scala

ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString) 

jedoch die nicht indiziert Zeitversatz ist, aber man weiß ja für jede Partition eine Sequenz ist.

Wenn Ihre Nachricht einen Zeitstempel enthält (und beachten Sie, dass dieser Zeitstempel nichts mit dem Moment zu tun hat, in dem Kafka Ihre Nachricht erhalten hat), können Sie einen Indexer versuchen, indem Sie den Offset um N erhöhen , und speichern Sie das Tupel (Thema X, Teil 2, Offset 100, Zeitstempel) irgendwo.

Wenn Sie Einträge von einem bestimmten Zeitpunkt abrufen möchten, können Sie eine binäre Suche auf Ihren groben Index anwenden, bis Sie den gewünschten Eintrag gefunden haben und von dort abrufen.

3

Lesen Sie das Dokument über kafka config: http://kafka.apache.org/08/configuration.html für Ihre Abfrage der kleinsten und größten Werte des Offset-Parameters.

BTW, Beim Erkunden von Kafka habe ich mich gefragt, wie man alle Nachrichten für einen Verbraucher wiedergibt. Ich meine, wenn eine Verbrauchergruppe alle Nachrichten abgefragt hat und sie diese erneut erhalten möchte.

Die Art, wie es erreicht werden kann, ist Daten aus Zoowäscher zu löschen. Verwenden Sie die kafka.utils.ZkUtils-Klasse, um einen Knoten auf zookee zu löschen. Unten ist seine Nutzung: die Zookeeper verfolgt den Fortschritt dieser Verbrauchergruppe in der Trennwand

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}"); 
7

Von Kafka documentation sie sagen „kafka.api.OffsetRequest.EarliestTime(), um den Anfang der Daten in den Protokollen findet und beginnt von dort Streaming kafka.api.OffsetRequest.LatestTime() werden nur neue Nachrichten streamen Nehmen Sie nicht an, dass offset 0 der Anfangsoffset ist, da Nachrichten im Laufe der Zeit aus dem Protokoll auslaufen."

die SimpleConsumerExample Verwenden Sie hier: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

ähnliche Frage: Kafka High Level Consumer Fetch All Messages From Topic Using Java API (Equivalent to --from-beginning)

Diesen

+0

Sie haben auch eine Codebeispiel Referenz. einen Blick wert – Hild

+0

Das Beispiel, auf das sich Hild bezieht, ist: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Beispiel Sie können das 'Consumer'-Beispiel nicht verwenden, Sie müssen das verwenden 'SimpleConsumerDemo' Beispiel um mit Offsets zu spielen. – pherris

1

Kafka Protokoll Doc helfen könnte, ist eine große Quelle mit Request/Response/Versetzt/Messages zu spielen: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol verwenden Sie Simple Consumer Beispiel, wo folgenden Code den Zustand zeigen:

FetchRequest req = new FetchRequestBuilder() 

     .clientId(clientName) 

     .addFetch(a_topic, a_partition, readOffset, 100000) 

     .build(); 

FetchResponse fetchResponse = simpleConsumer.fetch(req); 

set readOffset gesetzt, um den anfänglichen Offset von zu starten. aber Sie müssen überprüfen, ob der maximale Offset wie auch darüber eine begrenzte Offsetsanzahl gemäß FetchSize im letzten Parameter der addFetch-Methode zur Verfügung stellt.

+0

Überprüfen Sie neue Api in der Version 0.9.0.0 von Kafka zur Verfügung gestellt haben sie einen Schritt nach oben durch Kombination von einfachen & High-Level-Verbraucher. – usman

1

For Now

Kafka FAQ für dieses Problem eine Antwort geben.

Wie bekomme ich genau Offsets von Nachrichten für einen bestimmten Zeitstempel mit OffsetRequest?

Kafka ermöglicht das Abfragen von Offsets von Nachrichten nach Zeit und zwar bei Segmentgranularität. Der Timestamp-Parameter ist der Unix-Timestamp, und die Abfrage des Offsets nach Zeitstempel gibt den spätest möglichen Offset der Nachricht zurück, die nicht später als der angegebene Timestamp angehängt wird. Es gibt 2 spezielle Werte des Zeitstempels - der neueste und der früheste. Für jeden anderen Wert des Unix-Zeitstempels erhält Kafka den Start-Offset des Log-Segments, das nicht später als der angegebene Zeitstempel erstellt wird. Aus diesem Grund und da die Offset-Anforderung nur bei Segmentgranularität geliefert wird, gibt die Offsetholanforderung weniger genaue Ergebnisse für größere Segmentgrößen zurück.

Für genauere Ergebnisse können Sie die Protokollsegmentgröße basierend auf der Zeit (log.roll.ms) anstelle der Größe (log.segment.bytes) konfigurieren. Es sollte jedoch Vorsicht walten gelassen werden, da dies die Anzahl der Dateihandler aufgrund des häufigen Log-Segmentswalzens erhöhen könnte.


Plan für die Zukunft

Kafka wird Zeitstempel Nachrichtenformat hinzuzufügen. Siehe

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

0

haben Sie das versucht?

ist/kafka-console-consumer.sh --bootstrap-Server localhost: 9092 --topic Test --from-Anfang

Es drucken würde alle Nachrichten für das jeweilige Thema aus, "test" in Dieses Beispiel.

Weitere Details über diesen Link https://kafka.apache.org/quickstart