2016-06-29 11 views
1

Hier ist mein Code für den Aufbau eines Kafka-Consumer aus dem Java-Client.Kafka Consumer nicht konsumieren

def buildConsumer[Key, Value](
    configuration: KafkaConfiguration, commitInterval: Long, groupId: Option[String] = None)(
    implicit keyDeserializer: Deserializer[Key], valueDeserializer: Deserializer[Value] 
): KafkaJavaConsumer[Key, Value] = { 
    val settingsMap: Map[String, Object] = Map(
     "bootstrap.servers" -> s"${configuration.bootstrapHost}:${configuration.bootstrapPort}", 
     "group.id" -> groupId.getOrElse(s"${configuration.topic}-${UUID.randomUUID}"), 
     "enable.auto.commit" -> "true", 
     "auto.commit.interval.ms" -> commitInterval.toString, 
     "auto.offset.reset" -> "earliest" 
    ) ++ configuration.additionalOptions.getOrElse(Map.empty[String, Object]) 
    val consumer = new KafkaJavaConsumer[Key, Value](settingsMap.asJava, keyDeserializer, valueDeserializer) 
    consumer.subscribe(Seq(configuration.topic).asJava) 
    consumer 
    } 

Mein Kafka läuft auf Port 6050 und ich habe es in der Konsole getestet, um von diesem bestimmten Port zu produzieren und zu konsumieren. Ich frage mich, ob mein Problem mit meiner Konfiguration oben zusammenhängt. Ich habe auch den Code oben mit der EmbeddedKafka Rahmen getestet, scheint das Problem mit einem tatsächlichen Kafka-Server ausgeführt werden.

EDIT:

ich vergaß hinzuzufügen, dass ich mehrere Verbraucher (mit unterschiedlichen group.id ‚s) aus dem gleichen Broker raubend, nicht sicher, ob dies das Problem ist.

+0

Können Sie die Verbraucherprotokolle hier einfügen, wenn der DEBUG-Modus aktiviert ist? –

Antwort

1

Stellen Sie sicher, dass

Anzahl der Partitionen im Thema> = Anzahl der Verbraucher-Instanzen in der Gruppe

einige der Verbraucher Instanzen Ansonsten in der Gruppe gewonnen t irgendwelche Partitionen zugewiesen werden.

die Anzahl der Partitionen zu überprüfen, verwenden Sie den Befehl kafka-topics.sh

> sh kafka-topics.sh --zookeeper localhost:2181 --topic test --describe Topic:test PartitionCount:6 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 2 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 3 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 4 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 5 Leader: 0 Replicas: 0 Isr: 0

0

Im noch nicht sicher, was das Problem ist, sondern durch die Tierpfleger Datenordner zu löschen und alle kafka Logs, begann der Verbraucher/Produzent wie vorgesehen zu arbeiten. Ich denke, das hat vielleicht mit dem Problem zu tun, dass ich die Log-Dateien löschte, um die Themen zu löschen, ohne die formalen Kafka-Admin-Tools zum Löschen von Themen zu verwenden.