2016-03-24 3 views
0

Ich habe seit Tagen kämpfen versucht, eine wirklich einfache Anwendung mit einem Kafka-Broker und einem Kafka-Produzent auf localhost laufen zu lassen, habe ich alle möglichen gelesen Antworten auf ähnliche Probleme bei Google, aber ich konnte es immer noch nicht funktionieren.kafka.producer.async.DefaultEventHandler - Senden von Anfragen für Themen fehlgeschlagen

Dies ist der Fehler, den ich bekommen:

INFO kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(clicks) 
[ProducerSendThread-] INFO kafka.producer.SyncProducer - Connected to localhost:9092 for producing 
[ProducerSendThread-] INFO kafka.producer.SyncProducer - Disconnecting from localhost:9092 
[ProducerSendThread-] WARN kafka.producer.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic clicks -> 
No partition metadata for topic clicks due to kafka.common.LeaderNotAvailableException}] for topic [clicks]: class kafka.common.LeaderNotAvailableException 
[ProducerSendThread-] INFO kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 1 for 1 topic(s) Set(clicks) 
[ProducerSendThread-] INFO kafka.producer.SyncProducer - Connected to localhost:9092 for producing 
[ProducerSendThread-] INFO kafka.producer.SyncProducer - Disconnecting from localhost:9092 
[ProducerSendThread-] WARN kafka.producer.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic clicks -> 
No partition metadata for topic clicks due to kafka.common.LeaderNotAvailableException}] for topic [clicks]: class kafka.common.LeaderNotAvailableException 
[ProducerSendThread-] ERROR kafka.producer.async.DefaultEventHandler - Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: clicks 
[ProducerSendThread-] INFO kafka.producer.async.DefaultEventHandler - Back off for 100 ms before retrying send. Remaining retries = 3 

Und dies ist mein Code:

Properties properties= new Properties(); 
properties.put("broker.id", "1"); 
properties.put("advertised.host.name", "localhost"); 
properties.put("advertised.port", "9092"); 
properties.put("host.name", "localhost"); 
properties.put("auto.create.topics.enable","true"); 
properties.put("zookeeper.connect", zookeeperConnectString); 
properties.put("port","9092"); 
properties.setProperty("num.partitions", "1"); 
properties.setProperty("log.dirs", newPath(KAFKA_LOG_DIR).toString()); 

KafkaConfig kafkaConfig = new KafkaConfig(properties); 
KafkaServerStartable kafkaServer = new KafkaServerStartable(kafkaConfig); 
kafkaServer.startup(); 

String topic = clicks; 

ZkClient zookeeper = new ZkClient(zookeeperConnectString, 30000, 30000, ZKStringSerializer$.MODULE$); 
if (!AdminUtils.topicExists(zookeeper, topic)) { 
    AdminUtils.createTopic(new ZkClient(zookeeperConnectString), topic, 1, 1, new Properties()); 
    } 
zookeeper.close(); 

Properties producerProps = new Properties(); 
producerProps.put("serializer.class", "kafka.serializer.StringEncoder"); 
producerProps.put("key.serializer.class", "kafka.serializer.StringEncoder"); 
producerProps.setProperty("producer.type", "async"); 
producerProps.put("metadata.broker.list", "localhost:9092"); 
producerProps.put("request.required.acks","0"); 

Producer producer = new Producer(new ProducerConfig(producerProps)); 

String click = "exampleMessage"; 
producer.send(ImmutableList.of(new KeyedMessage(topic, click))); 
producer.close(); 

Ich habe bereits eine zookeeper Instanz auf localhost korrekt ausgeführt wird: 2181.

Ich verwende die folgenden Versionen von kafka und zookeeper:

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.10</artifactId> 
     <version>0.8.2.2</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.8.2.2</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.zookeeper</groupId> 
     <artifactId>zookeeper</artifactId> 
     <version>3.4.6</version> 
    </dependency> 

Vielen Dank für jede Hilfe und Kommentare :)

+0

Haben Sie versucht, Metainformationen über die Befehlszeile mithilfe des Befehls _describe_ abzurufen? 'bin/kafka-topics.sh --describe --zookeeper localhost: 2181 --topic clicks' – avr

+0

Ich kann die kafka-Befehlszeile nicht verwenden. Wenn ich jedoch auf zoekeeper mit dem Kurator zugreife, sehe ich, dass ein Knoten für das Thema erstellt wurde:/brokers/topic/cliks. Stattdessen ist der Knoten/brokers/ids leer. – nicola

+0

d. H. Es gibt keine aktiven Kafka-Broker. Stellen Sie also sicher, dass mindestens ein Kafka-Broker aktiv ist. – avr

Antwort

0

Falls jemand hat das gleiche Problem gestoßen, habe ich es Arbeit Zugabe Thread.sleep(500); nach der Erstellung des Themas. Ich weiß nicht genau, warum dies der Fall ist, weil die Instanz von Zookeeper auf localhost ausgeführt wird und einige Zeit zur Initialisierung benötigt wird.