2016-06-29 12 views
0

Ich habe Kafka High-Level-Verbraucher.Wie Kafka Verbraucher Zustand zu überprüfen

public class KafkaHighLevelConsumer implements Runnable { 
    private final KafkaConsumer<String, String> consumer; 
    private final List<String> topics; 
    private final int id; 

    public KafkaHighLevelConsumer(int id, 
         String groupId, 
         List<String> topics,BlockingQueue<String> storyQueue) { 
     this.id = id; 
     this.topics = topics; 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "localhost:9091"); 
     props.put("group.id", groupId); 
     props.put("key.deserializer", StringDeserializer.class.getName()); 
     props.put("value.deserializer", StringDeserializer.class.getName()); 
     this.consumer = new KafkaConsumer<>(props); 
    } 

    @Override 
    public void run() { 
     try { 
      consumer.subscribe(topics); 

      while (true) { 
       ConsumerRecords<String, String> records = consumer.poll(100); 
       for (ConsumerRecord<String, String> record : records) { 
        Map<String, Object> data = new HashMap<>(); 
        data.put("partition", record.partition()); 
        data.put("offset", record.offset()); 
        data.put("value", record.value()); 
        System.out.println(this.id + ": " + data); 
       } 
      } 
     } catch (WakeupException e) { 
      // ignore for shutdown 
     }finally { 
      consumer.close(); 
     } 
    } 

    public void shutdown() { 
     consumer.wakeup(); 
    } 
} 

Der Verbraucher funktioniert gut, aber ich muss den Zustand des Verbrauchers überwachen. Warum haben wir keine Ausnahme, wenn Server-IP oder -Port falsch ist?

Wenn ich den Port zu einigen falschen props.put("bootstrap.servers", "localhost:9091"); zu props.put("bootstrap.servers", "localhost:100500"); ändern, kann ich immer noch keine Ausnahme bekommen.

Ich würde gerne wissen, ob ich erfolgreich mit Kafka verbunden oder nicht! Ist es möglich, mit einem solchen Fall umzugehen?

Ich benutze solche Maven deps

<dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>0.9.0.1</version> 
     </dependency> 

Dank!

+0

Sie können kafka-consumer-groups.sh verwenden, um Themenkonsumenten und seine Status zu sehen – MGolovanov

+0

Es gibt keine Ausnahme, da der Client annimmt, dass der Broker möglicherweise gerade nicht aktiv ist. Der Client würde sich mit dem Broker verbinden, wenn er online geht. –

Antwort

0

Nach dem Client-Dokumentation (https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html):

Es wird transparent den Ausfall von Servern im Cluster Kafka Griff

Die Bibliothek den Unterschied zwischen einem vorübergehenden Ausfall kann nicht sagen, eine permanente Fehler oder eine Fehlkonfiguration. So betrachtet es jeden Fehler als "wiederholbar" und tut dies genau, wiederholt sich für immer und gibt niemals einen Fehler zurück.

Dies ist eine Behelfslösung des Zustand der Verbindung zu überprüfen: einige Informationen anfordern, die für eine vernünftige Zeit zurückzukehren gewährleistet ist, warten und wenn Sie wissen nichts kehrt dann, dass die Verbindung ein Problem hat:

int CONNECTION_TEST_TIMEOUT_SECONDS = 10; // or whatever is appropriate for your environment 

ExecutorService executor = Executors.newSingleThreadExecutor(); 
Runnable testTask = consumer::listTopics; 

Future future = executor.submit(testTask); 
try { 
    future.get(CONNECTION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); 
} catch (TimeoutException te) { 
    consumer.wakeup(); 
    throw new IOException("Could not communicate with the server within " + CONNECTION_TEST_TIMEOUT_SECONDS + " seconds"); 
} catch (InterruptedException e) { 
    // Nothing to do. Maybe a warning in the log? 
} catch (ExecutionException e) { 
    throw new IOException("Exception while running connection test: " + e.getMessage(), e); 
}