2016-04-01 7 views
1

Ich möchte Retry-Funktionalität implementieren (wenn es ein Problem in den Verbraucher) für 3 mal und dann sollte die Nachricht in eine andere Warteschlange (Dead Brief Exchange) gehen. Ich habe die Warteschlange/exchange konfiguriert, wie untenBeratung Kette in RabbitMQ Frühling funktioniert nicht

regelmäßige Nachrichtenaustausch Name: test_exchange Message Queue: test_dlq_exchange tot Message Queue: test_queue test_queue binden mit Routing-Schlüssel test_queue

toten Buchstaben Austausch Namen test_exchange test_dlq_queue test_dlq_queue binden an test_dlq_exchange mit Routing-Schlüssel test_dlq_queue

In der RabbitMQ UI Konsole ich habe die "x-dead-letter-Austausch" als "test_exchange" konfiguriert

Unten ist der Code für SimpleMessageListenerContainer

@Bean 
    SimpleMessageListenerContainer getMessageListenerContainer(){ 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
    container.setConnectionFactory(connectionFactory); 
    container.setQueueNames("test_queue"); 

    MessageListenerAdapter adapter = new MessageListenerAdapter(); 
    //configured my message listener class 
    //configured Jackson2JsonMessageConverter as converter 
    container.setMessageListener(adapter); 
    container.setAdviceChain(new Advice[] {retryAdvice()} 
    return container; 
    } 

    //configuration for retryAdvice 

    @Bean 
    public MethodInterceptor retryAdvice{ 

     ExponentialBackOffPolicy backoffPolicy = new ExponentialBackOffPolicy(); 
     backoffPolicy.setInitialInterval(10); 
     backoffPolicy.setMaxInterval(1000); 
     backoffPolicy.setMultiplier(2); 
     RabbitTemplate retryTemplate = new RabbitTemplate(connectionFactory()); 
     retryTemplate.setQueue("test_dl_queue"); 
     return RetryInterceptorBuilder 
        .stateful() 
        .backOffPolicy(backoffPolicy) 
        .maxAttempts(3) 
        .recoverer(
         new RepublishMessageRecoverer 
         (retryTemplate,"test_dl_exchange","test_queue")).build(); 
    } 

Zuhörer Meine benutzerdefinierte Nachricht Nachricht POJO angetrieben wird, die nur meine MessageObject hat.

Da ich stateful verwende, habe ich createMessageIds (true) aktiviert. In meinem Nachrichten-Listener rufe ich die Methode des Zielobjekts erneut auf. Nach dem Start des Containers geht der Ablauf zyklisch

dh Veröffentlichen Sie die Nachricht in der Warteschlange -> Message Listener invoke ist die Zielmethode basierend auf einer Ausnahme -> wieder die Nachricht in die Warteschlange -> message listener invoke Zielmethode ... etc. Es wird die Nachricht nicht in den Dead-Letter-Austausch/die Warteschleife und die endlose Schleife geschoben.

Im Protokoll I wie unten zu sehen

 o.s.r.i.StatefulRetryOperationsInterceptor - Executing proxied method in stateful retry public abstract void org.springframework.amqp.rabbitlistener.SimpleMessageListenerContainer$ContainerDelegate.invokeListener(Channel,Message) throws java.lang.Exception(55435ee) 

Kann jemand mir helfen, dieses Problem zu beheben?

Antwort

0

Da Sie die RepublishMessageRecoverer verwenden, tut das Konfigurieren eines DLX/DLQ auf dem Broker nichts. Um zur DLQ zu routen, benötigen Sie eine RejectAndDontRequeueRecover - wenn die Versuche erschöpft sind, wird die Nachricht an die DLX/DLQ gesendet.

Es sieht aus wie Sie zu derselben Warteschlange

(retryTemplate,"test_dl_exchange","test_queue")) 

Daraus ergibt sich die Endlos-Schleife werden die Neuveröffentlichung.

+0

Danke Gary. Es funktioniert gut, nachdem ich den RejectAndDontRequeueRecoverer aufgenommen habe. Nach dem Erreichen der maximalen Wiederholungsversuche verschiebt es nun die Nachricht zu DLQ. – Raja