2015-02-17 12 views

Antwort

3

während der High Level mit Verbraucher eingestellt props.put("auto.offset.reset", "smallest"); in Zeiten der ConsumerConfig

+7

Dies wird NUR sicherstellen, dass, wenn Sie ERSTES MAL lesen, es von Anfang an lesen wird. Nachfolgende Lesevorgänge ignorieren diese Einstellung vollständig und lesen vom letzten Offset. – KingJulien

+0

Verwenden Sie frühestens für das Lesen von Anfang an. –

26

Das Erstellen arbeitet mit dem 0.9.x-Consumer. Grundsätzlich müssen Sie beim Erstellen eines Consumers diesem Konsumenten eine Konsumentengruppen-ID mit der Eigenschaft ConsumerConfig.GROUP_ID_CONFIG zuweisen. Generieren Sie die Verbrauchergruppen-ID nach dem Zufallsprinzip, wenn Sie den Verbraucher zum Beispiel so starten. properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); (properties ist eine Instanz von java.util.Properties, die Sie an den Konstruktor new KafkaConsumer(properties) übergeben).

Die zufällige Generierung des Clients bedeutet, dass der neuen Verbrauchergruppe in kafka kein Offset zugeordnet ist. Was wir danach tun müssen, ist eine Richtlinie für dieses Szenario festzulegen. Da die Dokumentation für die auto.offset.reset Eigenschaft sagt:

Was tun, wenn es keine Anfänge in Kafka oder wenn der Strom auf dem Server nicht mehr existiert nicht Offset Offset ist (zB weil die Daten gelöscht wurden):

  • frühestens: automatisch das die frühesten Offset zurückgesetzt Offset
  • zuletzt: automatisch zurückgesetzt den Offset auf den neueste Offset
  • none: throw Ausnahme für die Verbraucher, wenn keine vorherige Offset gefunden wird oder die Gruppe des Verbrauchers
  • alles andere: Ausnahme zum Verbraucher werfen.

So aus den oben aufgeführten Optionen müssen wir die earliest Politik wählen, so dass die neue Verbrauchergruppe von Anfang an jedes Mal beginnt.

Ihr Code in Java, wird wie folgt aussehen:

properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); 
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "your_client_id"); 
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
consumer = new KafkaConsumer(properties); 

Das einzige, was Sie brauchen es jetzt, um herauszufinden, ist, wenn mehrere Verbraucher aufweist, die auf den gleichen Verbrauchergruppe gehören, sondern verteilt, wie um eine zufällige ID zu generieren und sie zwischen diesen Instanzen zu verteilen, so dass sie alle derselben Verbrauchergruppe angehören.

Hoffe es hilft!

+1

Dies ist keine gute Lösung. Das führt dazu, dass sich die Daten des Tierpflegers häufen und neue Einträge ständig erstellt und aufgegeben werden. Es ist besser, den Eintrag für Ihre Gruppe, wie von KingJulien unten angegeben, und die verknüpfte Antwort, die er gepostet hat, zu löschen. – CamW

+0

Sie müssen den Verbraucher schließen. consumer.close(); – Nautilus

0

Wenn Sie die java consumer api genauer org.apache.kafka.clients.consumer.Consumer verwenden, können Sie die seek * -Methoden ausprobieren.

consumer.seekToBeginning(consumer.assignment()) 

Hier consumer.assignment() gibt alle zugeordneten Partitionen zu einem bestimmten Verbraucher und seekToBeginning wird von den frühesten für die gegebene Sammlung von Partitionen versetzt starten.

2

Eine Möglichkeit, dies zu tun, wäre eine eindeutige Gruppen-ID bei jedem Start, was bedeutet, dass Kafka Ihnen die Nachrichten in dem Thema von Anfang an senden würde. Tun Sie etwas wie dies, wenn Sie Ihre Eigenschaften für KafkaConsumer gesetzt:

properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); 

Die andere Option ist consumer.seekToBeginning (consumer.assignment()) zu verwenden, aber das wird nicht funktionieren, es sei denn Kafka zunächst einen Herzschlag von Ihrem Verbraucher bekommt durch den Verbraucher dazu bringen, die Poll-Methode aufzurufen. Rufen Sie also poll() auf, führen Sie dann seekToBeginning() aus und rufen Sie dann erneut poll() auf, wenn Sie alle Datensätze von Anfang an möchten. Es ist ein kleiner Hackey, aber dies scheint der zuverlässigste Weg zu sein, um es ab Version 0.9 zu machen.

// At this point, there is no heartbeat from consumer so seekToBeinning() wont work 
// So call poll() 
consumer.poll(0); 
// Now there is heartbeat and consumer is "alive" 
consumer.seekToBeginning(consumer.assingment()); 
// Now consume 
ConsumerRecords<String, String> records = consumer.poll(0); 
0

Eine mögliche Lösung ist eine Implementierung von ConsumerRebalanceListener zu verwenden, während zu einem oder mehreren Themen abonnieren möchte. Der ConsumerRebalanceListener enthält Callback-Methoden, wenn neue Partitionen einem Consumer zugewiesen oder daraus entfernt werden. Das folgende Codebeispiel veranschaulicht dies:

public class SkillsConsumer { 

private String topic; 

private KafkaConsumer<String, String> consumer; 

private static final int POLL_TIMEOUT = 5000; 

public SkillsConsumer(String topic) { 
    this.topic = topic; 
    Properties properties = ConsumerUtil.getConsumerProperties(); 
    properties.put("group.id", "consumer-skills"); 
    this.consumer = new KafkaConsumer<>(properties); 
    this.consumer.subscribe(Collections.singletonList(this.topic), 
      new PartitionOffsetAssignerListener(this.consumer)); 
} 

}

public class PartitionOffsetAssignerListener implements ConsumerRebalanceListener { 

private KafkaConsumer consumer; 

public PartitionOffsetAssignerListener(KafkaConsumer kafkaConsumer) { 
    this.consumer = kafkaConsumer; 
} 

@Override 
public void onPartitionsRevoked(Collection<TopicPartition> partitions) { 

} 

@Override 
public void onPartitionsAssigned(Collection<TopicPartition> partitions) { 
    //reading all partitions from the beginning 
    for(TopicPartition partition : partitions) 
     consumer.seekToBeginning(partition); 
} 

}

Nun, wenn die Partitionen den Verbraucher zugeordnet sind, wird jede Partition von Anfang an gelesen werden.