2016-07-18 16 views
2

Ich versuche, die Ausgabe einer SQL-Abfrage parallel zu verarbeiten. unten angegeben ist mein Code. Ich habe SYSOUT im Aggregator. Aber ich sehe zufällig, dass der SYSOUT im Aggregator nicht gedruckt wird. Die Veröffentlichungsmethode im Aggregator druckt auch keine Sysouts. Ich denke, ich verliere die Nachrichten irgendwo. Kann jemand etwas Licht abwerfen?Nachrichten verloren nach Spring Integration Splitter. Daten erreichen Aggregator nicht zufällig

<int:bridge input-channel="inputChannel" output-channel="dbRequestChannel" /> 

     <jdbc:outbound-gateway request-channel="dbRequestChannel" 
      max-rows-per-poll="0" data-source="dataSource" reply-channel="headerEnricher" 
      query="select empname, empno, empdob from employee where empno = 1234" /> 

     <int:header-enricher input-channel="headerEnricher" 
      output-channel="splitterChannel"> 
      <int:header name="payloadSize" value="3"></int:header> 
     </int:header-enricher> 

     <int:chain input-channel="splitterChannel" output-channel="splitterOutputChannel"> 
      <int:splitter /> 
     </int:chain> 


     <int:channel id="splitterOutputChannel"> 
      <int:dispatcher task-executor="sampleTaskExecutor" /> 
     </int:channel> 

     <bean id="sampleTaskExecutor" 
      class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
      <property name="corePoolSize" value="5" /> 
      <property name="maxPoolSize" value="10" /> 
      <property name="queueCapacity" value="25" /> 
     </bean> 

     <int:service-activator input-channel="splitterOutputChannel" 
      ref="springIntegrationtest" method="testMethod" output-channel="aggregatePayload"> 
     </int:service-activator> 

     <int:aggregator input-channel="aggregatePayload" 
      release-strategy-method="release" output-channel="nullChannel" 
      send-partial-result-on-expiry="true" ref="springIntegrationtest" 
      method="aggregateData" /> 

    @RunWith(SpringJUnit4ClassRunner.class) 
@ContextConfiguration(locations = { "classpath:spring-integration.xml" }) 
public class SpringIntegrationTest { 


    @Autowired 
    private MessageChannel inputChannel; 

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); 



    @Test 
    public void testQueue() { 
     Message<String> quoteMessage = MessageBuilder 
       .withPayload("testPayload").build(); 
     inputChannel.send(quoteMessage); 
    } 


    public Map<String, String> testMethod(Message<?> m) { 

     System.out.println(sdf.format(new Date())); 
     return (Map<String, String>) m.getPayload(); 
    } 

    public boolean release(ArrayList<Map<String, Object>> payload) { 
     boolean release = false; 
     int size = payload.size(); 
     if (size == 3) { 
      release = true; 
     } 
     System.out.println(release); 
     return release; 
    } 

    public Message<String> aggregateData(ArrayList<Map<String, Object>> payload) { 

     System.out.println("In aggregateData " + payload); 
     Message<String> quoteMessage = MessageBuilder 
       .withPayload("testPayload").build(); 
     return quoteMessage; 
    } 

} 

Antwort

1

Nun, ich denke, Ihr Problem ist mit der Kombination von Zuständen und Optionen.

Sie haben dies:

int size = payload.size(); 
if (size == 3) { 
    release = true; 
} 

so, Ihr Aggregator die Gruppe gerade nach einem 3 Artikel angekommen veröffentlichen wird, können Sie mittlerweile viel mehr Einzelteile nach der Trennung haben.

Die release Signale Aggregator, um die Gruppe zu beenden und zu beenden. Standardmäßig hat es eine Option wie expireGroupsUponCompletion = false, was bedeutet behalten Gruppe im Geschäft aber mit dem completed Zustand.

Wenn Ihr Ziel nur aus Aggregator-Tupeln von 3 emittieren soll, sollten Sie erwägen, die expireGroupsUponCompletion zu true zu wechseln. Weitere Informationen finden Sie im Aggregator manual.

+0

Die Ausgabe der Abfrage gibt immer nur 3 Ergebnisse zurück. Aus diesem Grund habe ich diese Bedingung hinzugefügt. –

+0

Ok. Stellen Sie dann bitte Debug-Logs für org.springframework.integration bereit –