2016-04-03 7 views
1

Ich möchte einen BoundedExecutor machen, nur eine feste Anzahl Thread kann parallel ausgeführt werden. Wenn weitere Aufgaben hinzugefügt werden, blockiert der Executor, bis andere Threads abgeschlossen sind.der ExecutorService werfen RejectException mit SynchronousQueue

Hier ist der Executor ich in anderen Fragen gefunden.

public class BoundedExecutor extends ThreadPoolExecutor { 

    private final Logger logger = LogManager.getLogger(BoundedExecutor.class); 
    private final Semaphore semaphore; 

    public BoundedExecutor(int bound){ 
     super(bound, bound, 0, TimeUnit.SECONDS, new SynchronousQueue<>()); 
     this.semaphore = new Semaphore(bound); 
    } 

    @Override 
    public void execute(Runnable task) { 
     try { 
      semaphore.acquire(); 
      super.execute(task); 
     } catch (InterruptedException e) { 
      logger.error("interruptedException while acquiring semaphore"); 
     } 
    } 

    protected void afterExecute(final Runnable task, final Throwable t){ 
     super.afterExecute(task, t); 
     semaphore.release(); 
    } 
} 

und der Hauptcode

public static void main(String[] args) throws Exception { 

     Runnable task =() -> { 
      try { 
       Thread.sleep(1000); 
       System.out.println(Thread.currentThread().getName() + " complete."); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     }; 
     BoundedExecutor pool = new BoundedExecutor(1); 
     for(int i = 0; i < 10; i++){ 
      pool.execute(task); 
     } 
     pool.shutdown(); 
    } 

Ich dachte, der Code einen einzelnen Thread gemacht und Aufgaben ausführen sequentiell, aber eigentlich, wenn die erste Aufgabe abgeschlossen, der Vollstrecker geworfen java.util.concurrent.RejectedExecutionException .

während ich unterzeichne, wird der semaphore.acquire() den Thread blockieren, bis die erste Aufgabe abgeschlossen ist und den Semaphor freigeben, was ist mit dem Code falsch?

+0

Warum verwenden Sie einen Semaphore mit einem SynchronousQueue? Was versuchst du damit zu erreichen? –

+0

Der Semaphor wird zum Blockieren des ThreadExecutors verwendet, wenn der Pool gefüllt wird. Die synchroneQueue wird verwendet, da ich die Aufgabe nicht in die Warteschlange stellen möchte, wenn der Pool voll ist. – iceshi

+0

Also wird die Verwendung einer SynchronousQueue blockiert, wenn es keinen freien Thread gibt, um sie aufzunehmen. Was fügt der Semaphore hinzu? –

Antwort

2

würde ich den Queue-Block machen und nicht verwenden, um eine Semaphore

public static void main(String[] args) { 
    SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>() { 
     @Override 
     public boolean offer(Runnable runnable) { 
      try { 
       return super.offer(runnable, 1, TimeUnit.MINUTES); 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
       return false; 
      } 
     } 
    }; 
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES, queue); 
    for (int i = 0; i < 10; i++) { 
     final int finalI = i; 
     pool.execute(() -> { 
      try { 
       Thread.sleep(1000); 
       System.out.println(LocalTime.now() + " - " + Thread.currentThread().getName() + " " + finalI + " complete"); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     }); 
    } 
    pool.shutdown(); 
} 

druckt

13:24:14.241 - pool-1-thread-1 0 complete 
13:24:15.247 - pool-1-thread-1 1 complete 
13:24:16.247 - pool-1-thread-1 2 complete 
13:24:17.247 - pool-1-thread-1 3 complete 
13:24:18.248 - pool-1-thread-1 4 complete 
13:24:19.248 - pool-1-thread-1 5 complete 
13:24:20.248 - pool-1-thread-1 6 complete 
13:24:21.248 - pool-1-thread-1 7 complete 
13:24:22.249 - pool-1-thread-1 8 complete 
13:24:23.249 - pool-1-thread-1 9 complete 
+0

Hinweis: Diese Lösung wird machen Die Anzahl der Worker-Threads ist nicht größer als "corePoolSize" – louxiu