2015-07-21 6 views
8

Ich versuche zu verstehen, wie die neue akka.http-Bibliothek zu verwenden. Ich möchte eine HTTP-Anfrage an einen Server senden und den gesamten Antworttext als eine einzelne Zeichenfolge lesen, um eine Source[String,?] zu erzeugen.Get ganze HttpResponse Körper als String mit Akka-Streams HTTP

Hier ist die beste Lösung, ich in der Lage war, so weit zu produzieren:

def get(
    modelID: String, 
    pool: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool] 
): Source[String,Unit] = { 
    val uri = reactionsURL(modelID) 
    val req = HttpRequest(uri = uri) 
    Source.single((req,0)) 
    .via(pool) 
    .map { 
     case (Success(resp),_) => 
     resp.entity.dataBytes.map(_.decodeString("utf-8")) 
    }.flatten(FlattenStrategy.concat) 
    .grouped(1024) 
    .map(_.mkString) 

Es scheint gut zu funktionieren (mit Ausnahme des fehlenden Fehlerpfades), aber es ist ein bisschen klobig für solche einfache Aufgaben. Gibt es eine intelligentere Lösung? Kann ich die grouped/ vermeiden?

Antwort

11

Sie können toStrict-Methode von HttpResponse mit Timeout verwenden. Es sammelt die ganze Antwort als Zukunft.

def toStrict (Timeout: FiniteDuration) (implizite ec: ExecutionContext, fm: Materializer): Future [Strict] Gibt eine gemeinsam nutzbare und serializable

Kopie dieser Nachricht mit einer strengen Einheit.

Beispiel:

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.{HttpResponse, HttpRequest} 
import akka.stream.{Materializer, ActorMaterializer} 
import akka.stream.scaladsl.{Sink, Flow, Source} 
import scala.concurrent.{ExecutionContext, Future} 
import scala.concurrent.duration._ 

import scala.util.{Try, Success} 

object Main extends App { 

    implicit val system = ActorSystem() 

    import system.dispatcher 

    implicit val materializer = ActorMaterializer() 

    val host = "127.0.0.1" 
    lazy val pool = Http().newHostConnectionPool[Int](host, 9000) 

    FlowBuilder.get("/path", pool).to(Sink.foreach(_.foreach(println))).run() 

} 

object FlowBuilder { 
    def get(modelID: String, pool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool]) 
     (implicit ec: ExecutionContext, mat: Materializer): Source[Future[String], Unit] = { 
    val uri = modelID 
    val req = HttpRequest(uri = modelID) 
    Source.single((req, 0)).via(pool) 
     .map { 
     case (Success(resp), _) => resp.entity.toStrict(5 seconds).map(_.data.decodeString("UTF-8")) 
    } 
    } 
} 
+0

OK. Aber ich brauche eine 'Source [String, Unit] ', weil ich weitere Transformationen anwenden muss. Mit deiner Lösung werde ich Materializer und ExecuctionContext weitergeben und dann alles in eine neue Quelle umwandeln ... Ist es nicht klirrender oder habe ich etwas übersehen? – paradigmatic

+0

@paradigmatische Ich füge Beispiel hinzu. ExecutionContext und Materializer können als implizite Argumente festgelegt werden. – Zernike

7

können Sie Unmarshall verwenden, die auch auf andere Arten arbeiten z Json von Spray-Json. Dies auch als strict gibt Future[_] zurück.

Beispiel:

authedReq.via(authServerReqResFlow).mapAsync(1) { case (tryRes, _) => 
     tryRes match { 
     case Failure(exception) => Future.failed[Principal](exception) 
     case Success(response @ HttpResponse(StatusCodes.OK,_,_,_)) => 
      val userContext = Unmarshal(response).to[UserContextData] 
      userContext.map { 
      case UserContextData(UserInfo(_, userName, fullName, email, title), _, _) => 
       Principal(userName, fullName, email, title) 
      } 
     case Success(response @ HttpResponse(responseCode,_,entity,_)) => 
      Unmarshal(entity).to[String].flatMap(msg => Future.failed(new AuthenticationFailure(s"$responseCode\n$msg"))) 
     } 
    }