Ich habe Kafka High-Level-Verbraucher.Wie Kafka Verbraucher Zustand zu überprüfen
public class KafkaHighLevelConsumer implements Runnable {
private final KafkaConsumer<String, String> consumer;
private final List<String> topics;
private final int id;
public KafkaHighLevelConsumer(int id,
String groupId,
List<String> topics,BlockingQueue<String> storyQueue) {
this.id = id;
this.topics = topics;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9091");
props.put("group.id", groupId);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(props);
}
@Override
public void run() {
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
Map<String, Object> data = new HashMap<>();
data.put("partition", record.partition());
data.put("offset", record.offset());
data.put("value", record.value());
System.out.println(this.id + ": " + data);
}
}
} catch (WakeupException e) {
// ignore for shutdown
}finally {
consumer.close();
}
}
public void shutdown() {
consumer.wakeup();
}
}
Der Verbraucher funktioniert gut, aber ich muss den Zustand des Verbrauchers überwachen. Warum haben wir keine Ausnahme, wenn Server-IP oder -Port falsch ist?
Wenn ich den Port zu einigen falschen props.put("bootstrap.servers", "localhost:9091");
zu props.put("bootstrap.servers", "localhost:100500");
ändern, kann ich immer noch keine Ausnahme bekommen.
Ich würde gerne wissen, ob ich erfolgreich mit Kafka verbunden oder nicht! Ist es möglich, mit einem solchen Fall umzugehen?
Ich benutze solche Maven deps
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
Dank!
Sie können kafka-consumer-groups.sh verwenden, um Themenkonsumenten und seine Status zu sehen – MGolovanov
Es gibt keine Ausnahme, da der Client annimmt, dass der Broker möglicherweise gerade nicht aktiv ist. Der Client würde sich mit dem Broker verbinden, wenn er online geht. –