2015-06-11 10 views
9

Ich versuche, eine einfache Spring-Boot-App mit Spring-Boot zu erstellen, die Nachrichten an eine Rabbitmq Exchange/Queue und eine andere Beispiel Spring Boot-App "produzieren" diese Nachrichten "produzieren". Also habe ich zwei Apps (oder Microservices, wenn Sie möchten). 1) "Produzent" Microservice 2) "Consumer" MicroserviceSpringboot rabbitmq MappingJackson2MessageConverter benutzerdefinierte Objektkonvertierung

Der "Produzent" hat 2 Domain-Objekte. Foo und Bar, die zu json konvertiert und an rabbitmq gesendet werden sollen. Der "Consumer" sollte die JSON-Nachricht empfangen und in eine Domäne Foo bzw. Bar konvertieren. Aus irgendeinem Grund kann ich diese einfache Aufgabe nicht machen. Dazu gibt es nicht viele Beispiele. Für den Nachrichtenwandler I Hier org.springframework.messaging.converter.MappingJackson2MessageConverter

verwenden möchten ist, was ich bisher habe:

PRODUZENT Micro

package demo.producer; 

import org.springframework.amqp.core.Binding; 
import org.springframework.amqp.core.BindingBuilder; 
import org.springframework.amqp.core.Queue; 
import org.springframework.amqp.core.TopicExchange; 
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.CommandLineRunner; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.context.annotation.Bean; 
import org.springframework.messaging.converter.MappingJackson2MessageConverter; 
import org.springframework.stereotype.Service; 

@SpringBootApplication 
public class ProducerApplication implements CommandLineRunner { 

    public static void main(String[] args) { 
     SpringApplication.run(ProducerApplication.class, args); 
    } 

    @Bean 
    Queue queue() { 
     return new Queue("queue", false); 
    } 

    @Bean 
    TopicExchange exchange() { 
     return new TopicExchange("exchange"); 
    } 

    @Bean 
    Binding binding(Queue queue, TopicExchange exchange) { 
     return BindingBuilder.bind(queue).to(exchange).with("queue"); 
    } 

    @Bean 
    public MappingJackson2MessageConverter jackson2Converter() { 
     MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); 
     return converter; 
    } 

    @Autowired 
    private Sender sender; 

    @Override 
    public void run(String... args) throws Exception { 
     sender.sendToRabbitmq(new Foo(), new Bar()); 
    } 
} 

@Service 
class Sender { 

    @Autowired 
    private RabbitMessagingTemplate rabbitMessagingTemplate; 
    @Autowired 
    private MappingJackson2MessageConverter mappingJackson2MessageConverter; 

    public void sendToRabbitmq(final Foo foo, final Bar bar) { 

     this.rabbitMessagingTemplate.setMessageConverter(this.mappingJackson2MessageConverter); 

     this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", foo); 
     this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", bar); 

    } 
} 

class Bar { 
    public int age = 33; 
} 

class Foo { 
    public String name = "gustavo"; 
} 

CONSUMER Micro

package demo.consumer; 

import org.springframework.amqp.rabbit.annotation.EnableRabbit; 
import org.springframework.amqp.rabbit.annotation.RabbitListener; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.CommandLineRunner; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.stereotype.Service; 

@SpringBootApplication 
@EnableRabbit 
public class ConsumerApplication implements CommandLineRunner { 

    public static void main(String[] args) { 
     SpringApplication.run(ConsumerApplication.class, args); 
    } 

    @Autowired 
    private Receiver receiver; 

    @Override 
    public void run(String... args) throws Exception { 

    } 

} 

@Service 
class Receiver { 
    @RabbitListener(queues = "queue") 
    public void receiveMessage(Foo foo) { 
     System.out.println("Received <" + foo.name + ">"); 
    } 

    @RabbitListener(queues = "queue") 
    public void receiveMessage(Bar bar) { 
     System.out.println("Received <" + bar.age + ">"); 
    } 
} 

class Foo { 
    public String name; 
} 

class Bar { 
    public int age; 
} 

Und hier ist die Ausnahme, die ich bekomme:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message 
Endpoint handler details: 
Method [public void demo.consumer.Receiver.receiveMessage(demo.consumer.Bar)] 
Bean [[email protected]] 
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:116) 
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:93) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:83) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:170) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1257) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1021) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1005) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:83) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1119) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message 
    ... 13 common frames omitted 
Caused by: org.springframework.messaging.converter.MessageConversionException: No converter found to convert to class demo.consumer.Bar, message=GenericMessage [payload=byte[10], headers={amqp_receivedRoutingKey=queue, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=queue, amqp_redelivered=false, id=87cf7e06-a78a-ddc1-71f5-c55066b46b11, amqp_consumerTag=amq.ctag-msWSwB4bYGWVO2diWSAHlw, contentType=application/json;charset=UTF-8, timestamp=1433989934574}] 
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:115) 
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:77) 
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:127) 
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:100) 
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:113) 
    ... 12 common frames omitted 

Die Ausnahme sagt, daß es kein Konverter ist, und das ist wahr, mein Problem ist, dass ich keine Ahnung, wie haben die MappingJackson2MessageConverter-Wandler in der Verbraucherseite setzen (bitte beachten Sie, dass ich org.springframework verwenden möchten .messaging.converter.MappingJackson2MessageConverter und nicht org.springframework.amqp.support.converter.JsonMessageConverter)

Irgendwelche Gedanken?

Nur für den Fall, können Sie dieses Beispielprojekt an der Gabelung: https://github.com/gustavoorsi/rabbitmq-consumer-receiver

+0

Werfen Sie einen Blick hier: http://stackoverflow.com/questions/29337550/using-rabbitlistener-with-jackson2jsonmessageconverter – stalet

+0

In diesem Beispiel verwendet es ** org.springframework.amqp.support.converter.Jackson2JsonMessageConverter ** (die gehört zur Abhängigkeit spring-amqp), in meinem Fall möchte ich ** org.springframework.messaging.converter.MappingJackson2MessageConverter ** verwenden (das zu spring-messaging gehört). – Gustavo

Antwort

12

Ok, ich habe endlich funktioniert.

Feder verwendet einen PayloadArgumentResolver zu extrahieren, konvertieren und die konvertierte Nachricht zu dem Methodenparameter mit @RabbitListener kommentierte eingestellt. Irgendwie müssen wir den mappingJackson2MessageConverter in dieses Objekt setzen.

Also, in der Consumer App, müssen wir implementieren RabbitListenerConfigurer.Durch das Überschreiben configureRabbitListeners (RabbitListenerEndpointRegistrar Registrar) können wir eine benutzerdefinierte DefaultMessageHandlerMethodFactory, zu dieser Fabrik haben wir den Nachrichtenwandler eingestellt, und die Fabrik wird unser PayloadArgumentResolver mit dem korrekten convert erstellen.

Hier ist ein Ausschnitt des Codes, ich habe auch die git project aktualisiert.

ConsumerApplication.java

package demo.consumer; 

import org.springframework.amqp.rabbit.annotation.EnableRabbit; 
import org.springframework.amqp.rabbit.annotation.RabbitListener; 
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; 
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.context.annotation.Bean; 
import org.springframework.messaging.converter.MappingJackson2MessageConverter; 
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; 
import org.springframework.stereotype.Service; 

@SpringBootApplication 
@EnableRabbit 
public class ConsumerApplication implements RabbitListenerConfigurer { 

    public static void main(String[] args) { 
     SpringApplication.run(ConsumerApplication.class, args); 
    } 

    @Bean 
    public MappingJackson2MessageConverter jackson2Converter() { 
     MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); 
     return converter; 
    } 

    @Bean 
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { 
     DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); 
     factory.setMessageConverter(jackson2Converter()); 
     return factory; 
    } 

    @Override 
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { 
     registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory()); 
    } 

    @Autowired 
    private Receiver receiver; 

} 

@Service 
class Receiver { 
    @RabbitListener(queues = "queue") 
    public void receiveMessage(Foo foo) { 
     System.out.println("Received <" + foo.name + ">"); 
    } 

    @RabbitListener(queues = "queue") 
    public void receiveMessage(Bar bar) { 
     System.out.println("Received <" + bar.age + ">"); 
    } 
} 

class Foo { 
    public String name; 
} 

class Bar { 
    public int age; 
} 

Also, wenn Sie den Hersteller Micro laufen wird es zwei Nachrichten in der Warteschlange hinzuzufügen. Eines, das ein Foo-Objekt repräsentiert, und ein anderes, das ein Bar-Objekt darstellt. Durch Ausführen des Consumer-Microservice werden Sie sehen, dass beide von der jeweiligen Methode in der Empfänger Klasse verbraucht werden.


Aktualisiert Ausgabe:

Es ist ein konzeptionelles Problem von meiner Seite über Warteschlangen, denke ich. Was ich erreichen wollte, kann nicht möglich sein, indem 2 Methoden mit @RabbitListener kommentiert werden, die auf dieselbe Warteschlange verweisen. Die obige Lösung funktionierte nicht richtig. Wenn Sie an rabbitmq senden, sagen wir 6 Foo-Nachrichten und 3 Bar-Nachrichten, werden sie nicht 6 mal vom Listener mit Foo-Parameter empfangen. Es scheint, dass der Listener parallel aufgerufen wird, so dass es keine Möglichkeit gibt, zu unterscheiden, welcher Listener basierend auf dem Argumenttyp der Methode aufgerufen werden soll. Meine Lösung (und ich bin mir nicht sicher, ob das der beste Weg ist, ich bin offen für Vorschläge hier) ist, eine Warteschlange für jede Entität zu erstellen. So, jetzt habe ich queue.bar und queue.foo und @RabbitListener (Warteschlangen = "queue.foo") Wieder einmal aktualisieren, habe ich den Code aktualisiert und Sie können es heraus in mein git repository.

+1

1.5.0 (derzeit bei Meilenstein 1) unterstützt [class-level '@ RabbitListener'] (https://spring.io/blog/2015/05/08/spring-amqp-1-4-5-release-and -1-5-0-m1-verfügbar) zusammen mit @ RabbitHandler zu einzelnen Methoden, um diesen Anwendungsfall zu unterstützen. –

+0

das ist schön zu wissen. Vielen Dank ! – Gustavo

0

Lassen Sie sich dies nicht selbst getan, aber es scheint, wie Sie die entsprechenden Umsätze registrieren müssen durch eine RabbitTemplate einrichten. Sehen Sie sich Abschnitt 3.6.2 in this Spring documentation an. Ich weiß, dass es mit den AMQP-Klassen konfiguriert ist, aber wenn die Messaging-Klasse, die Sie erwähnen, kompatibel ist, gibt es keinen Grund, den Sie nicht ersetzen können. Sieht aus wie this reference erklärt, wie Sie es mit Java-Konfiguration statt XML tun können. Ich habe Rabbit nicht wirklich benutzt, also habe ich keine persönliche Erfahrung, aber ich würde gerne hören, was du herausfindest.