2016-03-22 10 views
2

I KafkaConsumer bin mit Nachrichten von Kafka-Server (Themen) zu konsumieren ..kafka Verbraucher dynamisch Themen erkennen hinzugefügt

  • Es funktioniert für Themen Fein erstellt vor Consumer-Code gestartet ...

Aber das Problem ist, es wird nicht funktionieren, wenn die Themen dynamisch erstellt (ich meine zu sagen, nachdem Verbraucher-Code gestartet), aber die API sagt, dass es dynamische Thema Erstellung unterstützt .. Hier ist der Link für Ihre Referenz ..

Kafka Version verwendet: 0.9.0.1

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

ist der JAVA-Code ...

Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("group.id", "test"); 
    props.put("enable.auto.commit", "false"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
    Pattern r = Pattern.compile("siddu(\\d)*"); 

    consumer.subscribe(r, new HandleRebalance()); 
    try { 
     while(true) { 
      ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); 
      for (TopicPartition partition : records.partitions()) { 
       List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); 
       for (ConsumerRecord<String, String> record : partitionRecords) { 
        System.out.println(partition.partition() + ": " +record.offset() + ": " + record.value()); 
       } 
       long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); 

       consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); 
      } 
     } 
    } finally { 
     consumer.close(); 
    } 

HINWEIS: Mein Thema Name der Regular Expression passen .. Und wenn ich die Verbraucher neu starten, dann wird es jetzt schon Beiträge lesen gedrückt zum Thema ...

Jede Hilfe ist wirklich zu schätzen ...

Antwort

2

Sie in Zookeeper Haken können. Schauen Sie sich the sample code an. Im Wesentlichen werden Sie einen Beobachter auf dem Zookeeper Knoten /brokers/topics erstellen. Wenn neue Kinder werden hier aufgenommen, es ist ein neues Thema hinzugefügt wird, und Ihre Beobachter wird ausgelöst werden.

+0

Danke für Ihre Antwort und Hilfe ... im Grunde wollte ich KafkaConsumer API verwenden, um dies zu erreichen und ich löste es selbst .. – siddu

+0

Wie wurde das gelöst? Ich habe das gleiche Problem. – madlad

+0

@siddu Können Sie mir sagen, wie Sie dieses Problem gelöst haben? – bhspencer

5

Es war eine Antwort auf diese in den Apache kafka Mail-Archiven. Ich kopiere hier ein:

Der Verbraucher unterstützt eine Konfigurationsoption „metadata.max.age.ms“ die im Grunde steuert, wie oft Thema Metadaten abgerufen wird. Nach Standard ist dies ziemlich hoch eingestellt (5 Minuten), was bedeutet, dass es bis zu 5 Minuten dauern wird, um neue Themen zu entdecken, die zu Ihrem normalen Ausdruck passen. Sie können dies festlegen, um Themen schneller zu finden.

Also in Ihren Requisiten können Sie:

props.put("metadata.max.age.ms", 5000); 

Dies wird dazu führen, dass die Verbraucher alle 5 Sekunden über neue Themen zu erfahren.