2015-10-28 8 views
5

Ich erstelle eine REST-API, die eine Berechnung in einem Spark-Cluster startet und mit einem Chunked-Stream der Ergebnisse antwortet. Wenn der Spark-Datenstrom mit Berechnungsergebnissen verwendet wird, kann ich die Daten aus Spark mitIdiomatische Verwendung von Spark DStream als Quelle für einen Akka-Stream

dstream.foreachRDD() 

senden. Ich schicke die Chunked HTTP-Antwort mit Akka-http:

val requestHandler: HttpRequest => HttpResponse = { 
    case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) => 
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source)) 
} 

Der Einfachheit halber, ich versuche ersten Klartext zum Laufen zu bringen, wird später JSON Rangier hinzufügen.

Aber was ist der idiomatische Weg, den Spark DStream als Quelle für den Akka Stream zu verwenden? Ich dachte, ich sollte es über einen Socket machen können, aber da der Spark-Treiber und der REST-Endpunkt auf der gleichen JVM sitzen, die einen Socket öffnet, erscheint das nur ein bisschen übertrieben.

Antwort

1

Edit: Diese Antwort gilt nur für ältere Version von Spark und Akka. Die Antwort von PH88 ist die korrekte Methode für aktuelle Versionen.

Sie können ein Zwischenprodukt akka.actor.Actor verwenden, das eine Quelle speist (ähnlich this question). Die folgende Lösung ist nicht "reaktiv", da der zugrunde liegende Actor einen Puffer mit RDD-Nachrichten verwalten müsste, die gelöscht werden könnten, wenn der Downstream-HTTP-Client keine Chunks schnell genug verbraucht. Dieses Problem tritt jedoch unabhängig von den Implementierungsdetails auf, da Sie das "Drosseln" des Akka-Stream-Gegendrucks nicht mit dem DStream verbinden können, um die Daten zu verlangsamen. Dies liegt daran, dass DStream org.reactivestreams.Publisher nicht implementiert.

Die grundlegende Topologie ist: eine Stromquelle von ByteStrings (Nachrichten)

//JobManager definition is provided in the link 
val actorRef = actorSystem actorOf JobManager.props 

erstellen basierend auf:

DStream --> Actor with buffer --> Source 

dieses toplogy Sie ein Schauspieler ähnlich der Implementierung here zu schaffen haben zu konstruieren der JobManager. Außerdem wandelt die ByteString-HttpEntity.ChunkStreamPart das ist, was die Httpresponse erfordert:

import akka.stream.actor.ActorPublisher 
import akka.stream.scaladsl.Source 
import akka.http.scaladsl.model.HttpEntity 
import akka.util.ByteString 

type Message = ByteString 

val messageToChunkPart = 
    Flow[Message].map(HttpEntity.ChunkStreamPart(_)) 

//Actor with buffer --> Source 
val source : Source[HttpEntity.ChunkStreamPart, Unit] = 
    Source(ActorPublisher[Message](actorRef)) via messageToChunkPart 

Link zu dem Funken DSTREAM zum Schauspieler, so dass jeder incomining RDD zu einem Iterable von ByteString umgewandelt wird und dann an den Schauspieler weitergeleitet:

import org.apache.spark.streaming.dstream.Dstream 
import org.apache.spark.rdd.RDD 

val dstream : DStream = ??? 

//This function converts your RDDs to messages being sent 
//via the http response 
def rddToMessages[T](rdd : RDD[T]) : Iterable[Message] = ??? 

def sendMessageToActor(message : Message) = actorRef ! message 

//DStream --> Actor with buffer 
dstream foreachRDD {rddToMessages(_) foreach sendMessageToActor} 

Geben Sie die Quelle der Httpresponse:

val requestHandler: HttpRequest => HttpResponse = { 
    case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) => 
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source)) 
} 

Hinweis: es sollte sehr wenig Zeit/cod e zwischen der dstream foreachRDD Zeile und der HttpReponse seit dem internen Puffer des Actor beginnt sofort mit ByteString Nachricht aus dem DStream zu füllen, nachdem die foreach Zeile ausgeführt wird.

7

Nicht sicher über die Version von api zum Zeitpunkt der Frage. Aber jetzt, mit akka-stream 2.0.3, glaube ich, können Sie es tun wie:

val source = Source 
    .actorRef[T](/* buffer size */ 100, OverflowStrategy.dropHead) 
    .mapMaterializedValue[Unit] { actorRef => 
    dstream.foreach(actorRef ! _) 
    }