Wir verwenden tatsächlich akka Streams, um Binärdateien zu verarbeiten. Es war ein wenig schwierig zu bekommen Dinge gehen, wie es keine Dokumentation rund um das war, aber das ist, was wir kamen mit:
val binFile = new File(filePath)
val inputStream = new BufferedInputStream(new FileInputStream(binFile))
val binStream = Stream.continually(inputStream.read).takeWhile(-1 != _).map(_.toByte)
val binSource = Source(binStream)
Sobald Sie binSource
, die ein akka Source[Byte]
können Sie voran gehen und fange an, irgendwelche Stromtransformationen anzuwenden (map
, flatMap
, transform
, etc ...), die du willst. Diese Funktion nutzt das des Begleitobjekts, das eine Iterable
übernimmt, die in einer Skala Stream
übergeben wird, die die Daten träge einlesen und für Ihre Transformationen verfügbar machen sollte.
EDIT
Wie Konrad in den Kommentaren darauf hingewiesen, auf die Tatsache zurückzuführen, ein Strom kann ein Problem mit großen Dateien sein, dass es memoization die Elemente führt er begegnet, wie es träge ist, den Strom Aufbau aus. Dies kann zu Situationen mit zu wenig Arbeitsspeicher führen, wenn Sie nicht vorsichtig sind. wenn man sich die Dokumentation sieht jedoch für Stream gibt es eine Spitze memoization zur Vermeidung von im Speicher Aufbau:
Man muss vorsichtig von memoization sein; Sie können sehr schnell große Mengen an Speicher auf essen, wenn Sie nicht vorsichtig sind. Der Grund dafür ist, dass die Memoisierung des Streams eine Struktur ähnlich wie scala.collection.immutable.List erstellt. Solange etwas an den Kopf hält, hält sich der Kopf an den Schwanz, und so geht es rekursiv weiter. Wenn sich andererseits nichts an dem Kopf befindet (z. B. haben wir def zum Definieren des Streams verwendet), dann verschwindet es, sobald es nicht mehr direkt verwendet wird.
Damit berücksichtigt, könnte man meinen ursprünglichen Beispiel wie folgt ändern:
val binFile = new File(filePath)
val inputStream = new BufferedInputStream(new FileInputStream(binFile))
val binSource = Source(() => binStream(inputStream).iterator)
def binStream(in:BufferedInputStream) = Stream.continually(in.read).takeWhile(-1 != _).map(_.toByte)
Also hier die Idee, die Stream
über eine def
zu bauen ist, und weisen nicht auf eine val
und dann sofort erhalten die iterator
von ihm und verwenden, um den Akka Source
zu initialisieren. Dinge so einzurichten, sollte die Probleme mit Momoization vermeiden. Ich ließ den alten Code gegen eine große Datei laufen und war in der Lage, eine OutOfMemory
Situation herzustellen, indem ich eine foreach
auf der Source
tat. Als ich es auf den neuen Code umstellte, konnte ich dieses Problem vermeiden.
Die Verwendung von scala.collection.immutable.Stream ist hier ziemlich gefährlich - es verwendet memoization (!) (Siehe Dokumentation http://www.scala-lang.org/api/current/index.html#scala.collection .immutable.Stream), so dass Sie die gesamte Datei im Speicher haben, anstatt sie durch (!) zu streamen. –
@ Konrad'ktoso'Malawski, ausgezeichneter Punkt. Ich werde ein Update mit einer Problemumgehung für das Memo-Problem veröffentlichen. – cmbaxter
gutes Update, die Bereitstellung des Iterators des Eingabestreams funktioniert gut. Denken Sie daran, die Ressource zu schließen, wenn der Stream ebenfalls abgeschlossen ist. –