I KafkaConsumer bin mit Nachrichten von Kafka-Server (Themen) zu konsumieren ..kafka Verbraucher dynamisch Themen erkennen hinzugefügt
- Es funktioniert für Themen Fein erstellt vor Consumer-Code gestartet ...
Aber das Problem ist, es wird nicht funktionieren, wenn die Themen dynamisch erstellt (ich meine zu sagen, nachdem Verbraucher-Code gestartet), aber die API sagt, dass es dynamische Thema Erstellung unterstützt .. Hier ist der Link für Ihre Referenz ..
Kafka Version verwendet: 0.9.0.1
https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
ist der JAVA-Code ...
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Pattern r = Pattern.compile("siddu(\\d)*");
consumer.subscribe(r, new HandleRebalance());
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(partition.partition() + ": " +record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
HINWEIS: Mein Thema Name der Regular Expression passen .. Und wenn ich die Verbraucher neu starten, dann wird es jetzt schon Beiträge lesen gedrückt zum Thema ...
Jede Hilfe ist wirklich zu schätzen ...
Danke für Ihre Antwort und Hilfe ... im Grunde wollte ich KafkaConsumer API verwenden, um dies zu erreichen und ich löste es selbst .. – siddu
Wie wurde das gelöst? Ich habe das gleiche Problem. – madlad
@siddu Können Sie mir sagen, wie Sie dieses Problem gelöst haben? – bhspencer