2016-06-29 8 views
0

Ich habe ein Warteschlangensystem auf Apache Kafka gebaut. Die Anwendung wird Nachrichten an bestimmte Kafka topic produzieren und auf der Verbraucherseite muss ich alle Datensätze zu dem Thema produziert verbrauchen.
Ich schrieb Verbraucher mit neuen Java Consumer API. Der Code sieht aus wieKafka Consumer (neue Consumer API) läuft für immer

Properties props = new Properties(); 
        props.put("bootstrap.servers", kafkaBrokerIp+":9092"); 
        props.put("group.id",groupId); 
        props.put("enable.auto.commit", "true"); 
        props.put("session.timeout.ms", "30000"); 
        props.put("auto.offset.reset", "earliest"); 
     props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
     KafkaConsumer<String, String> consumer = new KafkaConsumer(props); 
        consumer.subscribe(Arrays.asList("consumertest")); 
        while (true) { 
         ConsumerRecords<String, String> records = consumer.poll(100); 
         for (ConsumerRecord<String, String> record : records){ 
          System.out.println("Data recieved : "+record.value()); 
          } 
        } 

Hier muß ich immer die Verbraucher führen, so dass jeder Datensatz in kafka Thema vom Hersteller geschoben sollte sofort verbraucht und verarbeitet werden.
Also meine Verwirrung ist, ist es ein richtiger Weg, eine unendliche while-Schleife (wie im Beispielcode) zu verwenden, um die Daten zu verbrauchen?

Antwort

0

Es funktioniert für mich, aber Sie möchten möglicherweise Ihre innere Schleife in einen try/catch-Block setzen, falls Sie irgendwelche Ausnahmen werfen. Berücksichtigen Sie auch eine periodische Wiederverbindungsaufgabe, wenn Sie die Verbindung trennen.

0

Ja, Sie können die Endlosschleife verwenden. Eigentlich ist es keine geschäftige Schleife. Bei jeder Abfrage, wenn Daten nicht verfügbar sind, wartet der Anruf für die angegebene Zeitspanne.

long millisToWait = 100; 
consumer.poll(millisToWait); 

Der neue Benutzer kümmert sich automatisch um die Netzwerkkommunikationsprobleme. Stellen Sie sicher, dass der Benutzer beim Herunterfahren ordnungsgemäß schließt.