2016-07-28 11 views
1

Ich versuche, nur ein Element aus einer sqs-Warteschlange zu einem bestimmten Zeitpunkt bearbeitet werden. Momentan wird nur eine einzige Nachricht von einer Warteschlange angerufen, aber dies wird immer so weitergeführt, wie es bei jeder Abfrage der Fall ist.AWS Integrationsfeder: Garantiere nur einen Artikel von sqs

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
     executor.setCorePoolSize(2); 
     executor.setMaxPoolSize(2); 
     executor.setQueueCapacity(10); 
     executor.setThreadNamePrefix("test-"); 
     executor.initialize(); 
     return executor; 

     new SqsMessageDrivenChannelAdapter(amazon)); 
     adapter.setMaxNumberOfMessages(1); 
     adapter.setSendTimeout(2000); 
     adapter.setVisibilityTimeout(1200); 
     adapter.setWaitTimeOut(20); 
     adapter.setTaskExecutor(this.asyncTaskExecutor()); 

Das Problem scheint innerhalb der ThreadPoolTaskExecutor und mein Verständnis davon zu sein. Da die Warteschlangengröße 10 ist, wird sie jedes Mal angehoben, bis diese voll ist?

die MaxPoolSize 1. Einstellung bewirkt, dass ein

Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.cloud.aws.messaging.[email protected]406354e5 rejected from [email protected][Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0] 
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) 
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) 
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293) 
    ... 6 common frames omitted 

Antwort

1

Das Problem ist, dass Ihr ThreadPoolExecutor eine Blocking Größe von 10 unter Verwendung von 2 Threads haben eingestellt wird, um Nachrichten aus dieser Warteschlange zu konsumieren. So können Sie jederzeit 2 Threads gleichzeitig an Nachrichten arbeiten lassen. Wenn Sie PoolSize auf 1 setzen, können Sie garantieren, dass zu einem bestimmten Zeitpunkt nur eine Nachricht bearbeitet wird.

aus dem Quellcode:

/* 
* Proceed in 3 steps: 
* 
* 1. If fewer than corePoolSize threads are running, try to 
* start a new thread with the given command as its first 
* task. The call to addWorker atomically checks runState and 
* workerCount, and so prevents false alarms that would add 
* threads when it shouldn't, by returning false. 
* 
* 2. If a task can be successfully queued, then we still need 
* to double-check whether we should have added a thread 
* (because existing ones died since last checking) or that 
* the pool shut down since entry into this method. So we 
* recheck state and if necessary roll back the enqueuing if 
* stopped, or start a new thread if there are none. 
* 
* 3. If we cannot queue task, then we try to add a new 
* thread. If it fails, we know we are shut down or saturated 
* and so reject the task. 

Sie den dritten Fall zu treffen.

+0

Das war meine erste Idee das hat nicht funktioniert, siehe oben. @Gandalf – user101010101

+0

Will wahrscheinlich viel mehr Code, um dies herauszufinden. GitHub? – Gandalf