2016-05-13 9 views
3

I Federwolke Strom ist mit dem feder Cloud-Starter-stream-kafka verwenden. Ich habe meine Kanäle gebunden Themen KAFKA wie in dem application.properties folgt:Frühling Wolke [email protected] Messaging nicht zu errorChannel auf Ausnahme

spring.cloud.stream.bindings.gatewayOutput.destination=received 
spring.cloud.stream.bindings.enrichingInput.destination=received 
spring.cloud.stream.bindings.enrichingOutput.destination=enriched 
spring.cloud.stream.bindings.redeemingInput.destination=enriched 
spring.cloud.stream.bindings.redeemingOutput.destination=redeemed 
spring.cloud.stream.bindings.fulfillingInput.destination=redeemed 
spring.cloud.stream.bindings.error.destination=errors12 
spring.cloud.stream.bindings.errorInput.destination=errors12 
spring.cloud.stream.bindings.errorOutput.destination=errors12 

Ich bin nicht in der Lage, mein Programm zu bekommen eine Ausnahmemeldung an den Fehlerkanal zu erzeugen. Überraschenderweise scheint es nicht einmal zu versuchen, es zu erzeugen, obwohl ich in einem anderen Thread bin (Ich habe ein @MessagingGateway, das eine Nachricht in gatewayOutput ablegt, und dann läuft der Rest des Flusses asynchron ab). Hier ist die Definition meines ServiceActivator:

@Named 
@Configuration 
@EnableBinding(Channels.class) 
@EnableIntegration 
public class FulfillingServiceImpl extends AbstractBaseService implements 
     FulfillingService { 

    @Override 
    @Audit(value = "annotatedEvent") 
    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT, requiresReply = "false") 
    public void fulfill(TrivialRedemption redemption) throws Exception { 

     logger.error("FULFILLED!!!!!!"); 

     throw new Exception("test exception"); 

    } 
} 

Hier ist das Protokoll erzeugt (ich habe die volle Ausnahme abgeschnitten). Es gibt keine ...

  • Beschwerde über errorChannel nicht jeder Teilnehmer mit
  • Kafka Produzent Thread Logging
 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG KafkaMessageChannelBinder$ReceivingHandler:115 - org.springframework.cloud[email protected]2b461688 received message: GenericMessage [payload=byte[400], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18}] - {} 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DirectChannel:430 - preSend on channel 'fulfillingInput', message: GenericMessage [[email protected][endpoints=[[email protected]],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {} 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG ServiceActivatingHandler:115 - ServiceActivator for [org.spr[email protected]64bce7ab] (fulfillingServiceImpl.fulfill.serviceActivator.handler) received message: GenericMessage [[email protected][endpoints=[[email protected]],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {} 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationEvaluationContext' - {} 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationConversionService' - {} 
2016-05-13 12:13:14 pool-6-thread-1 ERROR FulfillingServiceImpl$$EnhancerBySpringCGLIB$$9dad62:42 - FULFILLED!!!!!! - {} 
2016-05-13 12:13:14 pool-6-thread-1 ERROR LoggingErrorHandler:35 - Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3373691507, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=400 cap=400]), KafkaMessageMetadata [offset=17, nextOffset=18, Partition[topic='redeemed', id=0]] - {} 
... 
... 
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='enriched', id=0]@18 - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='redeemed', id=0]@18 - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='errors12', id=0]@0 - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 

EDIT: Hier ist der Inhalt meiner Kanäle Klasse:

public interface Kanäle {

public static final String GATEWAY_OUTPUT = "gatewayOutput"; 

public static final String ENRICHING_INPUT = "enrichingInput"; 
public static final String ENRICHING_OUTPUT = "enrichingOutput"; 

public static final String REDEEMING_INPUT = "redeemingInput"; 
public static final String REDEEMING_OUTPUT = "redeemingOutput"; 

public static final String FULFILLING_INPUT = "fulfillingInput"; 
public static final String FULFILLING_OUTPUT = "fulfillingOutput"; 

@Output(GATEWAY_OUTPUT) 
MessageChannel gatewayOutput(); 

@Input(ENRICHING_INPUT) 
MessageChannel enrichingInput(); 

@Output(ENRICHING_OUTPUT) 
MessageChannel enrichingOutput(); 

@Input(REDEEMING_INPUT) 
MessageChannel redeemingInput(); 

@Output(REDEEMING_OUTPUT) 
MessageChannel redeemingOutput(); 

@Input(FULFILLING_INPUT) 
MessageChannel fulfillingInput(); 

@Output(FULFILLING_OUTPUT) 
MessageChannel fulfillingOutput(); 

Antwort

0

Sie zeigen Ihre nicht Channels Klasse, aber das Bindemittel nicht weiß, dass Ihre „Fehler“ Kanäle „special“.

Das Bindemittel kann mit Wiederholung und zu routen Ausnahmen von einem unzustellbaren Thema konfiguriert werden; siehe this PR, die in der 1.0.0.RELEASE ist.

Alternativ können Sie auch eine "mid-flow" Gateway vor dem Service-Aktivator hinzufügen - denken Sie daran, wie ein "try/catch" -Block in Java:

@MessageEndpoint 
public static class GatewayInvoker { 

    @Autowired 
    private ErrorHandlingGateway gw; 

    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT) 
    public void send(Message<?> message) { 
     this.gw.send(message); 
    } 

} 

@Bean 
public GatewayInvoker gate() { 
    return new GatewayInvoker(); 
} 

@MessagingGateway(defaultRequestChannel = "toService", errorChannel = Channels.ERRORS) 
public interface ErrorHandlingGateway { 

    void send(Message<?> message); 

} 

Ändern Sie Ihre Service-Aktivator des Eingangskanal toService .

Sie müssen Ihrer Konfigurationsklasse @IntegrationComponentScan hinzufügen, damit das Framework die Schnittstelle @MessagingGateway erkennen und einen Proxy dafür erstellen kann.

EDIT

Eine weitere Alternative nur mir vorgeschlagen, wäre ein ExpressionEvaluatingAdvice in Ihrem Service-Aktivator Rat Kette hinzuzufügen.

+0

auch, folgen Sie bitte dem Fortschritt auf https://github.com/spring-cloud/spring-cloud-stream/issues/538 für eine zukünftige Lösung im Frühjahr Wolke Strom. –

+0

Danke für die umfassende Antwort, ich verdaue es immer noch. Ich habe meine Channels.class hinzugefügt und mag den Vorschlag, einen auf Ratschlägen basierenden Ansatz zu verwenden, da ich auch Interesse an einem Stateful Retry-Verhalten habe. Ich bin ein wenig verwirrt über einige Punkte Ihrer Antwort. Sie erwähnten [Die Sammelmappe kann mit einem Wiederholungsversuch konfiguriert werden und um Ausnahmen an ein Thema mit toten Buchstaben weiterzuleiten; Siehe diesen PR, der in der 1.0.0.RELEASE ist.]. Ich benutze 1.0.0.RC2, die der PR vorauszugehen scheinen. Wann wird 1.0.0.RELEASE veröffentlicht? –

+0

Es war [letzte Woche veröffentlicht] (https://spring.io/blog/2016/05/10/spring-cloud-stream-1-0-0-release-is-available). –