2016-01-17 13 views
13

Ich mache die Kafka Quickstart für Kafka 0.9.0.0.Consumer keine Nachrichten, Kafka-Konsole, neue Consumer-API, Kafka 0,9

Ich habe zookeeper bei localhost:2181 hören, weil ich lief

bin/zookeeper-server-start.sh config/zookeeper.properties 

Ich habe einen einzigen Broker bei localhost:9092 hören, weil ich

lief
bin/kafka-server-start.sh config/server.properties 

ich einen Produzenten Posting Thema „test“, weil ich ran

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
yello 
is this thing on? 
let's try another 
gimme more 

Wenn ich die alte API Consu ausführen mer, es durch Ausführen

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 

Allerdings funktioniert, wenn ich die neuen API Verbraucher laufen, ich habe nicht alles, wenn ich laufe

bin/kafka-console-consumer.sh --new-consumer --topic test --from-beginning \ 
    --bootstrap-server localhost:9092 

Ist es möglich, ein Thema abonnieren aus der Konsolenverbraucher mit der neuen API? Wie kann ich es reparieren?

+0

Welche Scala-Version verwenden Sie? Haben Sie kompiliert kafka kompiliert? Ich hatte ein paar kleinere Probleme mit kafka_2.10-0.9.0.0.tgz, aber mit kafka_2.101-0.9.0.0.tgz funktioniert es wie ein Zauber, Ihr Beispiel enthalten. – vlain

+0

Ok danke, das war mit 2.10. Wenn ich es nochmal versuche, wird es mit 2.11 sein. – EthanP

+0

hast du 'test' Thema erstellt? –

Antwort

1

Können Sie bitte wie folgt versuchen:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic 
+1

Dieser Befehl dient zum Ausführen der alten Consumer-API. Ihre Antwort wird bereits in Problem erwähnt. –

0

verwenden: bin/kafka-console-consumer.sh --bootstrap-Server localhost: 9092 --topic Test --from-Anfang

Hinweis: entfernen „--new-Verbraucher“ von Ihrem Befehl

als Referenz siehe hier: https://kafka.apache.org/quickstart

+0

Dann wird es den neuen Verbraucher nicht verwenden, und die Frage ist, wie Nachrichten mit dem neuen Verbraucher abgerufen werden. –

+0

0.9.0.0 in dieser Version von Kafka, ihre neue Konsole Verbraucher funktionierte nicht, sie gaben Java Verbraucher aber nicht Konsole Verbraucher. Und jetzt haben sie "-New-Consumer" von späteren Versionen komplett entfernt. –

0

ich das gleiche Problem bekam, jetzt habe ich herausgefunden.

Wenn Sie --zoookeke verwenden, sollte es als Parameter mit einer Tierpflegeradresse versehen sein.

Wenn Sie --bootstrap-server verwenden, sollte es mit einer Brokeradresse als Parameter versehen sein.

+0

Die Frage besagt, dass sie bereits eine Brokeradresse als Parameter bereitstellen; Port 9092 ist der Standard-Kafka-Port. –

+0

gut, nicht immer, wenn Sie die HDP-Sandbox für Docker herunterladen es standardmäßig auf 6667 – Loebre

1

Ich lief gerade in dieses Problem und die Lösung war /brokers in zookeeper zu löschen und die kafka Knoten neu zu starten.

bin/zookeeper-shell <zk-host>:2181

und dann

rmr /brokers

nicht sicher, warum dies sie löst.

Wenn ich die Debug-Protokollierung aktiviert ist, habe ich diese Fehlermeldung immer und immer wieder in den Verbraucher:

2017-07-07 01:20:12 DEBUG AbstractCoordinator:548 - Sending GroupCoordinator request for group test to broker xx.xx.xx.xx:9092 (id: 1007 rack: null) 2017-07-07 01:20:12 DEBUG AbstractCoordinator:559 - Received GroupCoordinator response ClientResponse(receivedTimeMs=1499390412231, latencyMs=84, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=13,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group test 2017-07-07 01:20:12 DEBUG AbstractCoordinator:581 - Group coordinator lookup for group test failed: The group coordinator is not available. 2017-07-07 01:20:12 DEBUG AbstractCoordinator:215 - Coordinator discovery failed for group test, refreshing metadata

-2

In kafka_2.11-0.11.0.0 der zookeeper Server ist veraltet und es wird mit Bootstrap-Server, und es wird Broker IP-Adresse und Port. Wenn Sie korrekte Broker-Parameter angeben, können Sie Nachrichten konsumieren.

z.B. $ bin/kafka-konsolenverbraucher.sh --bootstrap-server: 9093 --topic test --von Anfang an

Ich benutze Port 9093, für Sie kann es variieren.

grüße.

+0

Die Frage sagt, dass das ist, was sie bereits tun. –

0

Ihr localhost ist das foo hier. Wenn Sie das Localhost-Wort für den tatsächlichen Hostnamen ersetzen, sollte es funktionieren.

wie folgt aus:

Produzent

./bin/kafka-console-producer.sh --broker-list \ 
sandbox-hdp.hortonworks.com:9092 --topic test 

Verbraucher:

./bin/kafka-console-consumer.sh --topic test --from-beginning \  
--bootstrap-server bin/kafka-console-consumer.sh --new-consumer \ 
--topic test --from-beginning \ 
--bootstrap-server localhost:9092 
1

Dieses Problem wirkt sich auch auf Daten aus dem kafka Einnahme Gerinne mit und die Daten an HDFS sinken.

die oben Problem zu beheben:

  1. Stopp Kafka Makler
  2. Connect Cluster Zookeeper und entfernen/Makler z Knoten
  3. Restart kafka Makler

Es gibt keine Frage bezüglich zu kafka client version und scala version, dass wir den cluster verwenden. Der Zookeeper hat möglicherweise falsche Informationen über Broker-Hosts.

die Aktion, um zu überprüfen:

Thema erstellen in kafka.

kafka-console-Verbraucher --bootstrap-Server slavenode01.cdh.com:9092 --topic rkkrishnaa3210 --from-Anfang

Öffnen Sie ein Produzent Kanal und einige Nachrichten, um es zu füttern.

kafkas-console-Produzent --broker-Liste slavenode03.cdh.com:9092 --topic rkkrishnaa3210

öffnen Sie einen Consumer-Kanal die Nachricht von einem bestimmten Thema zu konsumieren.

kafkas-console-Verbraucher --bootstrap-Server slavenode01.cdh.com:9092 --topic rkkrishnaa3210 --from-Anfang

Um dies zu testen von Gerinne:

Flume Mittel config:

rk.sources = source1 
rk.channels = channel1 
rk.sinks = sink1 

rk.sources.source1.type = org.apache.flume.source.kafka.KafkaSource 
rk.sources.source1.zookeeperConnect = ip-20-0-21-161.ec2.internal:2181 
rk.sources.source1.topic = rkkrishnaa321 
rk.sources.source1.groupId = flume1 
rk.sources.source1.channels = channel1 
rk.sources.source1.interceptors = i1 
rk.sources.source1.interceptors.i1.type = timestamp 
rk.sources.source1.kafka.consumer.timeout.ms = 100 
rk.channels.channel1.type = memory 
rk.channels.channel1.capacity = 10000 
rk.channels.channel1.transactionCapacity = 1000 
rk.sinks.sink1.type = hdfs 
rk.sinks.sink1.hdfs.path = /user/ce_rk/kafka/%{topic}/%y-%m-%d 
rk.sinks.sink1.hdfs.rollInterval = 5 
rk.sinks.sink1.hdfs.rollSize = 0 
rk.sinks.sink1.hdfs.rollCount = 0 
rk.sinks.sink1.hdfs.fileType = DataStream 
rk.sinks.sink1.channel = channel1 

Run Gerinne Mittel:

flume-ng agent --conf . -f flume.conf -Dflume.root.logger=DEBUG,console -n rk 

vom Verbraucher beachten, dass die Protokolle mich Ssage aus dem Thema wird in HDFS geschrieben.

18/02/16 05:21:14 INFO internals.AbstractCoordinator: Successfully joined group flume1 with generation 1 
18/02/16 05:21:14 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [rkkrishnaa3210-0] for group flume1 
18/02/16 05:21:14 INFO kafka.SourceRebalanceListener: topic rkkrishnaa3210 - partition 0 assigned. 
18/02/16 05:21:14 INFO kafka.KafkaSource: Kafka source source1 started. 
18/02/16 05:21:14 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean. 
18/02/16 05:21:14 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started 
18/02/16 05:21:41 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false 
18/02/16 05:21:42 INFO hdfs.BucketWriter: Creating /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp 
18/02/16 05:21:48 INFO hdfs.BucketWriter: Closing /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp 
18/02/16 05:21:48 INFO hdfs.BucketWriter: Renaming /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp to /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920 
18/02/16 05:21:48 INFO hdfs.HDFSEventSink: Writer callback called.