2016-05-22 22 views
0

Ich habe einen (Legacy) TCP-Dienst, der mehrere Prozesse hat. Jeder Prozess läuft auf demselben Host, aber auf einem anderen Port. Da es sich bei dem Dienst um einen einzelnen Thread handelt, besteht die Möglichkeit, den Durchsatz zu erhöhen, darin, dass jede Anforderung über jeden der Ports gerundet wird.Spring Integration - Load Balance über mehrere TCP Outbound-Gateways

Ich biete eine AMQP-Exposition zu dieser Legacy-Anwendung. Es ist sehr einfach - nehmen Sie eine Zeichenfolge aus der AMQP-Warteschlange, übergeben Sie sie an die Anwendung, und geben Sie die Antwortzeichenfolge an die AMQP-Antwortwarteschlange zurück.

Dies funktioniert gut an einem einzigen Port. Allerdings möchte ich die Anfragen über alle Ports verteilen.

Frühling Integration scheint nur AbstractClientConnectionFactory Implementierungen zu schaffen, die entweder direkt an einen einzelnen Host/Port-Verbindung (TcpNetClientConnectionFactory) oder einen Pool von Verbindungen zu einem einzelnen Host/Port (CachingClientConnectionFactory) aufrechtzuerhalten. Es gibt keine Poolverbindungen zwischen einem einzelnen Host und mehreren Ports.

Ich habe versucht, meine eigene AbstractClientConnectionFactory schreiben, die einen Pool von AbstractClientConnectionFactory Objekte und Round-Robins zwischen ihnen verwaltet. Ich habe jedoch mehrere Probleme mit der Übergabe der TCP-Verbindungen, wenn der Zieldienst weggeht oder das Netzwerk unterbrochen ist, die ich nicht lösen konnte.

Es gibt auch den Ansatz dieser Frage: Spring Integration 4 - configuring a LoadBalancingStrategy in Java DSL, aber die Lösung bestand darin, die Anzahl der Endpunkte fest zu codieren. In meinem Fall ist die Anzahl der Endpunkte nur zur Laufzeit bekannt und ist eine vom Benutzer konfigurierbare Einstellung.

Also, im Grunde muss ich dynamisch zur Laufzeit eine TcpOutboundGateway pro Port erstellen und irgendwie registrieren es in meinem IntegrationFlow. Ich habe folgendes versucht:

@Bean 
public IntegrationFlow xmlQueryWorkerIntegrationFlow() { 
    SimpleMessageListenerContainer inboundQueue = getMessageListenerContainer(); 

    DirectChannel rabbitReplyChannel = MessageChannels.direct().get(); 

    IntegrationFlowBuilder builder = IntegrationFlows 
      .from(Amqp.inboundGateway(inboundQueue) 
         .replyChannel(rabbitReplyChannel))  
      /* SOMEHOW DO THE ROUND ROBIN HERE */ 
      //I have tried: 
      .channel(handlerChannel()) //doesnt work, the gateways dont get started and the message doesnt get sent to the gateway 
      //and I have also tried: 
      .handle(gateway1) 
      .handle(gateway2) //doesnt work, it chains the handlers instead of round-robining between them 
      // 
      .transform(new ObjectToStringTransformer()) 
      .channel(rabbitReplyChannel); 

    return builder.get(); 

} 

@Bean 
//my attempt at dynamically adding handlers to the same channel and load balancing between them 
public DirectChannel handlerChannel() { 
    DirectChannel channel = MessageChannels.direct().loadBalancer(new RoundRobinLoadBalancingStrategy()).get(); 
    for (AbstractClientConnectionFactory factory : generateConnections()) { 
     channel.subscribe(generateTcpOutboundGateway(factory)); 
    } 
    return channel; 
} 

Weiß jemand, wie ich dieses Problem lösen kann?

Antwort

0

Siehe dynamic ftp sample - im Wesentlichen geht jedes Outbound-Gateway in einen eigenen Anwendungskontext und der dynamische Router routet zu dem entsprechenden Kanal (für den der Outbound-Adapter bei Bedarf erstellt wird, falls erforderlich).

Obwohl das Beispiel XML verwendet, können Sie das Gleiche mit Java-Konfiguration oder sogar mit dem Java-DSL tun.

Siehe meine Antwort auf eine similar question for multiple IMAP mail adapters using Java configuration und dann eine follow-up question.