2014-02-13 14 views
6

Ich teste den Kafka High Level Consumer mit dem ConsumerGroupExample-Code von der Kafka-Site. Ich möchte alle vorhandenen Nachrichten zum Thema "Test" abrufen, die ich in der Kafka Server-Konfiguration habe. Mit Blick auf andere Blogs, sollte auto.offset.reset eingestellt werden, um „kleinste“ der Lage sein, alle Nachrichten zu erhalten:Kafka High Level Consumer Alle Nachrichten aus dem Thema abrufen Java-API verwenden (entspricht --von Anfang an)

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { 
    Properties props = new Properties(); 
    props.put("zookeeper.connect", a_zookeeper); 
    props.put("group.id", a_groupId); 
    props.put("auto.offset.reset", "smallest"); 
    props.put("zookeeper.session.timeout.ms", "10000");  

    return new ConsumerConfig(props); 
} 

Die Frage, die ich wirklich haben, ist dies: was das Äquivalent-API-Aufruf für das High Java ist Level Consumer, die das äquivalent ist:

ist/kafka-console-consumer.sh --zookeeper localhost: 2181 --topic Test --from-Anfang

Antwort

4

Sieht aus wie Sie die „low level verwenden müssen SimpleConsumer API "

Für die meisten Anwendungen ist der High-End-Verbraucher Api gut genug. Einige Anwendungen möchten Funktionen, die noch nicht dem High-Level-Consumer ausgesetzt sind (z. B. initialen Offset beim Neustart des Verbrauchers einstellen). Sie können stattdessen unsere Low-Level SimpleConsumer Api verwenden. Die Logik wird ein wenig komplizierter sein und Sie können dem Beispiel in here folgen.

Dieses Beispiel arbeitete für alle Nachrichten von einem Thema mit den folgenden Argumenten bekommen: (beachten Sie, dass der Anschluss der Kafka-Port ist, nicht der ZooKeeper Port, Themen eingerichtet von this example):

10 my-replicated-topic 0 localhost 9092 

Insbesondere gibt es ein Verfahren zu erhalten, die readOffset kafka.api.OffsetRequest.EarliestTime nimmt():

long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName); 

Hier ist ein weiterer Beitrag kann einige alternative Ideen zur Verfügung stellen, wie Sie das heraus sortieren: How to get data from old offset point in Kafka?

+0

Was haben Sie früher erreicht? um alle Nachrichten vom Thema zu lesen. – Samra

6

Grundsätzlich jedes Mal, wenn ein neuer Verbraucher versucht, ein Thema zu konsumieren, wird es Nachrichten von Anfang an lesen. Wenn Sie besonders von Anfang an jedes Mal zu Testzwecken konsumieren, werden Sie jedes Mal, wenn Sie Ihren Consumer mit einer neuen groupID initialisieren, die Nachrichten von Anfang an lesen. Hier ist, wie ich es gemacht habe:

properties.put("group.id", UUID.randomUUID().toString()); 

und lesen Sie Nachrichten von Anfang an jedes Mal!

+0

Danke! Brauchte es für Testzwecke. Ich vermute, das liegt daran, dass Sie die gleichen Daten für verschiedene Zwecke wiederverwenden können. – user1758777

+0

@ user1758777 Ja, ich hatte jeden der verschiedenen Tests benötigt, um mit den gleichen Daten zu arbeiten. –

2

Um Nachrichten von Anfang an zu holen, können Sie dies tun:

import kafka.utils.ZkUtils; 
ZkUtils.maybeDeletePath("zkhost:zkport", "/consumers/group.id"); 

dann folgen Sie einfach der Routinearbeit ...

0
Properties props = new Properties(); 
props.put("bootstrap.servers", "localhost:9092"); 
props.put("auto.offset.reset", "earliest"); 
props.put("group.id", UUID.randomUUID().toString()); 

Diese Eigenschaften Ihnen helfen.