2015-05-12 9 views
5

Gibt es eine Möglichkeit, ein Stream<String> stream aus einem empfangen BufferedReader reader derart, daß jede Zeichenkette in stream eine Zeile von reader mit der zusätzlichen Bedingung darstellt, daß stream direkt bereitgestellt wird (vor reader alles lesen)? Ich möchte die Daten von stream parallel zu erhalten, um sie von reader zu erhalten, um Zeit zu sparen.Convert `BufferedReader` auf` `Strom <String> in einer parallelen Art und Weise

Edit: Ich möchte die Daten parallel zum Lesen verarbeiten. Ich möchte verschiedene Linien nicht parallel bearbeiten. Sie sollten in der Reihenfolge verarbeitet werden.

Lassen Sie uns ein Beispiel machen, wie ich Zeit sparen möchte. Nehmen wir an, unser reader wird uns 100 Zeilen präsentieren. Es dauert 2 ms, um eine Zeile zu lesen und 1 ms, um sie zu verarbeiten. Wenn ich zuerst alle Zeilen lese und sie dann verarbeite, brauche ich 300 ms. Was ich machen möchte ist: Sobald eine Zeile gelesen wird, möchte ich sie verarbeiten und parallel die nächste Zeile lesen. Die Gesamtzeit beträgt dann 201 ms.

Was mir an BufferedReader.lines() nicht gefällt: Soweit ich verstanden habe, fängt das Lesen an, wenn ich die Zeilen bearbeiten will. Nehmen wir an, ich habe bereits meine reader, muss aber Vorberechnungen machen, bevor ich die erste Zeile verarbeiten kann. Nehmen wir an, sie kosten 30 ms. Im obigen Beispiel wäre die Gesamtzeit dann 231 ms oder 301 ms unter Verwendung von reader.lines() (können Sie mir sagen, welche dieser Zeiten korrekt ist?). Aber es wäre möglich, die Arbeit in 201 ms zu erledigen, da die Vorberechnungen parallel zum Lesen der ersten 15 Zeilen durchgeführt werden können.

+1

Marko Topolnik hat einen Spliterator Wrapper geschrieben, mit dem Sie die Batchgröße variieren können: http://stackoverflow.com/a/22575506/1441122 –

Antwort

6

Sie können reader.lines().parallel() verwenden. Auf diese Weise wird Ihre Eingabe in Chunks aufgeteilt und weitere Stream-Operationen werden parallel an Chunks durchgeführt. Wenn weitere Operationen viel Zeit benötigen, können Sie eine Leistungsverbesserung erzielen.

In Ihrem Fall wird Standard-Heuristik nicht funktionieren, wie Sie wollen, und ich denke, es gibt keine fertige Lösung, die Ihnen die Verwendung von Ein-Zeilen-Chargen ermöglichen wird. Sie können einen benutzerdefinierten Spliterator schreiben, der sich nach jeder Zeile aufteilt. Schauen Sie sich java.util.Spliterators.AbstractSpliterator Implementierung an. Wahrscheinlich wäre die einfachste Lösung, etwas ähnliches zu schreiben, aber die Chargengrößen auf ein Element in trySplit zu begrenzen und eine einzelne Zeile in tryAdvance Methode zu lesen.

2

Um zu tun, was Sie wollen, hätten Sie normalerweise einen Thread, der Zeilen liest und sie zu einer blockierenden Warteschlange hinzufügt, und einen zweiten Thread, der Zeilen von dieser blockierenden Warteschlange erhält und sie verarbeitet.

+0

Ich hoffte mit dem Stream-Konzept, dass ich keine solchen Thread-Sachen mehr schreiben müsste. –

2

Sie suchen nach dem falschen Ort. Sie denken, dass ein Strom von Zeilen Zeilen aus der Datei lesen wird, aber so funktioniert es nicht. Sie können dem zugrunde liegenden System nicht sagen, dass es eine Zeile lesen soll, da niemand weiß, was eine Zeile vor dem Lesen ist.

Ein BufferedReader hat seinen Namen, weil es Zeichenpuffer ist. Dieser Puffer hat eine Standardkapazität von 8192. Immer wenn eine neue Zeile angefordert wird, wird der Puffer für eine neue Zeilensequenz analysiert und der Teil wird zurückgegeben. Wenn der Puffer nicht genügend Zeichen enthält, um eine aktuelle Zeile zu finden, wird der gesamte Puffer gefüllt.

Jetzt kann das Füllen des Puffers zu Anforderungen führen, Bytes aus dem zugrunde liegenden InputStream zu lesen, um den Puffer Zeichendecoder zu füllen. Wie viele Bytes angefordert werden und wie viele Bytes tatsächlich gelesen werden, hängt von der Puffergröße des Zeichendecoders ab, davon, wie viele Bytes der tatsächlichen Codierung zu einem Zeichen gehören und ob der zugrunde liegende einen eigenen Puffer hat und wie groß er ist .

Die eigentliche teure Operation ist das Lesen von Bytes aus dem zugrunde liegenden Stream und es gibt keine triviale Zuordnung von Zeilenleseanforderungen zu diesen Leseoperationen. Das Anfordern der ersten Zeile kann zum Lesen führen, zum Beispiel einen 16 KiB Chunk von der zugrundeliegenden Datei, und die nachfolgenden einhundert Anfragen könnten von dem gefüllten Puffer geliefert werden und überhaupt keine I/O verursachen. Und nichts, was Sie mit der API Stream tun, kann daran etwas ändern. Das einzige, was Sie parallelisieren würden, ist die Suche nach neuen Zeilenzeichen im Puffer, die zu trivial sind, um von der parallelen Ausführung zu profitieren.

Sie könnten die Puffergrößen aller beteiligten Parteien reduzieren, um ungefähr das beabsichtigte parallele Lesen einer Zeile zu erhalten, während die vorherige Zeile verarbeitet wird. Die parallele Ausführung wird jedoch die durch die kleinen Puffergrößen verursachte Leistungsverschlechterung nicht kompensieren.