In den letzten paar Tagen habe ich versucht, herauszufinden, der beste Weg, um eine HTTP-Ressource in eine Datei mit Akka Streams und HTTP herunterladen.Wie kann ich eine HTTP-Ressource mit Akka Streams und HTTP in eine Datei herunterladen?
Zunächst begann ich mit den Future-Based Variant und das sah ungefähr so aus:
def downloadViaFutures(uri: Uri, file: File): Future[Long] = {
val request = Get(uri)
val responseFuture = Http().singleRequest(request)
responseFuture.flatMap { response =>
val source = response.entity.dataBytes
source.runWith(FileIO.toFile(file))
}
}
diese Art von Ordnung war, aber sobald ich mehr über reine Akka Streams gelernt wollte ich versuchen, und verwenden Sie die Flow-Based Variant einen Strom zu erzeugen ausgehend von einer Source[HttpRequest]
. Zuerst stolperte ich mich völlig, bis ich auf die flatMapConcat
Flow-Transformation stolperte. Damit endete ein wenig ausführlicher auf:
def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match {
case (responseTry, context) => (responseTry.get, context)
}
def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match {
case (response, _) => response.entity.dataBytes
}
def downloadViaFlow(uri: Uri, file: File): Future[Long] = {
val request = Get(uri)
val source = Source.single((request,()))
val requestResponseFlow = Http().superPool[Unit]()
source.
via(requestResponseFlow).
map(responseOrFail).
flatMapConcat(responseToByteSource).
runWith(FileIO.toFile(file))
}
Dann habe ich ein wenig kompliziert und verwenden Sie die Content-Disposition
Header erhalten wollte.
Gehen wir zurück in die Zukunft-basierte Variante:
def destinationFile(downloadDir: File, response: HttpResponse): File = {
val fileName = response.header[ContentDisposition].get.value
val file = new File(downloadDir, fileName)
file.createNewFile()
file
}
def downloadViaFutures2(uri: Uri, downloadDir: File): Future[Long] = {
val request = Get(uri)
val responseFuture = Http().singleRequest(request)
responseFuture.flatMap { response =>
val file = destinationFile(downloadDir, response)
val source = response.entity.dataBytes
source.runWith(FileIO.toFile(file))
}
}
Aber ich habe jetzt keine Ahnung, wie dies mit der Zukunfts basierte Variante zu tun. Dies ist so weit, wie ich bekam:
def responseToByteSourceWithDest[T](in: (HttpResponse, T), downloadDir: File): Source[(ByteString, File), Any] = in match {
case (response, _) =>
val source = responseToByteSource(in)
val file = destinationFile(downloadDir, response)
source.map((_, file))
}
def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Long] = {
val request = Get(uri)
val source = Source.single((request,()))
val requestResponseFlow = Http().superPool[Unit]()
val sourceWithDest: Source[(ByteString, File), Unit] = source.
via(requestResponseFlow).
map(responseOrFail).
flatMapConcat(responseToByteSourceWithDest(_, downloadDir))
sourceWithDest.runWith(???)
}
So jetzt habe ich eine Source
haben, dass ein oder mehrere (ByteString, File)
Elemente für jede File
emittieren (sagen, dass ich jeden File
da es keinen Grund ist die ursprüngliche Source
hat eine einzige zu sein, HttpRequest
).
Gibt es trotzdem, diese zu nehmen und sie zu einem dynamischen Sink
zu leiten?
Ich denke, so etwas wie flatMapConcat
, wie zum Beispiel:
def runWithMap[T, Mat2](f: T => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ???
Damit ich downloadViaFlow2
mit vollenden konnte:
def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = {
val sink = FileIO.toFile(destination, true)
Flow[(ByteString, File)].map(_._1).toMat(sink)(Keep.right)
}
sourceWithDest.runWithMap {
case (_, file) => destToSink(file)
}
Hmm Ich hatte gehofft, es gäbe einen besseren Weg. Ich bin mir nicht sicher, ob das auch funktionieren wird. 'writeFile' kehrt zurück, sobald der FileIO-Stream materialisiert wurde. Wenn die Antwort chunked ist, muss sie der Reihe nach in die Datei geschrieben werden.Ähnliches Problem bei der Verwendung von 'mapAsync'. Der 'append' Parameter müsste ebenfalls gesetzt werden. Es scheint auch so, als würden irgendwelche Fehler, die in die Datei schreiben, nicht dazu führen, dass der äußere Strom ein Fehlersignal erhält. – Steiny
@Steiny Breaking meine Antwort auf Ihre mehrere Kommentare: (a) korrigieren, schreiben Sie Datei kehrt mit einer Zukunft sofort, aber die MapAsync behandelt dies (b) gibt es keine Lösung, die chunkedsource korrigieren kann noch war dieser Teil der ursprünglichen Frage/Anforderungen (c) append ist nur notwendig, wenn in die gleiche Datei geschrieben wird (d) das Erzwingen, dass der äußere Strom bei irgendeiner Datei fehlschlägt write fail war nicht Teil der ursprünglichen Frage. Du hast gefragt: "Gibt es überhaupt welche, um diese zu nehmen und zu einem dynamischen Waschbecken zu leiten?", Beantwortet meine Antwort diese Frage. Ich habe meine Antwort im Kontext Ihres Beispielcodes geschrieben ... –