2016-04-18 18 views
1

ich bin neu zu Multi-Threading und ich versuche, das folgende einfache Programm zu schreiben:Consumer/Producer mit ExecutorService klemmt

  1. Lesen einer Datei
  2. Drucken Sie die Ausgabe
zu screenen

ich habe die folgenden Klassen erstellt: Consumer

public class Consumer implements Runnable { 
    private BlockingQueue<String> m_Queue; 

public Consumer(BlockingQueue<String> i_Queue) 
{ 
    m_Queue = i_Queue; 
} 

@Override 
public void run() 
{ 
    try 
    { 
     String referenceID1; 

     //Consuming message until exit message is received. 
     while((referenceID1 = m_Queue.take()) !="EOF") 
     { 
      System.out.println(referenceID1); 
     } 
    } 
    catch (Exception e) 
    { 
     e.printStackTrace(); 
    } 
}} 

Hersteller:

public class Producer implements Runnable { 
private BlockingQueue<String> m_Queue; 
private String m_FilePath; 

public Producer(BlockingQueue<String> i_Queue, String i_FilePath) 
{ 
    m_Queue = i_Queue; 
    m_FilePath = i_FilePath; 
} 

@Override 
public void run() 
{ 
    try (BufferedReader reader = new BufferedReader(new FileReader(m_FilePath))) 
    { 
     String line; 
     while ((line = reader.readLine()) != null) 
     { 
      m_Queue.put(line); 
      System.out.println(line + " Was added to queue."); 
     } 

     //Adding an exit message. 
     m_Queue.put("EOF"); 
     System.out.println("EOF Was added to queue."); 
    } 
    catch (IOException | InterruptedException e) 
    { 
     e.printStackTrace(); 
    } 
}} 

ProducerConsumerService

public static void main(String[] args) { 
    ExecutorService threadPool = Executors.newFixedThreadPool(5); 
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(100); 

    //Start the producer; Will read the file 
    threadPool.execute(new Producer(queue, args[0])); 

    for (int i = 0; i < 4; i++) 
    { 
     System.out.println("Generating consumer " + i+1); 
     threadPool.execute(new Consumer(queue)); 
    } 

    try 
    { 
     threadPool.shutdown(); 
     System.out.println("Shutting down threads."); 

     threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
     System.out.println("Terminated successfully."); 
    } 
    catch (InterruptedException e) 
    { 
     e.printStackTrace(); 
    } } 

Die Eingabedatei besteht aus den Nummern 1-20, wobei jede Zahl in einer neuen Zeile. Wenn ich das Programm ausführen, kann ich sehen, dass alle Zahlen gelesen werden, aber das Programm scheint zu hängen/stecken und ich sehe nicht die Meldung "Erfolgreich beendet".
Dies passiert nicht, wenn ich einen einzelnen Thread zum Lesen und einen einzelnen Thread zum Drucken auf dem Bildschirm verwenden, aber die Verwendung von 1 Thread trotzt meiner Notwendigkeit eines "Multi-Threaded" -Programms.
Meine Vermutung ist, dass ich vergessen, eine Ressource zu veröffentlichen, aber ich habe keine Ahnung warum.

+1

Bitte ersetzen Sie erhalten '(referenceID1 = m_Queue.take()) =" EOF "' mit '! (referenceID1 = m_Queue.take()). ist gleich (" EOF ")'. String-Vergleich mit '==' funktioniert sehr unwahrscheinlich. – OldCurmudgeon

+0

@OldCurmudgeon Sie haben Recht. Aber wie Nicolas Filotto vorgeschlagen hat, habe ich den EOF-Teil komplett entfernt. Vielen Dank! – ocp1000

Antwort

2

Ihr Problem ist, dass Sie take() in Ihrem Consumer verwenden, die:

Ermittelt und entfernt den Kopf dieser Warteschlange, falls erforderlich zu warten, bis ein Element zur Verfügung steht.

Und Sie testen, ob der zurückgegebene Wert EOF (nicht richtig BTW müssen Sie equals benutzen), aber Sie ihn nur einmal in die Warteschlange und Sie haben 4 Consumer so 3 Consumer noch darauf warten.

So sollten Sie poll() verwenden, anstatt wie unten:

while((referenceID1 = m_Queue.poll()) != null && !Objects.equals(referenceID1, "EOF")) 

Oder einfach, wenn Sie loswerden EOF

while((referenceID1 = m_Queue.poll()) != null) 
+0

Das hat den Trick gemacht! Ich habe auch den Teil "EOF" losgeworden. – ocp1000

+0

@ ocp1000 gute Nachrichten, ja "EOF" wird bei diesem Ansatz nicht mehr benötigt –