2010-10-01 9 views
8

Ich benutze RXTX, um Daten von einer seriellen Schnittstelle zu lesen. Der Messwert wird in einem in der folgenden Weise erzeugte Thread durchgeführt:Thread-Interrupt nicht endend blockierend Call on Input-Stream lesen

CommPortIdentifier portIdentifier = CommPortIdentifier.getPortIdentifier(port); 
CommPort comm = portIdentifier.open("Whatever", 2000); 
SerialPort serial = (SerialPort)comm; 
...settings 
Thread t = new Thread(new SerialReader(serial.getInputStream())); 
t.start(); 

Die SerialReader Klasse implementiert Runnable und nur Schlaufen auf unbestimmte Zeit, von dem Port zu lesen und die Daten in nützliche Pakete konstruieren, bevor es aus anderen Anwendungen gesendet werden. Allerdings habe ich es reduziert auf die folgende Einfachheit unten:

public void run() { 
    ReadableByteChannel byteChan = Channels.newChannel(in); //in = InputStream passed to SerialReader 
    ByteBuffer buffer = ByteBuffer.allocate(100); 
    while (true) { 
    try { 
     byteChan.read(buffer); 
    } catch (Exception e) { 
     System.out.println(e); 
    } 
    } 
} 

Wenn ein Benutzer eine Stopptaste klickt, werden die folgenden Funktionen ausgelöst, die den Eingangsstrom in der Theorie schließen und brechen aus dem Sperr byteChan.read (Puffer) Anruf. Der Code lautet wie folgt:

public void stop() { 
    t.interrupt(); 
    serial.close(); 
} 

Allerdings, wenn ich diesen Code ausführen, bekomme ich nie eine ClosedByInterruptException, die einst der Eingangsstrom schließt ausgelöst werden soll. Außerdem blockiert die Ausführung beim Aufruf von serial.close() - weil der zugrundeliegende Eingabestream beim Lesen weiterhin blockiert. Ich habe versucht, den Unterbrechungsaufruf mit byteChan.close() zu ersetzen, der dann eine AsynchronousCloseException verursachen sollte, jedoch erhalte ich die gleichen Resultate.

Jede Hilfe auf, was ich vermisse, würde sehr geschätzt werden.

Antwort

3

Die RXTX SerialInputStream (was durch die serial.getInputStream() Aufruf zurückgegeben wird) ein Timeout-Schema, das am Ende alle meine Probleme lösen. die folgende Zugabe, bevor das neue SerialReader Objekt erstellen bewirkt, dass der liest nicht mehr auf unbestimmte Zeit zu blockieren:

serial.enableReceiveTimeout(1000); 

Im SerialReader Objekt, hatte ich um ein paar Dinge zu ändern direkt aus dem Input anstatt die ReadableByteChannel Erstellen lesen Aber jetzt kann ich den Reader anhalten und neu starten, ohne dass es Probleme gibt.

+0

Das funktioniert definitiv. Vielen Dank. –

5

Sie können einen Stream, der unterbrechbare E/A nicht unterstützt, einfach in einen InterruptibleChannel umwandeln, indem Sie ihn umschließen (und ReadableByteChannel wird nicht erweitert InterruptibleChannel).

Sie müssen den Vertrag des zugrunde liegenden InputStream betrachten. Was sagt SerialPort.getInputStream() über die Unterbrechbarkeit seines Ergebnisses? Wenn es nichts sagt, sollten Sie davon ausgehen, dass es Interrupts ignoriert.

Für alle E/A, die Interrupt nicht explizit unterstützen, ist die einzige Option, den Stream aus einem anderen Thread zu schließen. Dies kann sofort einen IOException (obwohl es möglicherweise kein AsynchronousCloseException ist) in dem Thread auslösen, der bei einem Aufruf des Streams blockiert wird.

Aber auch das ist extrem abhängig von der Umsetzung der — und das zugrunde liegende Betriebssystem kann auch ein Faktor sein.


Notieren Sie sich der Quellcode kommentiert die ReadableByteChannelImpl Klasse zurückgegeben von newChannel():

private static class ReadableByteChannelImpl 
    extends AbstractInterruptibleChannel  // Not really interruptible 
    implements ReadableByteChannel 
    { 
    InputStream in; 
    ⋮ 
+0

In meinem Beispiel Channels.newChannel () ein Objekt vom Typ ReadableByteChannelImpl gibt, die wichtiger ist ReadableByteChannel (aber implementiert erstreckt AbstractInterruptibleChannel die Arbeitsgeräte Unterbrechbarer Kanal). – JDS

+0

Hoppla ... geben Sie den Kommentar ein. Wie auch immer, eine instanceof Prüfung auf ByteChan gegen InterruptibleChannel gibt true zurück. Da dies nicht klar war, wird der Aufruf von stop() in dem Thread durchgeführt, der den Lese-Loop-Thread erzeugt. – JDS

+0

@JDS - ... und doch, es funktioniert nicht, oder? Bitte sehen Sie mein Update. Der Kanal ist nicht unterbrechbar, und Sie haben möglicherweise keine praktikable Option, um einen RXTX-Lesevorgang auszuführen. – erickson

1

Ich verwende den folgenden Code zum Herunterfahren von rxtx. Ich führe Tests aus, die sie starten und herunterfahren und das scheint gut zu funktionieren.meine Leser wie folgt aussieht:

private void addPartsToQueue(final InputStream inputStream) { 
    byte[] buffer = new byte[1024]; 
    int len = -1; 
    boolean first = true; 
    // the read can throw 
    try { 
     while ((len = inputStream.read(buffer)) > -1) { 
      if (len > 0) { 
       if (first) { 
        first = false; 
        t0 = System.currentTimeMillis(); 
       } else 
        t1 = System.currentTimeMillis(); 
       final String part = new String(new String(buffer, 0, len)); 
       queue.add(part); 
       //System.out.println(part + " " + (t1 - t0)); 
      } 
      try { 
       Thread.sleep(sleep); 
      } catch (InterruptedException e) { 
       //System.out.println(Thread.currentThread().getName() + " interrupted " + e); 
       break; 
      } 
     } 
    } catch (IOException e) { 
     System.err.println(Thread.currentThread().getName() + " " + e); 
     //if(interruSystem.err.println(e); 
     e.printStackTrace(); 
    } 
    //System.out.println(Thread.currentThread().getName() + " is ending."); 
} 

dank

public void shutdown(final Device device) { 
    shutdown(serialReaderThread); 
    shutdown(messageAssemblerThread); 
    serialPort.close(); 
    if (device != null) 
     device.setSerialPort(null); 
} 

public static void shutdown(final Thread thread) { 
    if (thread != null) { 
     //System.out.println("before intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState()); 
     thread.interrupt(); 
     //System.out.println("after intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState()); 
     try { 
      Thread.sleep(100); 
     } catch (InterruptedException e) { 
      System.out.println(Thread.currentThread().getName() + " was interrupted trying to sleep after interrupting" + thread.getName() + " " + e); 
     } 
     //System.out.println("before join() on thread " + thread.getName() + ", it's state is " + thread.getState()); 
     try { 
      thread.join(); 
     } catch (InterruptedException e) { 
      System.out.println(Thread.currentThread().getName() + " join interruped"); 
     } 
     //System.out.println(Thread.currentThread().getName() + " after join() on thread " + thread.getName() + ", it's state is" + thread.getState()); 
    }