5

Ich bin neu in Kafka 0.9 und testen einige Funktionen, die ich ein seltsames Verhalten in der Java-implementiert Consumer (KafkaConsumer) realisiert.Kafka Consumer Poll() -Methode wird blockiert

Der Kafka-Broker befindet sich in einem externen Gerät Ambari.

Auch wenn ich einen Producer implementieren und Nachrichten an den externen Broker senden könnte, habe ich keine Ahnung, warum, wenn der Verbraucher versucht, die Ereignisse (Umfrage) zu lesen, bleibt er stecken.

Ich weiß, dass der Produzent gut funktioniert, da ich Nachrichten über den Konsolenverbraucher konsumieren kann (der lokal auf ambari arbeitet). Aber wenn ich den Java Consumer ausführe, passiert nichts, bleibt einfach hängen. Debuggen der Code, den ich konnte sehen, dass es an der poll() Linie blockiert wird:

ConsumerRecords<String, String> records = consumer.poll(100); 

Der Timeout nichts tut, nebenbei bemerkt. Es spielt keine Rolle, ob Sie 0, 100 oder 1000 ms setzen, der Verbraucher wird in dieser Zeile blockiert und gibt keine Zeitüberschreitungen oder Ausnahmen aus.

habe ich versucht, alle Arten von alternativen Eigenschaften, wie advertised.host.name, advertised.listener, ... und so weiter, mit Null Glück.

Jede Hilfe würde sehr geschätzt werden. Danke im Voraus!

+0

Können Sie die Nachrichten auf andere Weise verwenden, z. B. mit 'kafka-console-consumer.sh'? –

+0

Ja, ich bin. Von der Maschine, die den Ambari hostet, kann ich Nachrichten über den Konsolenverbraucher konsumieren. –

+0

Und was ist mit der Maschine, auf der Sie Ihren Kunden betreiben? Hast du den Konsolenverbraucher dort getestet? –

Antwort

1

Grund könnte sein, dass der Computer, auf dem der Consumer-Code ausgeführt wird, keine Verbindung zum Zoowächter herstellen kann. Versuchen Sie, den gleichen Verbrauchercode auf dem Computer auszuführen, auf dem Ihr Kafka installiert ist (ich habe es versucht und für mich gearbeitet). Ich löste auch Problem durch Erwähnen der folgenden Eigenschaften in der Datei server.properties: // in meinem Fall ist es öffentliche IP von ec2 Maschine, ich habe kafka und zookeeeper auf demselben ec2 installiert. ConsumerRecords<String, String> records = consumer.poll(100); Die obige Aussage bedeutet nicht, Verbraucher wird nach 100 ms Timeout, es ist die Abfragezeit. Alle Daten, die in 100 ms erfasst werden, werden in die Datensatzerfassung eingelesen.