2015-01-05 22 views
8

Gibt es einige Codebeispiele für die Verwendung von org.reactivestreams Bibliotheken zur Verarbeitung großer Datenströme mit Java NIO (für hohe Leistung)? Ich strebe eine verteilte Verarbeitung an, daher wären Beispiele, die Akka verwenden, am besten, aber ich kann das herausfinden.Wie verwende ich Reactive Streams für die NIO-Binärverarbeitung?

Es scheint immer noch der Fall zu sein, dass die meisten (ich hoffe, nicht alle) Beispiele von Dateien in scala Resort Source (nicht binär) oder direkte Java NIO Lesen (und sogar Dinge wie Files.readAllBytes!)

Vielleicht Gibt es eine Aktivatorvorlage, die ich verpasst habe? (Akka Streams with Scala! ist in der Nähe Adressierung alles, was ich brauche außer der binären/NIO-Seite)

Antwort

4

Wir verwenden tatsächlich akka Streams, um Binärdateien zu verarbeiten. Es war ein wenig schwierig zu bekommen Dinge gehen, wie es keine Dokumentation rund um das war, aber das ist, was wir kamen mit:

val binFile = new File(filePath) 
val inputStream = new BufferedInputStream(new FileInputStream(binFile)) 
val binStream = Stream.continually(inputStream.read).takeWhile(-1 != _).map(_.toByte) 
val binSource = Source(binStream) 

Sobald Sie binSource, die ein akka Source[Byte] können Sie voran gehen und fange an, irgendwelche Stromtransformationen anzuwenden (map, flatMap, transform, etc ...), die du willst. Diese Funktion nutzt das des Begleitobjekts, das eine Iterable übernimmt, die in einer Skala Stream übergeben wird, die die Daten träge einlesen und für Ihre Transformationen verfügbar machen sollte.

EDIT

Wie Konrad in den Kommentaren darauf hingewiesen, auf die Tatsache zurückzuführen, ein Strom kann ein Problem mit großen Dateien sein, dass es memoization die Elemente führt er begegnet, wie es träge ist, den Strom Aufbau aus. Dies kann zu Situationen mit zu wenig Arbeitsspeicher führen, wenn Sie nicht vorsichtig sind. wenn man sich die Dokumentation sieht jedoch für Stream gibt es eine Spitze memoization zur Vermeidung von im Speicher Aufbau:

Man muss vorsichtig von memoization sein; Sie können sehr schnell große Mengen an Speicher auf essen, wenn Sie nicht vorsichtig sind. Der Grund dafür ist, dass die Memoisierung des Streams eine Struktur ähnlich wie scala.collection.immutable.List erstellt. Solange etwas an den Kopf hält, hält sich der Kopf an den Schwanz, und so geht es rekursiv weiter. Wenn sich andererseits nichts an dem Kopf befindet (z. B. haben wir def zum Definieren des Streams verwendet), dann verschwindet es, sobald es nicht mehr direkt verwendet wird.

Damit berücksichtigt, könnte man meinen ursprünglichen Beispiel wie folgt ändern:

val binFile = new File(filePath) 
val inputStream = new BufferedInputStream(new FileInputStream(binFile))  
val binSource = Source(() => binStream(inputStream).iterator) 

def binStream(in:BufferedInputStream) = Stream.continually(in.read).takeWhile(-1 != _).map(_.toByte) 

Also hier die Idee, die Stream über eine def zu bauen ist, und weisen nicht auf eine val und dann sofort erhalten die iterator von ihm und verwenden, um den Akka Source zu initialisieren. Dinge so einzurichten, sollte die Probleme mit Momoization vermeiden. Ich ließ den alten Code gegen eine große Datei laufen und war in der Lage, eine OutOfMemory Situation herzustellen, indem ich eine foreach auf der Source tat. Als ich es auf den neuen Code umstellte, konnte ich dieses Problem vermeiden.

+2

Die Verwendung von scala.collection.immutable.Stream ist hier ziemlich gefährlich - es verwendet memoization (!) (Siehe Dokumentation http://www.scala-lang.org/api/current/index.html#scala.collection .immutable.Stream), so dass Sie die gesamte Datei im Speicher haben, anstatt sie durch (!) zu streamen. –

+0

@ Konrad'ktoso'Malawski, ausgezeichneter Punkt. Ich werde ein Update mit einer Problemumgehung für das Memo-Problem veröffentlichen. – cmbaxter

+1

gutes Update, die Bereitstellung des Iterators des Eingabestreams funktioniert gut. Denken Sie daran, die Ressource zu schließen, wenn der Stream ebenfalls abgeschlossen ist. –

7

Verwenden Sie nicht scala.collection.immutable.Stream, um Dateien wie diese zu konsumieren, der Grund dafür ist, dass es memoization ausführt - das heißt, während ja es faul ist, hält es den gesamten Stream gepuffert (Memo) im Speicher!

Dies ist definitiv nicht was Sie wollen, wenn Sie über "Stream Verarbeitung einer Datei" denken. Der Grund dafür, dass Scala's Stream so funktioniert, liegt darin, dass es in einer funktionalen Umgebung sinnvoll ist - Sie können es vermeiden, die Fibbonachi-Zahlen immer wieder einfach zu berechnen, zum Beispiel für weitere Details siehe ScalaDoc.

Akka Streams bietet Implementierungen von Reactive Streams und stellt eine FileIO Klasse zur Verfügung, die Sie hier verwenden können (es wird die Daten aus der Datei nur bei Bedarf zurückdrucken und der Rest des Streams ist bereit, es zu verbrauchen) :

import java.io._ 
import akka.actor.ActorSystem 
import akka.stream.scaladsl.{ Sink, Source } 

object ExampleApp extends App { 


    implicit val sys = ActorSystem() 
    implicit val mat = FlowMaterializer() 

    FileIO.fromPath(Paths.get("/example/file.txt")) 
    .map(c ⇒ { print(c); c }) 
    .runWith(Sink.onComplete(_ ⇒ { f.close(); sys.shutdown() })) 
} 

Hier sind weitere Dokumente über mit IO with Akka Streams Hinweis arbeitet, dass dies für die aktuelle-as-Version von Akka zu schreiben ist, so dass die 2.5.x-Serie.

Hoffe, das hilft!

+0

Danke für die tolle Antwort - ich musste meine eigene Frage erneut finden, um zu wissen, wonach ich suchte: http://doc.akka.io/docs/akka-stream-und-http-experimental/2.0.2/java /stream-io.html#Streaming_File_IO - und akka 2.4 ist out (vermutlich bedeutet das, dass es NIO 2 ist)! (Ich werde akzeptieren, sobald Sie oder ich aktualisieren/erstellen Sie eine Code-Antwort mit der API) – Stephen

+0

Wird es wirklich immer den gesamten Stream im Speicher halten? Oder hängt es davon ab, dass Sie einen Verweis auf den Anfang des Streams haben? Mein (Wunsch-) Eindruck war, dass "Stream" -Elemente schließlich freigegeben werden, wenn Sie iterativ den Schwanz weiter bearbeiten und den Kopf vergessen. – dividebyzero

+0

Bitte lesen Sie die Dokumentation, ich habe sie unten verlinkt; http://www.scala-lang.org/api/current/scala/collection/immutable/Stream.html –