Edit: Diese Antwort gilt nur für ältere Version von Spark und Akka. Die Antwort von PH88 ist die korrekte Methode für aktuelle Versionen.
Sie können ein Zwischenprodukt akka.actor.Actor
verwenden, das eine Quelle speist (ähnlich this question). Die folgende Lösung ist nicht "reaktiv", da der zugrunde liegende Actor einen Puffer mit RDD-Nachrichten verwalten müsste, die gelöscht werden könnten, wenn der Downstream-HTTP-Client keine Chunks schnell genug verbraucht. Dieses Problem tritt jedoch unabhängig von den Implementierungsdetails auf, da Sie das "Drosseln" des Akka-Stream-Gegendrucks nicht mit dem DStream verbinden können, um die Daten zu verlangsamen. Dies liegt daran, dass DStream org.reactivestreams.Publisher
nicht implementiert.
Die grundlegende Topologie ist: eine Stromquelle von ByteStrings (Nachrichten)
//JobManager definition is provided in the link
val actorRef = actorSystem actorOf JobManager.props
erstellen basierend auf:
DStream --> Actor with buffer --> Source
dieses toplogy Sie ein Schauspieler ähnlich der Implementierung here zu schaffen haben zu konstruieren der JobManager. Außerdem wandelt die ByteString
-HttpEntity.ChunkStreamPart
das ist, was die Httpresponse erfordert:
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.HttpEntity
import akka.util.ByteString
type Message = ByteString
val messageToChunkPart =
Flow[Message].map(HttpEntity.ChunkStreamPart(_))
//Actor with buffer --> Source
val source : Source[HttpEntity.ChunkStreamPart, Unit] =
Source(ActorPublisher[Message](actorRef)) via messageToChunkPart
Link zu dem Funken DSTREAM zum Schauspieler, so dass jeder incomining RDD zu einem Iterable von ByteString umgewandelt wird und dann an den Schauspieler weitergeleitet:
import org.apache.spark.streaming.dstream.Dstream
import org.apache.spark.rdd.RDD
val dstream : DStream = ???
//This function converts your RDDs to messages being sent
//via the http response
def rddToMessages[T](rdd : RDD[T]) : Iterable[Message] = ???
def sendMessageToActor(message : Message) = actorRef ! message
//DStream --> Actor with buffer
dstream foreachRDD {rddToMessages(_) foreach sendMessageToActor}
Geben Sie die Quelle der Httpresponse:
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}
Hinweis: es sollte sehr wenig Zeit/cod e zwischen der dstream foreachRDD
Zeile und der HttpReponse seit dem internen Puffer des Actor beginnt sofort mit ByteString Nachricht aus dem DStream zu füllen, nachdem die foreach
Zeile ausgeführt wird.