Ich habe ein wenig mit der experimentellen Akka Streams API herumgespielt und habe einen Anwendungsfall, den ich sehen wollte. Für meinen Anwendungsfall habe ich eine StreamTcp
basierte Flow
, die von Bindung der Eingangsstrom von Verbindungen zu meinem Server-Socket gefüttert wird. Der Flow, den ich habe, basiert auf ByteString
Daten, die in ihn kommen. Die Daten, die hereinkommen, haben ein Trennzeichen, das bedeutet, dass ich alles vor dem Trennzeichen als eine Nachricht und alles nach und bis zum nächsten Trennzeichen als die nächste Nachricht behandeln soll. So mit einem einfacheren Beispiel Herumspielen, keine Sockets und nur statischen Text, das ist, was ich kam mit:Wie man einen eingehenden Stream auf einem Delimiter-Zeichen unter Verwendung von Akka-Streams aufteilt
import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
import scala.util.{ Failure, Success }
import akka.util.ByteString
object BasicTransformation {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("Sys")
val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")
Flow(data).
splitWhen(c => c == '.').
foreach{producer =>
Flow(producer).
filter(c => c != '.').
fold(new StringBuilder)((sb, c) => sb.append(c.toChar)).
map(_.toString).
filter(!_.isEmpty).
foreach(println(_)).
consume(FlowMaterializer(MaterializerSettings()))
}.
onComplete(FlowMaterializer(MaterializerSettings())) {
case any =>
system.shutdown
}
}
}
Die Hauptfunktion auf dem Flow
, die ich fand, dass mein Ziel splitWhen
war, zu erreichen, die dann produziert zusätzliche Teilflüsse, eine für jede Nachricht pro .
Delimiter. Ich bearbeite dann jeden Teilfluss mit einer weiteren Schrittkette und drucke schließlich die einzelnen Nachrichten am Ende aus.
Dies scheint alles ein wenig wortreich, um zu erreichen, was ich für einen ziemlich einfachen und häufigen Anwendungsfall hielt. Meine Frage ist also, gibt es eine sauberere und weniger ausführliche Möglichkeit, dies zu tun, oder ist dies die richtige und bevorzugte Art, einen Stream durch ein Trennzeichen aufzuteilen?
Perfect! Dies sollte die akzeptierte Antwort sein. Beachten Sie auch, dass es über Chunks funktioniert.Versuchen Sie es mit: 'val first = ByteString ("Lorem Ipsum ist simply.Dummy Text des printing.And Satz")' 'val Sekunde = ByteString (“ industry.More text.delimited by.a Zeitraum. ")" –