2016-06-09 13 views
1

Wenn ich den kafka Verbraucher in einem Thread laufen lasse, ohne ihn von außen zu manipulieren, wie den Verbraucher einzuschalten oder ihn aufzuwecken, ist es notwendig, die WakeupException richtig zu behandeln? Und was ist ein guter Ansatz, um damit umzugehen?Kafka Consumer WakeupException Handhabung Java

Der Verbraucher läuft auf einem Webservice, um ständig Daten aus einer Warteschlange zu ziehen und sollte nie damit aufhören. Außerdem hat der Dienst keinen Ruhe- oder Ruhezustand. In der Dokumentation von Kafka wird darauf hingewiesen, dass die Ausnahme nur ausgelöst wird, wenn der Kafka-Konsument von einem anderen Thread blockiert wird, aber das wird niemals passieren. https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/errors/WakeupException.html

Kafka Version 0.10.0.0

catch (WakeupException e) { 
    LOG.info("Kafka Consulmer wakeup exception"); 
    // Ignore exception if closing 
    if (!closed.get()) { 
     throw e; 
    } 
} finally { 
    consumer.close(); 
} 

Grüße, Rakesh

Antwort

0

Sie können einige Beispiele auf dem Confluent Dokument finden, die zeigen, wie man richtig die WakeupException Grundsätzlich [Confluent Consumer Doc]

behandeln, wenn Sie verwenden consumer.poll(Integer.MAX_VALUE) der Verbraucher wird blockieren, bis eine Nachricht abgerufen wird. Wenn Sie in diesem Fall den Verbrauch stoppen möchten, können Sie consumer.wakeup() (von einem anderen Thread) aufrufen und die Ausnahme abfangen, um ordnungsgemäß herunterzufahren.

Während Sie Ihre Offsets gleichzeitig synchron senden, wird ein Anruf an consumer.wakeup() einen WakeupException werfen.