2016-01-20 11 views
6

In den letzten paar Tagen habe ich versucht, herauszufinden, der beste Weg, um eine HTTP-Ressource in eine Datei mit Akka Streams und HTTP herunterladen.Wie kann ich eine HTTP-Ressource mit Akka Streams und HTTP in eine Datei herunterladen?

Zunächst begann ich mit den Future-Based Variant und das sah ungefähr so ​​aus:

def downloadViaFutures(uri: Uri, file: File): Future[Long] = { 
    val request = Get(uri) 
    val responseFuture = Http().singleRequest(request) 
    responseFuture.flatMap { response => 
    val source = response.entity.dataBytes 
    source.runWith(FileIO.toFile(file)) 
    } 
} 

diese Art von Ordnung war, aber sobald ich mehr über reine Akka Streams gelernt wollte ich versuchen, und verwenden Sie die Flow-Based Variant einen Strom zu erzeugen ausgehend von einer Source[HttpRequest]. Zuerst stolperte ich mich völlig, bis ich auf die flatMapConcat Flow-Transformation stolperte. Damit endete ein wenig ausführlicher auf:

def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match { 
    case (responseTry, context) => (responseTry.get, context) 
} 

def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match { 
    case (response, _) => response.entity.dataBytes 
} 

def downloadViaFlow(uri: Uri, file: File): Future[Long] = { 
    val request = Get(uri) 
    val source = Source.single((request,())) 
    val requestResponseFlow = Http().superPool[Unit]() 
    source. 
    via(requestResponseFlow). 
    map(responseOrFail). 
    flatMapConcat(responseToByteSource). 
    runWith(FileIO.toFile(file)) 
} 

Dann habe ich ein wenig kompliziert und verwenden Sie die Content-Disposition Header erhalten wollte.

Gehen wir zurück in die Zukunft-basierte Variante:

def destinationFile(downloadDir: File, response: HttpResponse): File = { 
    val fileName = response.header[ContentDisposition].get.value 
    val file = new File(downloadDir, fileName) 
    file.createNewFile() 
    file 
} 

def downloadViaFutures2(uri: Uri, downloadDir: File): Future[Long] = { 
    val request = Get(uri) 
    val responseFuture = Http().singleRequest(request) 
    responseFuture.flatMap { response => 
    val file = destinationFile(downloadDir, response) 
    val source = response.entity.dataBytes 
    source.runWith(FileIO.toFile(file)) 
    } 
} 

Aber ich habe jetzt keine Ahnung, wie dies mit der Zukunfts basierte Variante zu tun. Dies ist so weit, wie ich bekam:

def responseToByteSourceWithDest[T](in: (HttpResponse, T), downloadDir: File): Source[(ByteString, File), Any] = in match { 
    case (response, _) => 
    val source = responseToByteSource(in) 
    val file = destinationFile(downloadDir, response) 
    source.map((_, file)) 
} 

def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Long] = { 
    val request = Get(uri) 
    val source = Source.single((request,())) 
    val requestResponseFlow = Http().superPool[Unit]() 
    val sourceWithDest: Source[(ByteString, File), Unit] = source. 
    via(requestResponseFlow). 
    map(responseOrFail). 
    flatMapConcat(responseToByteSourceWithDest(_, downloadDir)) 
    sourceWithDest.runWith(???) 
} 

So jetzt habe ich eine Source haben, dass ein oder mehrere (ByteString, File) Elemente für jede File emittieren (sagen, dass ich jeden File da es keinen Grund ist die ursprüngliche Source hat eine einzige zu sein, HttpRequest).

Gibt es trotzdem, diese zu nehmen und sie zu einem dynamischen Sink zu leiten?

Ich denke, so etwas wie flatMapConcat, wie zum Beispiel:

def runWithMap[T, Mat2](f: T => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ??? 

Damit ich downloadViaFlow2 mit vollenden konnte:

def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = { 
    val sink = FileIO.toFile(destination, true) 
    Flow[(ByteString, File)].map(_._1).toMat(sink)(Keep.right) 
} 
sourceWithDest.runWithMap { 
    case (_, file) => destToSink(file) 
} 

Antwort

5

Die Lösung erfordert keine flatMapConcat. Wenn Sie keine Rückgabewerte aus der Datei schreiben müssen, dann können Sie Sink.foreach verwenden:

def writeFile(downloadDir : File)(httpResponse : HttpResponse) : Future[Long] = { 
    val file = destinationFile(downloadDir, httpResponse) 
    httpResponse.entity.dataBytes.runWith(FileIO.toFile(file)) 
} 

def downloadViaFlow2(uri: Uri, downloadDir: File) : Future[Unit] = { 
    val request = HttpRequest(uri=uri) 
    val source = Source.single((request,())) 
    val requestResponseFlow = Http().superPool[Unit]() 

    source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .runWith(Sink.foreach(writeFile(downloadDir))) 
} 

Beachten Sie, dass die Sink.foreachFutures aus der writeFile Funktion erstellt. Daher gibt es nicht viel Rückstau. Die WriteFile könnte durch die Festplatte verlangsamt werden, aber der Stream würde weiterhin Futures generieren. Um dies zu steuern, können Sie verwenden Flow.mapAsyncUnordered (oder Flow.mapAsync):

val parallelism = 10 

source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .mapAsyncUnordered(parallelism)(writeFile(downloadDir)) 
     .runWith(Sink.ignore) 

Wenn Sie die Langwerte für eine Gesamtzählung akkumulieren möchten, müssen Sie kombinieren mit einem Sink.fold:

source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .mapAsyncUnordered(parallelism)(writeFile(downloadDir)) 
     .runWith(Sink.fold(0L)(_ + _)) 

Die Falte halten eine laufende Summe und geben den endgültigen Wert aus, wenn die Quelle der Anfragen versiegt ist.

+0

Hmm Ich hatte gehofft, es gäbe einen besseren Weg. Ich bin mir nicht sicher, ob das auch funktionieren wird. 'writeFile' kehrt zurück, sobald der FileIO-Stream materialisiert wurde. Wenn die Antwort chunked ist, muss sie der Reihe nach in die Datei geschrieben werden.Ähnliches Problem bei der Verwendung von 'mapAsync'. Der 'append' Parameter müsste ebenfalls gesetzt werden. Es scheint auch so, als würden irgendwelche Fehler, die in die Datei schreiben, nicht dazu führen, dass der äußere Strom ein Fehlersignal erhält. – Steiny

+1

@Steiny Breaking meine Antwort auf Ihre mehrere Kommentare: (a) korrigieren, schreiben Sie Datei kehrt mit einer Zukunft sofort, aber die MapAsync behandelt dies (b) gibt es keine Lösung, die chunkedsource korrigieren kann noch war dieser Teil der ursprünglichen Frage/Anforderungen (c) append ist nur notwendig, wenn in die gleiche Datei geschrieben wird (d) das Erzwingen, dass der äußere Strom bei irgendeiner Datei fehlschlägt write fail war nicht Teil der ursprünglichen Frage. Du hast gefragt: "Gibt es überhaupt welche, um diese zu nehmen und zu einem dynamischen Waschbecken zu leiten?", Beantwortet meine Antwort diese Frage. Ich habe meine Antwort im Kontext Ihres Beispielcodes geschrieben ... –