Ich habe seit Tagen kämpfen versucht, eine wirklich einfache Anwendung mit einem Kafka-Broker und einem Kafka-Produzent auf localhost laufen zu lassen, habe ich alle möglichen gelesen Antworten auf ähnliche Probleme bei Google, aber ich konnte es immer noch nicht funktionieren.kafka.producer.async.DefaultEventHandler - Senden von Anfragen für Themen fehlgeschlagen
Dies ist der Fehler, den ich bekommen:
INFO kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(clicks)
[ProducerSendThread-] INFO kafka.producer.SyncProducer - Connected to localhost:9092 for producing
[ProducerSendThread-] INFO kafka.producer.SyncProducer - Disconnecting from localhost:9092
[ProducerSendThread-] WARN kafka.producer.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic clicks ->
No partition metadata for topic clicks due to kafka.common.LeaderNotAvailableException}] for topic [clicks]: class kafka.common.LeaderNotAvailableException
[ProducerSendThread-] INFO kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 1 for 1 topic(s) Set(clicks)
[ProducerSendThread-] INFO kafka.producer.SyncProducer - Connected to localhost:9092 for producing
[ProducerSendThread-] INFO kafka.producer.SyncProducer - Disconnecting from localhost:9092
[ProducerSendThread-] WARN kafka.producer.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic clicks ->
No partition metadata for topic clicks due to kafka.common.LeaderNotAvailableException}] for topic [clicks]: class kafka.common.LeaderNotAvailableException
[ProducerSendThread-] ERROR kafka.producer.async.DefaultEventHandler - Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: clicks
[ProducerSendThread-] INFO kafka.producer.async.DefaultEventHandler - Back off for 100 ms before retrying send. Remaining retries = 3
Und dies ist mein Code:
Properties properties= new Properties();
properties.put("broker.id", "1");
properties.put("advertised.host.name", "localhost");
properties.put("advertised.port", "9092");
properties.put("host.name", "localhost");
properties.put("auto.create.topics.enable","true");
properties.put("zookeeper.connect", zookeeperConnectString);
properties.put("port","9092");
properties.setProperty("num.partitions", "1");
properties.setProperty("log.dirs", newPath(KAFKA_LOG_DIR).toString());
KafkaConfig kafkaConfig = new KafkaConfig(properties);
KafkaServerStartable kafkaServer = new KafkaServerStartable(kafkaConfig);
kafkaServer.startup();
String topic = clicks;
ZkClient zookeeper = new ZkClient(zookeeperConnectString, 30000, 30000, ZKStringSerializer$.MODULE$);
if (!AdminUtils.topicExists(zookeeper, topic)) {
AdminUtils.createTopic(new ZkClient(zookeeperConnectString), topic, 1, 1, new Properties());
}
zookeeper.close();
Properties producerProps = new Properties();
producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
producerProps.put("key.serializer.class", "kafka.serializer.StringEncoder");
producerProps.setProperty("producer.type", "async");
producerProps.put("metadata.broker.list", "localhost:9092");
producerProps.put("request.required.acks","0");
Producer producer = new Producer(new ProducerConfig(producerProps));
String click = "exampleMessage";
producer.send(ImmutableList.of(new KeyedMessage(topic, click)));
producer.close();
Ich habe bereits eine zookeeper Instanz auf localhost korrekt ausgeführt wird: 2181.
Ich verwende die folgenden Versionen von kafka und zookeeper:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
Vielen Dank für jede Hilfe und Kommentare :)
Haben Sie versucht, Metainformationen über die Befehlszeile mithilfe des Befehls _describe_ abzurufen? 'bin/kafka-topics.sh --describe --zookeeper localhost: 2181 --topic clicks' – avr
Ich kann die kafka-Befehlszeile nicht verwenden. Wenn ich jedoch auf zoekeeper mit dem Kurator zugreife, sehe ich, dass ein Knoten für das Thema erstellt wurde:/brokers/topic/cliks. Stattdessen ist der Knoten/brokers/ids leer. – nicola
d. H. Es gibt keine aktiven Kafka-Broker. Stellen Sie also sicher, dass mindestens ein Kafka-Broker aktiv ist. – avr