Hier ist mein Code für den Aufbau eines Kafka-Consumer aus dem Java-Client.Kafka Consumer nicht konsumieren
def buildConsumer[Key, Value](
configuration: KafkaConfiguration, commitInterval: Long, groupId: Option[String] = None)(
implicit keyDeserializer: Deserializer[Key], valueDeserializer: Deserializer[Value]
): KafkaJavaConsumer[Key, Value] = {
val settingsMap: Map[String, Object] = Map(
"bootstrap.servers" -> s"${configuration.bootstrapHost}:${configuration.bootstrapPort}",
"group.id" -> groupId.getOrElse(s"${configuration.topic}-${UUID.randomUUID}"),
"enable.auto.commit" -> "true",
"auto.commit.interval.ms" -> commitInterval.toString,
"auto.offset.reset" -> "earliest"
) ++ configuration.additionalOptions.getOrElse(Map.empty[String, Object])
val consumer = new KafkaJavaConsumer[Key, Value](settingsMap.asJava, keyDeserializer, valueDeserializer)
consumer.subscribe(Seq(configuration.topic).asJava)
consumer
}
Mein Kafka läuft auf Port 6050 und ich habe es in der Konsole getestet, um von diesem bestimmten Port zu produzieren und zu konsumieren. Ich frage mich, ob mein Problem mit meiner Konfiguration oben zusammenhängt. Ich habe auch den Code oben mit der EmbeddedKafka
Rahmen getestet, scheint das Problem mit einem tatsächlichen Kafka-Server ausgeführt werden.
EDIT:
ich vergaß hinzuzufügen, dass ich mehrere Verbraucher (mit unterschiedlichen group.id
‚s) aus dem gleichen Broker raubend, nicht sicher, ob dies das Problem ist.
Können Sie die Verbraucherprotokolle hier einfügen, wenn der DEBUG-Modus aktiviert ist? –