2016-03-22 12 views
1

Ich bin ein neuerer Frühling amqp. Wenn Config Frühling amqp Publisher Bestätigt und zurück, traf Probleme.wie man den frühling amqp verleger bestätigt und zurückgibt?

AMQP config:

SimpleMessageListenerContainer container(CachingConnectionFactory connectionFactory, @Qualifier("topicListenerAdapter")MessageListenerAdapter listenerAdapter) { 

    connectionFactory.setChannelCacheSize(5); 
    connectionFactory.setPublisherConfirms(true); 
    connectionFactory.setPublisherReturns(true); 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
    container.setConnectionFactory(connectionFactory); 
    container.setQueueNames("request.queue","reply.queue"); 
    container.setMessageConverter(json2MessageConverter()); 
    container.setReceiveTimeout(3000); 
    container.setMessageListener(listenerAdapter); 

    return container; 
} 

Nachrichtens:

rabbitTemplate.convertAndSend("spring-boots5", message); 
     rabbitTemplate.setConfirmCallback(new ConfirmCallback(){ 

      @Override 
      public void confirm(CorrelationData correlationData, boolean ack, String cause) { 
       // TODO Auto-generated method stub 

       System.out.println("confirm correlationData is : "+correlationData+"ack is : "+ 
       ack); 
      } 

     }); 
     rabbitTemplate.setMandatory(true); 

Bei der Ausführung dieser Anwendung AMQP-Nachricht empfangen:

Body:'This is my first message'MessageProperties [headers={bar=baz}, timestamp=null, messageId=123456, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=request.queue, deliveryTag=1, messageCount=0]) > 

en Fehler erfüllt:

22 20:48:08.661[0;39m [31mERROR[0;39m [35m37792[0;39m [2m---[0;39m [2m[ 127.0.0.1:5672][0;39m [36mo.s.a.r.s.PublisherCallbackChannelImpl [0;39m [2m:[0;39m No listener for seq:1 

Und nicht erwarten Zeichenfolge auf der Konsole:

"bestätigen correlationData ist: "+ correlationData +" ack ist:" + ack

und nicht wissen, wie Antworten Messaging config (i verwenden java config)

Antwort

3

Sie müssen die ConfirmCallback (und ReturnCallback) vor dem Senden der Nachricht festlegen.

Sie müssen auch einige Korrelationsdaten zum Senden bereitstellen, damit Sie feststellen können, für welche ausgehende Nachricht die Bestätigung gilt.

Siehe this test case ...

@Test 
public void testPublisherConfirmWithSendAndReceive() throws Exception { 
    final CountDownLatch latch = new CountDownLatch(1); 
    final AtomicReference<CorrelationData> confirmCD = new AtomicReference<CorrelationData>(); 
    templateWithConfirmsEnabled.setConfirmCallback(new ConfirmCallback() { 

     @Override 
     public void confirm(CorrelationData correlationData, boolean ack, String cause) { 
      confirmCD.set(correlationData); 
      latch.countDown(); 
     } 
    }); 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactoryWithConfirmsEnabled); 
    container.setQueueNames(ROUTE); 
    container.setMessageListener(new MessageListenerAdapter(new Object() { 

     @SuppressWarnings("unused") 
     public String handleMessage(String in) { 
      return in.toUpperCase(); 
     } 
    })); 
    container.start(); 
    CorrelationData correlationData = new CorrelationData("abc"); 
    String result = (String) this.templateWithConfirmsEnabled.convertSendAndReceive(ROUTE, (Object) "message", correlationData); 
    container.stop(); 
    assertEquals("MESSAGE", result); 
    assertTrue(latch.await(10, TimeUnit.SECONDS)); 
    assertEquals(correlationData, confirmCD.get()); 
} 
+0

Ich habe eingestellt 'ConfirmCallback (und ReturnCallback)' Position ** Nachricht senden ** vor. Weil ich ein neuerer bin, weiß ich nicht, wie man sie einstellt (Korrelationsdaten, Antwortwarteschlange), einige Beispiele? Vielen Dank! –

+0

Siehe [dieser Testfall] (https://github.com/spring-projects/spring-amqp/blob/master/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegrationTests. java # L205) - Ich habe es in meine Antwort kopiert. –

+0

Das 'rabbitTemplate.convertSendAndReceive' Ergebnis ist korrekt. Aber 'confirmCD.get() ist: null' und en Fehler' [127.0.0.1:5672] [osarsPublisherCallbackChannelImpl: Kein Listener für seq: 1' nicht den Grund kennen ... –