Ich bin dabei, eine einfache Kafka Consumer-Anwendung aus einem bestehenden Framework zu entfernen und fühle mich wie Feder-Cloud-Stream ist eine einfache Möglichkeit, das zu tun. Ich habe Initializr verwendet, um die App zu starten, die jetzt Spring-Boot v1.3.3 und Spring-Cloud-Stream v1.0.0-RC1 verwendet. Die Anwendung ist sehr einfach, Sie müssen lediglich eine Nachricht von Kafka auswählen, das JSON-codierte Objekt deserialisieren und es an unsere vorhandene Bibliothek weitergeben. Um zu beginnen, habe ich nur das LogSink-Beispiel verwendet, da ich schließlich nicht viel mehr tun werde (einfach das Deserialisieren und das Objekt an eine andere Methode übergeben).StringIndexOutOfBoundsException von EmbeddedHeadersMessageConverter
Es funktioniert alles: Es verbindet sich mit Kafka, empfängt die Nachricht und übergibt sie (als Byte []) an meine Senke. Allerdings EmbeddedHeadersMessageConverter eine StringIndexOutOfBoundsException protokolliert:
2016-04-11 10:06:50.287 ERROR 11464 --- [pool-1-thread-1] fkaMessageChannelBinder$ReceivingHandler : Could not convert message: 7B2267656E65726174696F6E223A3 [...]
java.lang.StringIndexOutOfBoundsException: String index out of range: 2009
at java.lang.String.checkBounds(String.java:373) ~[na:1.8.0_25]
at java.lang.String.<init>(String.java:413) ~[na:1.8.0_25]
at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.oldExtractHeaders(EmbeddedHeadersMessageConverter.java:131) ~[spring-cloud-stream-1.0.0.RC1.jar:1.0.0.RC1]
at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.extractHeaders(EmbeddedHeadersMessageConverter.java:104) ~[spring-cloud-stream-1.0.0.RC1.jar:1.0.0.RC1]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ReceivingHandler.handleRequestMessage(KafkaMessageChannelBinder.java:583) ~[spring-cloud-stream-binder-kafka-1.0.0.RC1.jar:1.0.0.RC1]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:69) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:63) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) [spring-messaging-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) [spring-messaging-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) [spring-messaging-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:43) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$AutoAcknowledgingChannelForwardingMessageListener.doOnMessage(KafkaMessageDrivenChannelAdapter.java:171) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.listener.AbstractDecodingMessageListener.onMessage(AbstractDecodingMessageListener.java:50) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:221) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:209) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67) [reactor-core-2.0.7.RELEASE.jar:na]
at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789) [reactor-core-2.0.7.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_25]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_25]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_25]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_25]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]
https://github.com/spring-cloud/spring-cloud-stream/issues/209 scheint das Problem zu zeigen, fehlt Kafka-Header, was wahr ist, gibt es nicht. Aber die dort erwähnte Lösung ist hinzuzufügen
zu meiner Anwendung Konfiguration. Leider hat das bei mir nicht funktioniert. Auch STS tatsächlich hat Auto-Vervollständigung für die jeweiligen Eigenschaften und vorgeschlagen
spring.cloud.stream.kafka.binder.mode=raw
keiner der 2 (getrennt oder kombiniert) einen Unterschied gemacht, wird die Ausnahme noch angemeldet ist.
Ich habe Spring seit Jahren verwendet, aber dies wäre meine erste Spring-Boot/Spring-Cloud-Anwendung.
Hier ist der Anwendungscode:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
@SpringBootApplication
public class UpdateApplication {
private static Logger logger = LoggerFactory.getLogger(UpdateApplication.class);
public static void main(String[] args) {
SpringApplication.run(UpdateApplication.class, args);
}
@EnableBinding(Sink.class)
public static class UpdateHandler {
@StreamListener(Sink.INPUT)
//@ServiceActivator(inputChannel=Sink.INPUT)
public void loggerSink(Object payload) {
logger.info("Received: " + payload);
}
}
}
Ich habe versucht, beide, @ServiceActivator sowie @StreamListener Anmerkung, die in diesem Fall scheint keinen Unterschied zu machen.
sieht mein application.properties wie folgt aus:
spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.input.destination=updates
spring.cloud.stream.bindings.input.group=update-client
spring.cloud.stream.kafka.binder.brokers=brokerName
spring.cloud.stream.kafka.binder.zkNodes=zookeeperName
spring.cloud.stream.kafka.binder.mode=raw
Jede Hilfe dieses Fehlers würde geschätzt loszuwerden.
Als Randbemerkung: Da ich habe gerade angefangen mit Feder Cloud-Stream zu experimentieren habe ich
spring.cloud.stream.bindings.updates.consumer.resetOffsets=true
spring.cloud.stream.bindings.updates.consumer.startOffset=earlist
die Konfiguration zu vermeiden, neue Nachrichten neu starte jedes Mal, wenn ich zu senden, aber das hat nicht funktioniert .
Hallo - das ist die richtige Version, die RC2-Dokumentation verwendete immer noch den falschen Eigenschaftsnamen. http: //docs.springio/spring-cloud-stream/docs/aktuelle-SNAPSHOT/reference/htmlsingle/# _ consumer_properties (dh 'spring.cloud.stream.bindings.input.consumer.headerMode = roh') –
Ah, danke, @MariusBogoevici, das funktioniert. Auch hatte ich den Eindruck, dass sich in der Dokumentation auf das eigentliche Ziel bezieht (in meinem Fall "update"), während es "einlesen" sollte. Nur aus Neugier: spring.cloud.stream.bindings.input.consumer.resetOffsets = true spring.cloud.stream.bindings.input.consumer.startOffset = Earlist sollte von Anfang an gelesen werden, nein? Das funktioniert immer noch nicht, aber ich brauche es jetzt nicht wirklich. –
Das ist richtig - der Schlüssel ist der Kanalname, nicht der Wert von 'destination' - dies erlaubt Ihnen, Ihre logische Konfiguration beizubehalten, während Sie das Ziel zur Laufzeit zum Beispiel einfach überschreiben können. –