2016-07-05 11 views
0

Wenn ip einen JSON-Typ enthält,
Unter der aktuellen Verbindung zu factory.getConnectionIds() finden Sie die entsprechende IP. Setzen Sie dann die Kopfzeile, um die Logik während der Entwicklung zu senden.Wie sende ich eine einzelne Nachricht an mehrere IP?

Durch factory.getConnectionIds() gefunden die IP-Liste, die derzeit verbunden ist, habe ich eine Kopfzeile eingerichtet. aber unable to find outbound socket Fehler ist aufgetreten.

Was ist die Ursache?

Integration config ...

@Bean 
public TcpReceivingChannelAdapter sslAdapter() { 
    TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter(); 
    adapter.setConnectionFactory(sslServerFactory()); 
    adapter.setOutputChannel(inputWithSSL()); 

    return adapter; 
} 

@Bean 
public TcpSendingMessageHandler sslHandler() { 
    TcpSendingMessageHandler handler = new TcpSendingMessageHandler(); 
    handler.setConnectionFactory(sslServerFactory()); 

    return handler; 
} 

@Bean 
public AbstractConnectionFactory sslServerFactory() { 
    int port = Integer.parseInt(inboundPort); 
    TcpNioServerConnectionFactory factory = new TcpNioServerConnectionFactory(port); 
    factory.setBacklog(BACKLOG); 
    factory.setTaskExecutor(taskSchedulerWithSSL()); 
    factory.setLookupHost(false); 

    factory.setSerializer(echoSerializer); 
    factory.setDeserializer(echoSerializer); 

    factory.setTcpNioConnectionSupport(tcpNioSSLConnectionSupport()); 

    // Nagle's algorithm disabled 
    factory.setSoTcpNoDelay(true); 

    return factory; 
} 

@Bean 
public IntegrationFlow flowForReceiveSslMessage() { 
    return IntegrationFlows 
      .from(sslAdapter) 
      .<byte[], Boolean>route(
        p -> (short) 0 == ByteBuffer.wrap(p, 0, BYTE_LENGTH_OF_SHORT).getShort(), 
        m -> m.channelMapping(TRUE, INPUT_WITH_SSL_JSON) 
          .channelMapping(FALSE, INPUT_WITH_SSL_ECHO)).get(); 
} 

@Bean 
public IntegrationFlow flowForExtractingSslJson() { 
    return IntegrationFlows 
      .from(inputWithSslJson()) 
      .handle(INBOUND_SERVICE, EXTRACT_PAYLOAD_AS_JSON) 
      .<Map<String, Object>, String>route(
        p -> (String) p.get(REQUEST), 
        m -> m.channelMapping(LOGIN, INPUT_WITH_SSL_LOGIN) 
          .channelMapping(LOGOUT, INPUT_WITH_SSL_LOGOUT) 
          .channelMapping(POLICY, INPUT_WITH_SSL_POLICY) 
          .channelMapping(PUSH_TARGET, INPUT_WITH_SSL_PUSH_TARGET).get(); 
} 

@Bean 
public IntegrationFlow flowForHandlingSslNotifyPolicyUpdate() { 
    return IntegrationFlows.from(inputWithSslPushTarget()).handle(POLICY_SERVICE, RESPONSE_POLICY_UPDATE) 
      .split(POLICY_SERVICE, SPLIT_MESSAGES) 
      .channel(outputWithSslJsonBytesToClient()).get(); 
} 

@Bean 
public IntegrationFlow flowForConvertingSslJsonToBytesAndSendClient() { 
    return IntegrationFlows.from(outputWithSslJsonBytesToClient()) 
      .transform(new ObjectToJsonTransformer()) 
      .handle(INBOUND_SERVICE, ATTACH_HEADER_BY_STRING).handle(sslHandler).get(); 
} 

@Bean 
public MessageChannel outputWithSsl() { 
    return MessageChannels.queue(POOL_SIZE).get(); 
} 

@Bean 
public MessageChannel inputWithSslJson() { 
    return MessageChannels.queue(POOL_SIZE).get(); 
} 

@Bean 
public MessageChannel inputWithSslPushTarget() { 
    return MessageChannels.queue(POOL_SIZE).get(); 
} 

@Bean 
public MessageChannel outputWithSslJsonBytesToClient() { 
    return MessageChannels.queue(POOL_SIZE).get(); 
} 

RESPONSE_POLICY_UPDATE und SPLIT_MESSAGES ist ...

@Override 
public Object responsePolicyUpdate(Object payload) throws Exception { 
    log.debug("notify policy update debug : {}", payload); 
    Map<String, Object> params = initParam(payload); 
    Map<String, Object> result = initResult(params); 
    result.put(RESPONSE, PUSH_TARGET); 
    result.put(RESULT, SUCCESS); 
    result.put(REASON, 0); 

    return result; 
} 

@Splitter 
@Override 
@SuppressWarnings("unchecked") 
public List<Message<String>> splitMessages(Object payload) throws Exception { 
    log.debug("split messages debug : {}", payload); 
    Map<String, Object> params = initParam(payload); 
    List<String> pushTargetList = (List<String>) params.get(PUSH_TARGET_LIST); // pushTargetList is ip list. 

    List<Message<String>> messageList = new ArrayList<Message<String>>(); 
    String[] conArray = new String[4]; 
    List<String> sslConnectionIds = sslServerFactory.getOpenConnectionIds(); 
    int sslPort = sslServerFactory.getPort(); 
    for (String con : sslConnectionIds) { 
     log.debug("## con ip : {}", con); 
     conArray = con.split(":"); 
     for (String pushTargetIP : pushTargetList) { 
      if (conArray[0].equals(pushTargetIP)) { 
       Message<String> message = MessageBuilder.withPayload(params.toString()) 
             .setHeader("ip_connectionId", con).build(); 
       messageList.add(message); 
       break; 
      } 
     } 
    } 

    return messageList; 
} 

Debug-Protokoll ist ...
Die erste Zeile ist die aktuelle Verbindungsliste.

2016-07-05 14:30:14.664 DEBUG 56092 --- [sk-scheduler-10] c.m.j.policy.service.PolicyServiceImpl : ## con ip : 192.168.3.57:62370:5443:cdeb011d-91f5-46c4-abc9-b68ba13624b3 

2016-07-05 14:30:14.672 DEBUG 56092 --- [ask-scheduler-1] o.s.i.ip.tcp.TcpSendingMessageHandler : plainHandler received message: GenericMessage [payload=byte[246], headers={sequenceNumber=1, json__TypeId__=class java.lang.String, sequenceSize=1, ip_connectionId=192.168.3.57:62370:5443:cdeb011d-91f5-46c4-abc9-b68ba13624b3, correlationId=fae71250-bf47-3f64-6ad3-1ce22ef69464, id=c6c097f0-9efb-f0a5-4240-924e06879b7f, contentType=application/json, timestamp=1467696614672}] 
2016-07-05 14:30:14.672 ERROR 56092 --- [ask-scheduler-1] o.s.i.ip.tcp.TcpSendingMessageHandler : Unable to find outbound socket for GenericMessage [payload=byte[246], headers={sequenceNumber=1, json__TypeId__=class java.lang.String, sequenceSize=1, ip_connectionId=192.168.3.57:62370:5443:cdeb011d-91f5-46c4-abc9-b68ba13624b3, correlationId=fae71250-bf47-3f64-6ad3-1ce22ef69464, id=c6c097f0-9efb-f0a5-4240-924e06879b7f, contentType=application/json, timestamp=1467696614672}] 
2016-07-05 14:30:14.673 DEBUG 56092 --- [ask-scheduler-1] o.s.i.channel.PublishSubscribeChannel : preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: Unable to find outbound socket, headers={id=273f4477-52cf-645b-d157-e22dc7cc781a, timestamp=1467696614673}] 
2016-07-05 14:30:14.673 DEBUG 56092 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : (inner bean)#6dc2279c received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: Unable to find outbound socket, headers={id=273f4477-52cf-645b-d157-e22dc7cc781a, timestamp=1467696614673}] 
2016-07-05 14:30:14.675 ERROR 56092 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: Unable to find outbound socket 
at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:113) 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) 
at 
... 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
+0

Senden oder empfangen Sie TCP-Pakete mit diesem Programm? – aksappy

+0

Ja, natürlich Es ist normal, Anfrage und Antwort auf die gleiche IP zu erhalten. – kurochi

+0

Es sollte gut funktionieren - Sie müssen in einem Debugger ausführen und überprüfen Sie den Inhalt des 'TcpSendingMessageHandler's' Verbindungen'-Feld. –

Antwort

0

Dank Gary.
Wie Sie sagen, tritt ein Fehler in der Methode handleMessageInternalTcpSendingMessageHandler Klasse.

Get connectionId ist ...

2016-07-06 10:04:28.704 DEBUG 30144 --- [ask-scheduler-4] c.m.j.policy.service.PolicyServiceImpl : ## con ip : 192.168.3.57:53759:5443:bf93680b-13fe-401b-a1eb-5545917f404a 

connectionId nicht null ist. Aber das Ergebnis von connections.get (connectionId) ist null.
Dies sollte nicht verstanden werden.

Dies ist die TcpSendingMessageHandler Klasse ...

/** 
* Writes the message payload to the underlying socket, using the specified 
* message format. 
* @see org.springframework.messaging.MessageHandler#handleMessage(org.springframework.messaging.Message) 
*/ 
@Override 
public void handleMessageInternal(final Message<?> message) throws 
     MessageHandlingException { 
    if (this.serverConnectionFactory != null) { 
     // We don't own the connection, we are asynchronously replying 
     Object connectionId = message.getHeaders().get(IpHeaders.CONNECTION_ID); 
     TcpConnection connection = null; 
     if (connectionId != null) { 
      connection = connections.get(connectionId); 
     } 
     if (connection != null) { 
      try { 
       connection.send(message); 
      } 
      catch (Exception e) { 
       logger.error("Error sending message", e); 
       connection.close(); 
       if (e instanceof MessageHandlingException) { 
        throw (MessageHandlingException) e; 
       } 
       else { 
        throw new MessageHandlingException(message, "Error sending message", e); 
       } 
      } 
     } 
     else { 
      logger.error("Unable to find outbound socket for " + message); 
      throw new MessageHandlingException(message, "Unable to find outbound socket"); 
     } 
     return; 
    } 
    else { 
     // we own the connection 
     try { 
      doWrite(message); 
     } 
     catch (MessageHandlingException e) { 
      // retry - socket may have closed 
      if (e.getCause() instanceof IOException) { 
       if (logger.isDebugEnabled()) { 
        logger.debug("Fail on first write attempt", e); 
       } 
       doWrite(message); 
      } 
      else { 
       throw e; 
      } 
     } 
    } 
} 
0

Dies ist Message<String> Liste ...
Der Wert von factory.getOpenConnectionIds() Verfahren erhalten in den ip_connectionId zu bekommen.
Warum nicht eine Outboud-Buchse finden?

GenericMessage [payload={result=success, reason=0, response=pushTarget}, headers={ip_connectionId=192.168.3.57:58187:5443:37702eaf-0bbc-44a1-8763-65e841a2f480, id=a1b80cc4-3f56-1b80-9c59-57be98b1031e, timestamp=1467783978378}] 
GenericMessage [payload={result=success, reason=0, response=pushTarget}, headers={ip_connectionId=192.168.3.40:53161:5443:693c394c-d3dd-42a3-95ce-692a39a8b603, id=bb49ea99-5e3b-eccf-df3b-7ce03b4bbf73, timestamp=1467783978378}]