Struggling with Spring Integration DSL

I am trying to set up a small Spring Boot application with Spring Integration. All it needs to do is pull a message off a JMS queue, unmarshal the request back into an object and route it to a specific bean to be persisted. I have tested the routing part and can confirm that it works.
In my test I have an embedded ActiveMQ broker. I can send a message via Spring's JmsTemplate, but the flow does not seem to unmarshal the XML payload or route the message. I can see this in the log:
16:42:09.285 [main] INFO c.m.z.v.o.VitelAsyncPersisterApplicationTests - Sending message
16:42:09.289 [main] DEBUG o.s.j.c.CachingConnectionFactory - Registering cached JMS Session for mode 1: ActiveMQSession {id=ID:theblacklodge-59640-1460558526948-4:1:2,started=false} [email protected]
16:42:09.289 [main] DEBUG o.s.j.c.JmsTemplate - Executing callback on JMS Session: Cached JMS Session: ActiveMQSession {id=ID:theblacklodge-59640-1460558526948-4:1:2,started=false} [email protected]
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:[email protected]] DEBUG o.a.a.b.TransportConnector - Publishing: tcp://localhost:61616 for broker transport URI: tcp://localhost:61616
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:[email protected]] DEBUG o.a.a.b.TransportConnector - Publishing: tcp://localhost:61616 for broker transport URI: tcp://localhost:61616
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:[email protected]] DEBUG o.a.a.b.r.AbstractRegion - test_broker adding destination: topic://ActiveMQ.Advisory.Producer.Queue.jms/test
16:42:09.298 [main] DEBUG o.s.j.c.CachingConnectionFactory - Registering cached JMS MessageProducer for destination [queue://jms/test]: ActiveMQMessageProducer { value=ID:theblacklodge-59640-1460558526948-4:1:2:1 }
16:42:09.301 [main] DEBUG o.s.j.c.JmsTemplate - Sending created message: ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = <?xml version="1.0" encoding="UTF-8" standalo...uteLogEvent>}
16:42:09.305 [ActiveMQ Transport: tcp:///127.0.0.1:[email protected]] DEBUG o.a.a.b.r.Queue - test_broker Message ID:theblacklodge-59640-1460558526948-4:1:2:1:1 sent to queue://jms/test
16:42:09.306 [ActiveMQ BrokerService[test_broker] Task-2] DEBUG o.a.a.b.r.Queue - queue://jms/test, subscriptions=1, memory=0%, size=1, pending=0 toPageIn: 1, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 1, dequeueCount: 0, memUsage:1332
16:42:09.314 [main] INFO c.m.z.v.o.VitelAsyncPersisterApplicationTests - Message sent
Spring Integration logging:
13:43:48.996 [main] INFO o.s.i.j.JmsMessageDrivenEndpoint - started [email protected]04469
13:43:48.996 [main] INFO o.s.i.d.j.JmsInboundGateway - started org.springframework.integration.dsl.jms.JmsInboundGateway#0
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.dsl.jms.JmsInboundGateway#0'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - Adding {xml:unmarshalling-transformer} as a subscriber to the 'buildReceiverFlow.channel#0' channel
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildReceiverFlow.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - Adding {router} as a subscriber to the 'msg.router' channel
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.msg.router' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#0.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#2
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#1.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#3
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#4' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#2.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#4
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#4'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#5' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#3.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#5
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#5'
13:43:48.996 [main] INFO o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 2147483647
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.jms.config.internalJmsListenerEndpointRegistry' of type [class org.springframework.jms.config.JmsListenerEndpointRegistry]
13:43:48.997 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.jms.config.internalJmsListenerEndpointRegistry'
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.boot.context.properties.ConfigurationPropertiesBindingPostProcessor'
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.context.annotation.internalScheduledAnnotationProcessor'
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.integration.config.IdGeneratorConfigurer#0'
13:43:49.017 [main] DEBUG o.s.b.a.l.AutoConfigurationReportLoggingInitializer -
I am not sure what I have left out or misconfigured:
Test case:
    @Test
    public void jmsIntegrationTest() {
        // Build the event that should end up persisted.
        RouteLogEvent log = new RouteLogEvent();
        log.setAgentId(8888);
        log.setInteracitonId(95634);
        log.setMax(5);
        log.setQueueTime(1256L);
        log.setRouteTime(96541L);
        log.setScore(8);

        // Marshal the event to XML; this string becomes the JMS text payload.
        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(500);
        marshaller.marshal(log, new StreamResult(bytesOut));
        final String xmlPayload = new String(bytesOut.toByteArray());

        LOG.info("Sending message");
        jmsTemplate.send(jmsQueue, session -> session.createTextMessage(xmlPayload));
        LOG.info("Message sent");

        // Note: these assertions run immediately after the send;
        // nothing here waits for the flow to consume the message.
        List<RouteLogEvent> events = testDao.findAllRouteLogs();
        assertNotNull(events);
        assertFalse(events.isEmpty());

        List<RouteLogEvent> filtered = events.stream()
                .filter(val -> val.getAgentId() == 8888)
                .collect(Collectors.toList());
        assertNotNull(filtered);
        assertFalse(filtered.isEmpty());
    }
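Because the message is consumed on a listener thread, the DAO query in the test may run before the flow has persisted anything. A minimal sketch of a polling helper the test could call before asserting, in plain Java with no Spring dependency. The class name `AwaitSketch`, the method `awaitTrue` and the timeout values in the usage note are hypothetical and not part of the original project:

```java
import java.util.function.BooleanSupplier;

/** Hypothetical test helper: polls a condition until it holds or a timeout expires. */
final class AwaitSketch {

    private AwaitSketch() {
    }

    /**
     * Re-checks the condition roughly every pollMillis milliseconds.
     * Returns true as soon as the condition holds, and false once
     * timeoutMillis has elapsed or the waiting thread is interrupted.
     */
    static boolean awaitTrue(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In the test this might be used as assertTrue(AwaitSketch.awaitTrue(() -> !testDao.findAllRouteLogs().isEmpty(), 5000, 100)); placed before the existing assertions, assuming five seconds is a generous enough upper bound for the embedded broker.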
Spring Integration configuration:
    @SpringBootApplication
    @EnableIntegration
    public class VitelAsyncPersisterApplication {

        // Maps each payload type to the name of the channel that handles it.
        private static final Map<Class<?>, String> ROUTING_EVENTS = new HashMap<>();

        private static final String CHANNEL_RECORDING = "channel-recording";
        private static final String CHANNEL_INTERACTION_STATE = "channel-interaction-state";
        private static final String CHANNEL_AGENT_STATE = "channel-agent-state";
        private static final String CHANNEL_ROUTE_LOG = "channel-route";

        static {
            ROUTING_EVENTS.put(AgentStateChangeEvent.class, CHANNEL_AGENT_STATE);
            ROUTING_EVENTS.put(InteractionStateChangeEvent.class, CHANNEL_INTERACTION_STATE);
            ROUTING_EVENTS.put(Recording.class, CHANNEL_RECORDING);
            ROUTING_EVENTS.put(RouteLogEvent.class, CHANNEL_ROUTE_LOG);
        }

        @Value("${jms.queue.entity.persist}")
        private String jmsQueueName;

        @Value("${jms.broker.url}")
        private String jmsBrokerUrl;

        @Autowired
        private EventDao eventDao;

        @Bean
        public Jaxb2Marshaller xmlMarshaller() {
            Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
            marshaller.setSchema(new ClassPathResource("entities.xsd"));
            marshaller.setPackagesToScan("com.mhgad.za.vitel.persister.entities");
            return marshaller;
        }

        @Bean
        public ConnectionFactory jmsConnFactory() {
            ActiveMQConnectionFactory activeMq = new ActiveMQConnectionFactory(jmsBrokerUrl);
            CachingConnectionFactory cachingConnFactory = new CachingConnectionFactory();
            cachingConnFactory.setTargetConnectionFactory(activeMq);
            return cachingConnFactory;
        }

        // Receives from the JMS queue, unmarshals the XML payload and hands it to the router channel.
        @Bean
        public IntegrationFlow buildReceiverFlow(ConnectionFactory jmsConnectionFactory, Jaxb2Marshaller marshaller) {
            UnmarshallingTransformer xmlToObjTransformer = Transformers.unmarshaller(marshaller);
            JmsInboundGatewaySpec jmsSpec = Jms.inboundGateway(jmsConnectionFactory).destination(jmsQueueName);
            return IntegrationFlows.from(jmsSpec).transform(xmlToObjTransformer).channel("msg.router").get();
        }

        // Routes each unmarshalled payload to the sub-flow that persists its type.
        @Bean
        public IntegrationFlow buildRouterFlow() {
            // Returns the channel name for the payload's class, or null if the type is not mapped.
            Function<Object, String> router = p -> ROUTING_EVENTS.get(p.getClass());

            return IntegrationFlows.from("msg.router").route(router, m -> m
                    .subFlowMapping(CHANNEL_AGENT_STATE, sf -> sf.handle((p) -> eventDao.save((AgentStateChangeEvent) p.getPayload())))
                    .subFlowMapping(CHANNEL_INTERACTION_STATE, sf -> sf.handle((p) -> eventDao.save((InteractionStateChangeEvent) p.getPayload())))
                    .subFlowMapping(CHANNEL_RECORDING, sf -> sf.handle((p) -> eventDao.save((Recording) p.getPayload())))
                    .subFlowMapping(CHANNEL_ROUTE_LOG, sf -> sf.handle((p) -> eventDao.save((RouteLogEvent) p.getPayload())))).get();
        }

        public static void main(String[] args) {
            SpringApplication.run(VitelAsyncPersisterApplication.class, args);
        }
    }
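The router in the configuration looks up a channel name by the payload's exact class and returns null when the type is not mapped. A minimal stand-alone sketch of the same lookup with an explicit fallback, so that an unmapped payload type yields a named channel instead of null. It is plain Java without Spring; `ChannelLookupSketch`, `DEFAULT_CHANNEL` and the channel name "channel-unrouted" are hypothetical and do not appear in the original project:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-alone sketch of a payload-type to channel-name lookup. */
final class ChannelLookupSketch {

    // Assumed fallback channel name; not part of the original configuration.
    static final String DEFAULT_CHANNEL = "channel-unrouted";

    private final Map<Class<?>, String> channelsByType = new HashMap<>();

    /** Registers the channel that should receive payloads of exactly this class. */
    void register(Class<?> payloadType, String channelName) {
        channelsByType.put(payloadType, channelName);
    }

    /** Returns the channel registered for the payload's exact class, or the fallback. */
    String channelFor(Object payload) {
        return channelsByType.getOrDefault(payload.getClass(), DEFAULT_CHANNEL);
    }
}
```

Like the original map, this matches on the exact runtime class only, so a subclass of a registered type would fall through to the fallback channel.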
Hi Artem, I have amended the configuration snippet. It did have @EnableIntegration. Yes, I thought the request/reply was incorrect, but I was struggling with where a poller needed to be placed. WRT logs, I have added some additional output. – user3465651
Changed to Jms.messageDrivenChannelAdapter() and also corrected the version of ActiveMQ from 5.13.2 to 5.12.3, as I saw a similar problem in this post -> http://stackoverflow.com/questions/36007782/spring-configuring-embedded-brokerservice – user3465651