ich integriere meine anwendung mit spring-kafka (nicht feder-integration-kafka). Hier ist Frühjahr Dokumentation für das Projekt: http://docs.spring.io/spring-kafka/docs/1.0.1.RELEASE/reference/htmlsinglespring-kafka (nicht integration) verbraucher keine konsummeldung
Mein Produzent funktioniert perfekt, aber Verbraucher verbraucht keine Nachrichten. Irgendwelche Zeiger.
Hier ist meine Konfiguration:
@EnableKafka
public class MyConfig {
@Value("${kafka.broker.list}") // List of servers server:port,
private String kafkaBrokerList;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, Message>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(12);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setIdleEventInterval(60000L);
factory.setAutoStartup(Boolean.TRUE);
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Bean
public ConsumerFactory<Integer, Message> consumerFactory() {
JsonDeserializer<Message> messageJsonDeserializer = new JsonDeserializer<>(Message.class);
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new IntegerDeserializer(), messageJsonDeserializer);
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerList);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
props.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 60000);
props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 10000);
return props;
}
@KafkaListener(topics = "myTopic", containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload Message message) {
System.out.println(message);
}
}
** mit mehr Informationen Herausgegeben **
Dank Gary für die Antwort. Ich sehe keine Ausnahmen im Protokoll. Auch ich versuchte KafkaTemplate
mit ähnlicher Konfiguration und ich bin in der Lage, Nachricht zu Warteschlange aber für Verbraucher, kein Glück zu veröffentlichen. Ich ändere Code, um String anstelle meines Nachrichtenobjekts zu verwenden. Hier sind Teile von Log:
2016-07-11 09:31:43.314 INFO [RMI TCP Connection(2)-127.0.0.1] o.a.k.c.c.ConsumerConfig [AbstractConfig.java:165] ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id =
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [app1.qa:9092, app1.qa:9093, app2.qa:9092, app2.qa:9093, app3.qa:9092, app3.qa:9093]
retry.backoff.ms = 10000
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = true
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 60000
ssl.truststore.password = null
session.timeout.ms = 15000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 10000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1
send.buffer.bytes = 131072
auto.offset.reset = latest
Auch ich sehe in log folgende:
2016-07-11 09:31:53.515 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-10] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[]
2016-07-11 09:31:53.515 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-11] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[]
2016-07-11 09:31:53.516 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-3] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[]
2016-07-11 09:31:53.516 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-12] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[]
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-8] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead.
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-3] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead.
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-10] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead.
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-12] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead.
Ihr Code sieht gut aus. Also, nicht sicher, nach welcher Probe Sie suchen. Selbst wenn wir eins hinzufügen, wird es ziemlich einfach und ohne so große Konfigurationsoptionen sein. Also, oder du verbindest dich mit falschem Kafka und/oder Thema oder es gibt etwas in den Logs, das irgendwie auf einen Fehler hinweist. –
Danke Gary für die Antwort. Ich sehe keine Ausnahmen im Protokoll.Auch habe ich KafkaTemplate mit ähnlicher Konfiguration versucht, und ich bin in der Lage, Nachricht in der Warteschlange zu veröffentlichen, aber für Verbraucher, kein Glück. Ich ändere Code, um String anstelle meines Nachrichtenobjekts zu verwenden. Aufgrund der Beschränkung der Länge der Kommentare habe ich dies im Hauptpost hinzugefügt. – Shailesh
Sie können weitere Informationen in den Protokollen nach dem Hinzufügen dieser Kategorie verfolgen: 'org.apache.kafka.clients = DEBUG' –