2016-07-08 21 views
2

ich integriere meine anwendung mit spring-kafka (nicht feder-integration-kafka). Hier ist Frühjahr Dokumentation für das Projekt: http://docs.spring.io/spring-kafka/docs/1.0.1.RELEASE/reference/htmlsinglespring-kafka (nicht integration) verbraucher keine konsummeldung

Mein Produzent funktioniert perfekt, aber Verbraucher verbraucht keine Nachrichten. Irgendwelche Zeiger.

Hier ist meine Konfiguration:

@EnableKafka 
public class MyConfig { 

    @Value("${kafka.broker.list}") // List of servers server:port, 
    private String kafkaBrokerList; 

    @Bean 
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, Message>> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<Integer, Message> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setConcurrency(12); 
     factory.getContainerProperties().setPollTimeout(3000); 
     factory.getContainerProperties().setIdleEventInterval(60000L); 
     factory.setAutoStartup(Boolean.TRUE); 
     factory.setMessageConverter(new StringJsonMessageConverter()); 
     return factory; 
    } 

    @Bean 
    public ConsumerFactory<Integer, Message> consumerFactory() { 
     JsonDeserializer<Message> messageJsonDeserializer = new JsonDeserializer<>(Message.class); 
     return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new IntegerDeserializer(), messageJsonDeserializer); 
    } 

    @Bean 
    public Map<String, Object> consumerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerList); 
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000); 
     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000); 
     props.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 60000); 
     props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 10000); 
     return props; 
    } 

    @KafkaListener(topics = "myTopic", containerFactory = "kafkaListenerContainerFactory") 
    public void listen(@Payload Message message) { 
     System.out.println(message); 
    } 

} 

** mit mehr Informationen Herausgegeben **

Dank Gary für die Antwort. Ich sehe keine Ausnahmen im Protokoll. Auch ich versuchte KafkaTemplate mit ähnlicher Konfiguration und ich bin in der Lage, Nachricht zu Warteschlange aber für Verbraucher, kein Glück zu veröffentlichen. Ich ändere Code, um String anstelle meines Nachrichtenobjekts zu verwenden. Hier sind Teile von Log:

2016-07-11 09:31:43.314 INFO [RMI TCP Connection(2)-127.0.0.1] o.a.k.c.c.ConsumerConfig [AbstractConfig.java:165] ConsumerConfig values: 
    metric.reporters = [] 
    metadata.max.age.ms = 300000 
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 
    group.id = 
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] 
    reconnect.backoff.ms = 50 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    max.partition.fetch.bytes = 1048576 
    bootstrap.servers = [app1.qa:9092, app1.qa:9093, app2.qa:9092, app2.qa:9093, app3.qa:9092, app3.qa:9093] 
    retry.backoff.ms = 10000 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    ssl.keystore.type = JKS 
    ssl.trustmanager.algorithm = PKIX 
    enable.auto.commit = true 
    ssl.key.password = null 
    fetch.max.wait.ms = 500 
    sasl.kerberos.min.time.before.relogin = 60000 
    connections.max.idle.ms = 60000 
    ssl.truststore.password = null 
    session.timeout.ms = 15000 
    metrics.num.samples = 2 
    client.id = 
    ssl.endpoint.identification.algorithm = null 
    key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer 
    ssl.protocol = TLS 
    check.crcs = true 
    request.timeout.ms = 40000 
    ssl.provider = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    ssl.keystore.location = null 
    heartbeat.interval.ms = 3000 
    auto.commit.interval.ms = 10000 
    receive.buffer.bytes = 32768 
    ssl.cipher.suites = null 
    ssl.truststore.type = JKS 
    security.protocol = PLAINTEXT 
    ssl.truststore.location = null 
    ssl.keystore.password = null 
    ssl.keymanager.algorithm = SunX509 
    metrics.sample.window.ms = 30000 
    fetch.min.bytes = 1 
    send.buffer.bytes = 131072 
    auto.offset.reset = latest 

Auch ich sehe in log folgende:

2016-07-11 09:31:53.515 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-10] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[] 
2016-07-11 09:31:53.515 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-11] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[] 
2016-07-11 09:31:53.516 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-3] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[] 
2016-07-11 09:31:53.516 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-12] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[] 
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-8] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead. 
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-3] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead. 
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-10] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead. 
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-12] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead. 
+1

Ihr Code sieht gut aus. Also, nicht sicher, nach welcher Probe Sie suchen. Selbst wenn wir eins hinzufügen, wird es ziemlich einfach und ohne so große Konfigurationsoptionen sein. Also, oder du verbindest dich mit falschem Kafka und/oder Thema oder es gibt etwas in den Logs, das irgendwie auf einen Fehler hinweist. –

+0

Danke Gary für die Antwort. Ich sehe keine Ausnahmen im Protokoll.Auch habe ich KafkaTemplate mit ähnlicher Konfiguration versucht, und ich bin in der Lage, Nachricht in der Warteschlange zu veröffentlichen, aber für Verbraucher, kein Glück. Ich ändere Code, um String anstelle meines Nachrichtenobjekts zu verwenden. Aufgrund der Beschränkung der Länge der Kommentare habe ich dies im Hauptpost hinzugefügt. – Shailesh

+0

Sie können weitere Informationen in den Protokollen nach dem Hinzufügen dieser Kategorie verfolgen: 'org.apache.kafka.clients = DEBUG' –

Antwort

2

Die obige Dokumentation verwiesen sagt:

Obwohl Serializer/Deserializer-API ist ziemlich einfach und flexibel Aus der Sicht von Kafka Consumer und Producer ist es nicht ausreichend auf der Messaging-Ebene, wo KafkaTemplate und @KafkaListener vorhanden sind. Zur einfachen Konvertierung nach/from org.springframework.messaging.Message stellt Spring für Apache Kafka die MessageConverter-Abstraktion mit der MessagingMessageConverter-Implementierung und der StringJsonMessageConverter-Anpassung bereit.

Aber in Ihrem Fall Sie kombinieren MessageConverter:

 factory.setMessageConverter(new StringJsonMessageConverter()); 

mit benutzerdefinierten Deserializer:

 JsonDeserializer<Message> messageJsonDeserializer = new JsonDeserializer<>(Message.class); 

Simplest fix für Sie Fall StringDeserializer stattdessen verwendet werden soll:

https://kafka.apache.org/090/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html

Über die oben angegebenen Protokollmeldungen Marking the coordinator XXX dead., der Fehler bezieht sich nicht auf spring-kafka Projekt, aber bedeutet, dass das Problem mit Ihrer Kafka-Konfiguration ist. In meinem Fall hatten wir solche Probleme, wenn Kafka-Knoten für zookeper nicht erreichbar waren. Für die Problembehandlung empfehlen ich sowohl Kafka & Zookeper lokal zu installieren und stellen Sie sicher, dass die Herstellung aufwendige Arbeiten Sie darauf kafka-console-producer und kafka-console-consumer, zB:

https://www.cloudera.com/documentation/kafka/latest/topics/kafka_command_line.html

Dann, als die nächste Stufe, können Sie Überprüfen Sie Ihre Beispielanwendung spring-kafka mit der gleichen lokalen Installation.

+0

Gleichzeitig ermöglicht 'JsonDeserializer' die Verwendung von POJO für Payload und das Extrahieren von Instanzen aus Kafkas Nachrichten. – stepio

+0

Danke Stepio, ich habe versucht mit StringJsonMessageConverter zu entfernen, aber ich sehe immer noch keine Nachrichten konsumiert werden. Um die Komplexität zu reduzieren, ändere ich den Code, um ihn mit StringDesilizer statt mit einem benutzerdefinierten JSON-Deserilizer zu testen. 'code' Rückgabe neuer DefaultKafkaConsumerFactory <> (consumerConfigs(), neuer IntegerDeserializer(), neuer StringDeserializer()); 'Code' und Verbraucher wird folgendermaßen geändert: ' code' containerProps.setMessageListener ((Message ) Nachricht -> { System.out.println (message.value()); }); ' Code " – Shailesh