Ich versuche, die Ausgabe einer SQL-Abfrage parallel zu verarbeiten. unten angegeben ist mein Code. Ich habe SYSOUT im Aggregator. Aber ich sehe zufällig, dass der SYSOUT im Aggregator nicht gedruckt wird. Die Veröffentlichungsmethode im Aggregator druckt auch keine Sysouts. Ich denke, ich verliere die Nachrichten irgendwo. Kann jemand etwas Licht abwerfen?Nachrichten verloren nach Spring Integration Splitter. Daten erreichen Aggregator nicht zufällig
<int:bridge input-channel="inputChannel" output-channel="dbRequestChannel" />
<jdbc:outbound-gateway request-channel="dbRequestChannel"
max-rows-per-poll="0" data-source="dataSource" reply-channel="headerEnricher"
query="select empname, empno, empdob from employee where empno = 1234" />
<int:header-enricher input-channel="headerEnricher"
output-channel="splitterChannel">
<int:header name="payloadSize" value="3"></int:header>
</int:header-enricher>
<int:chain input-channel="splitterChannel" output-channel="splitterOutputChannel">
<int:splitter />
</int:chain>
<int:channel id="splitterOutputChannel">
<int:dispatcher task-executor="sampleTaskExecutor" />
</int:channel>
<bean id="sampleTaskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="5" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="25" />
</bean>
<int:service-activator input-channel="splitterOutputChannel"
ref="springIntegrationtest" method="testMethod" output-channel="aggregatePayload">
</int:service-activator>
<int:aggregator input-channel="aggregatePayload"
release-strategy-method="release" output-channel="nullChannel"
send-partial-result-on-expiry="true" ref="springIntegrationtest"
method="aggregateData" />
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:spring-integration.xml" })
public class SpringIntegrationTest {
@Autowired
private MessageChannel inputChannel;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Test
public void testQueue() {
Message<String> quoteMessage = MessageBuilder
.withPayload("testPayload").build();
inputChannel.send(quoteMessage);
}
public Map<String, String> testMethod(Message<?> m) {
System.out.println(sdf.format(new Date()));
return (Map<String, String>) m.getPayload();
}
public boolean release(ArrayList<Map<String, Object>> payload) {
boolean release = false;
int size = payload.size();
if (size == 3) {
release = true;
}
System.out.println(release);
return release;
}
public Message<String> aggregateData(ArrayList<Map<String, Object>> payload) {
System.out.println("In aggregateData " + payload);
Message<String> quoteMessage = MessageBuilder
.withPayload("testPayload").build();
return quoteMessage;
}
}
Die Ausgabe der Abfrage gibt immer nur 3 Ergebnisse zurück. Aus diesem Grund habe ich diese Bedingung hinzugefügt. –
Ok. Stellen Sie dann bitte Debug-Logs für org.springframework.integration bereit –