2016-03-18 5 views
2

Ich mag würde wissen, was der beste Weg ist, Bytes von einem Java InputStream mehrere Male und immer noch wirksam sein zu lesen, wenn der Strom sehr groß ist. Lassen Sie uns sagen, dass ich den folgenden Code haben:von einem Java-Input Lesen mit sehr großen Datenmengen mehrfach

public void handleBytes(InputStream in) { 
    doStuff1(in); 
    doStuff2(in); 
    doStuff3(in); 
} 

wo doStuff1, doStuff2 und doStuff3 alle müssen auf den gleichen Bytes arbeiten, aber andere Dinge zu tun. Auch ich gehe davon aus, dass diese Funktionen asynchron sein können.

Ich weiß, dass es zu mark und dann reset der Strom möglich ist, aber ich frage mich, ob dies der Weg zu gehen, wenn in viele Daten hat. Auch wenn ich einen Thread-Arbeiter pro doStuff-X haben möchte, kann ich nicht wirklich reset verwenden.

Sollte ich eine Kopie des Stroms für jede doStuff-X Methode? Aber ich bin mir nicht sicher, ob das für große Datenmengen effizient ist.

+0

Sie müssen weitere Informationen darüber bereitstellen, was die doStuff-Methoden tun. – Raedwald

+0

Wenn die Eingabeströme nicht zu groß sind, laden Sie die Bytes einfach in den Speicher, wenn Sie können, und führen Sie dann Ihre verschiedenen Prozesse gegen den Master-Satz von Bytes aus. – ManoDestra

Antwort

1

Wenn Sie wissen , dass die drei doStuff() Funktionen asynchron ausgeführt werden, dann könnte man mit Apache Commons IO TeeInputStream versuchen, den Inhalt des ursprünglichen Input auf eine PipedOutputStream zu kopieren, die zu einem PipedInputStream verbunden ist, der durch doStuff2 gelesen wird (). Ebenso könnten Sie einen zweiten TeeInputStream einrichten, der mit einem zweiten PipedOutputStream erstellt wurde, der mit einem zweiten PipedInputStream für doStuff3() verbunden ist.

Es gibt einige Einschränkungen für diesen Ansatz:

1) doStuff1(), doStuff2() und doStuff3() muss auf separaten Threads ausgeführt werden, sonst werden Sie die gesamte Datei zweimal Puffer während doStuff1() ausgeführt wird und bevor doStuff2() und doStuff3() ausgeführt werden. Dieser Ansatz geht davon aus, dass doStuff2() und doStuff3() Daten lesen und verarbeiten, während doStuff1() die Daten anfänglich liest.

2) doStuff1() Gebrauch überspringen() nicht die nachgelagerten Funktionen Markierung() oder Reset() als dieser Wille vermasseln (wie in der TeeInputStream javadoc

Dieser Ansatz angemessen sein sollte skizziert verwenden können. Speichereffizient, solange alle drei doStuff() - Funktionen Daten mit ungefähr derselben Rate verarbeiten können

+0

Dies scheint zu erreichen, was ich will. Aber was meinen Sie mit Ihrem ersten Punkt, wenn Sie sagen, dass die Methode annimmt, dass doStuff2 und doStuff3 Daten lesen und verarbeiten, während doStuff1 die Daten anfänglich liest. Warum ist das der Fall? – DrChess

+0

Wenn die drei Funktionen nicht asynchron (d. H. In separaten Threads) ausgeführt werden, werden doStuff2() und doStuff3() nicht ausgeführt, bis doStuff1() abgeschlossen ist. Wenn sie nicht laufen und aus dem PipedInputStream lesen, während doStuff1 läuft, dann sammeln sich die Daten in der Pipe (die im Wesentlichen im Speicher ist, obwohl sie auf die Platte geschrieben werden kann, bin ich mir nicht sicher). Da es zwei Pipes gibt, speichern Sie doppelt so viele Daten. Sie haben angegeben, dass die Daten "sehr groß" sind, also nehme ich an, dass Sie nicht zwei Kopien davon im Speicher haben wollen. –

+0

Aus diesem Grund ist es wichtig, dass die doStuff2()/doStuff3() - Funktionen ausgeführt werden, während doStuff1() ausgeführt wird. Während doStuff1() Daten zu den Pipes hinzufügt, lesen diese beiden Funktionen (doStuff2()/doStuff3()) gleichzeitig aus den Pipes (wodurch die Daten gelöscht werden). Hilft das? –

1

Sie können nur einen Inputstream einmal lesen, ohne die gesamte Eingabe Pufferung auf.

Man könnte es in den Speicher, wenn seine GB laden oder so, oder kopieren Sie sie in eine Datei und wiederholen Sie es, wenn Sie viele GBs haben. Wenn Sie die Daten in einem Thread analysieren können, können Sie sie an die anderen Threads übergeben.

+1

Kopieren in eine Datei scheint eine wirklich einfache Möglichkeit zu sein. Aber ich müsste zuerst alle meine Bytes in eine Datei schreiben, bevor ich sie in den doStuffs Methoden verarbeiten kann. – DrChess

1

Im Allgemeinen scheint dies wie eine schlechte Idee. mark wird nicht durch den Strom werden bei allen unterstützten garantiert, und auch wenn es unterstützt wird, müssen Sie einen Grenzwert festlegen, wie viele Bytes vor reset genannt wird, gelesen werden kann.

Da Sie erwähnen, dass diese dostuff s asynchron ausgeführt werden können, warum nicht startet einen Thread für jeden von ihnen und Verwendung Warteschlangen der Eingabe von dem Haupt-Thread in die drei Warteschlangen gleichzeitig zu füttern? Es bedarf einiger Synchronisation, aber auf diese Weise haben Sie keine Begrenzung für das Eingabevolumen und können die Speicherauslastung immer noch begrenzen.

1

Sie könnten PipedOutputStream und PipedInputStream übernehmen.

static class Task extends Thread{ 
    private final String taskName; 
    private final BufferedInputStream input; 
    public Task(String taskName, PipedInputStream input){ 
     this.taskName = taskName; 
     this.input = new BufferedInputStream(input); 
    } 

    public void run(){ 
     try { 
      System.out.println("Thread "+this.taskName+" Start"); 

      final byte buf[] = new byte[8]; // 8 bytes for demo 
      while(true){ 
       if(input.available() > 0){ 
        input.read(buf); 
        System.out.println(String.format("Task Name %s, read:%s", this.taskName, new String(buf))); 
       } 
       else{ 
        // TODO: Set break Condition:Ex: Check the expected read size 
        Thread.sleep(1000); 
       } 
      } 
     } catch (IOException | InterruptedException e) { 
      throw new RuntimeException(e); 
     } 
    } 
} 
public static void main(String args[]) { 
    try{ 
     final PipedInputStream input1 = new PipedInputStream(); 
     final PipedInputStream input2 = new PipedInputStream(); 
     final PipedInputStream input3 = new PipedInputStream(); 

     final Task t1 = new Task("Task1", input1); 
     final Task t2 = new Task("Task2", input2); 
     final Task t3 = new Task("Task3", input3); 
     t1.start(); 
     t2.start(); 
     t3.start(); 

     Thread.sleep(300); 

     InputStream input = null; 
     try{ 
      input = new FileInputStream("LargeInputFile.txt"); 

      final PipedOutputStream out1 = new PipedOutputStream(input1); 
      final PipedOutputStream out2 = new PipedOutputStream(input2); 
      final PipedOutputStream out3 = new PipedOutputStream(input3); 

      byte buf[] = new byte[8]; // 8 bytes for demo 
      while(true){ 

       if(input.available()>0){ 
        int size = input.read(buf); 

        if(size > 0){ 
         out1.write(buf); 
         out2.write(buf); 
         out3.write(buf); 
         out1.flush(); 
         out2.flush(); 
         out3.flush(); 
        }      
       } 
       else{ 
        System.out.println("Rread is finished!"); 
        break; 
       } 
      } 
     } 
     finally{ 
      if(input!=null){ 
       input.close(); 
      } 
     } 
     t1.join(); 
     t2.join(); 
     t3.join(); 
    } 
    catch(Exception e){ 
     e.printStackTrace(System.err); 
    } 
}