Ich möchte Retry-Funktionalität implementieren (wenn es ein Problem in den Verbraucher) für 3 mal und dann sollte die Nachricht in eine andere Warteschlange (Dead Brief Exchange) gehen. Ich habe die Warteschlange/exchange konfiguriert, wie untenBeratung Kette in RabbitMQ Frühling funktioniert nicht
regelmäßige Nachrichtenaustausch Name: test_exchange Message Queue: test_dlq_exchange tot Message Queue: test_queue test_queue binden mit Routing-Schlüssel test_queue
toten Buchstaben Austausch Namen test_exchange test_dlq_queue test_dlq_queue binden an test_dlq_exchange mit Routing-Schlüssel test_dlq_queue
In der RabbitMQ UI Konsole ich habe die "x-dead-letter-Austausch" als "test_exchange" konfiguriert
Unten ist der Code für SimpleMessageListenerContainer
@Bean
SimpleMessageListenerContainer getMessageListenerContainer(){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("test_queue");
MessageListenerAdapter adapter = new MessageListenerAdapter();
//configured my message listener class
//configured Jackson2JsonMessageConverter as converter
container.setMessageListener(adapter);
container.setAdviceChain(new Advice[] {retryAdvice()}
return container;
}
//configuration for retryAdvice
@Bean
public MethodInterceptor retryAdvice{
ExponentialBackOffPolicy backoffPolicy = new ExponentialBackOffPolicy();
backoffPolicy.setInitialInterval(10);
backoffPolicy.setMaxInterval(1000);
backoffPolicy.setMultiplier(2);
RabbitTemplate retryTemplate = new RabbitTemplate(connectionFactory());
retryTemplate.setQueue("test_dl_queue");
return RetryInterceptorBuilder
.stateful()
.backOffPolicy(backoffPolicy)
.maxAttempts(3)
.recoverer(
new RepublishMessageRecoverer
(retryTemplate,"test_dl_exchange","test_queue")).build();
}
Zuhörer Meine benutzerdefinierte Nachricht Nachricht POJO angetrieben wird, die nur meine MessageObject hat.
Da ich stateful verwende, habe ich createMessageIds (true) aktiviert. In meinem Nachrichten-Listener rufe ich die Methode des Zielobjekts erneut auf. Nach dem Start des Containers geht der Ablauf zyklisch
dh Veröffentlichen Sie die Nachricht in der Warteschlange -> Message Listener invoke ist die Zielmethode basierend auf einer Ausnahme -> wieder die Nachricht in die Warteschlange -> message listener invoke Zielmethode ... etc. Es wird die Nachricht nicht in den Dead-Letter-Austausch/die Warteschleife und die endlose Schleife geschoben.
Im Protokoll I wie unten zu sehen
o.s.r.i.StatefulRetryOperationsInterceptor - Executing proxied method in stateful retry public abstract void org.springframework.amqp.rabbitlistener.SimpleMessageListenerContainer$ContainerDelegate.invokeListener(Channel,Message) throws java.lang.Exception(55435ee)
Kann jemand mir helfen, dieses Problem zu beheben?
Danke Gary. Es funktioniert gut, nachdem ich den RejectAndDontRequeueRecoverer aufgenommen habe. Nach dem Erreichen der maximalen Wiederholungsversuche verschiebt es nun die Nachricht zu DLQ. – Raja