Ich habe ein Warteschlangensystem auf Apache Kafka
gebaut. Die Anwendung wird Nachrichten an bestimmte Kafka topic
produzieren und auf der Verbraucherseite muss ich alle Datensätze zu dem Thema produziert verbrauchen.
Ich schrieb Verbraucher mit neuen Java Consumer API. Der Code sieht aus wieKafka Consumer (neue Consumer API) läuft für immer
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBrokerIp+":9092");
props.put("group.id",groupId);
props.put("enable.auto.commit", "true");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("consumertest"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.println("Data recieved : "+record.value());
}
}
Hier muß ich immer die Verbraucher führen, so dass jeder Datensatz in kafka Thema vom Hersteller geschoben sollte sofort verbraucht und verarbeitet werden.
Also meine Verwirrung ist, ist es ein richtiger Weg, eine unendliche while-Schleife (wie im Beispielcode) zu verwenden, um die Daten zu verbrauchen?