2014-09-02 8 views
10

Ich habe ein wenig mit der experimentellen Akka Streams API herumgespielt und habe einen Anwendungsfall, den ich sehen wollte. Für meinen Anwendungsfall habe ich eine StreamTcp basierte Flow, die von Bindung der Eingangsstrom von Verbindungen zu meinem Server-Socket gefüttert wird. Der Flow, den ich habe, basiert auf ByteString Daten, die in ihn kommen. Die Daten, die hereinkommen, haben ein Trennzeichen, das bedeutet, dass ich alles vor dem Trennzeichen als eine Nachricht und alles nach und bis zum nächsten Trennzeichen als die nächste Nachricht behandeln soll. So mit einem einfacheren Beispiel Herumspielen, keine Sockets und nur statischen Text, das ist, was ich kam mit:Wie man einen eingehenden Stream auf einem Delimiter-Zeichen unter Verwendung von Akka-Streams aufteilt

import akka.actor.ActorSystem 
import akka.stream.{ FlowMaterializer, MaterializerSettings } 
import akka.stream.scaladsl.Flow 
import scala.util.{ Failure, Success } 
import akka.util.ByteString 

object BasicTransformation { 

    def main(args: Array[String]): Unit = { 
    implicit val system = ActorSystem("Sys") 

    val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.") 

    Flow(data). 
     splitWhen(c => c == '.'). 
     foreach{producer => 
     Flow(producer). 
      filter(c => c != '.'). 
      fold(new StringBuilder)((sb, c) => sb.append(c.toChar)). 
      map(_.toString). 
      filter(!_.isEmpty). 
      foreach(println(_)). 
      consume(FlowMaterializer(MaterializerSettings())) 
     }. 
     onComplete(FlowMaterializer(MaterializerSettings())) { 
     case any => 
      system.shutdown 
     } 
    } 
} 

Die Hauptfunktion auf dem Flow, die ich fand, dass mein Ziel splitWhen war, zu erreichen, die dann produziert zusätzliche Teilflüsse, eine für jede Nachricht pro . Delimiter. Ich bearbeite dann jeden Teilfluss mit einer weiteren Schrittkette und drucke schließlich die einzelnen Nachrichten am Ende aus.

Dies scheint alles ein wenig wortreich, um zu erreichen, was ich für einen ziemlich einfachen und häufigen Anwendungsfall hielt. Meine Frage ist also, gibt es eine sauberere und weniger ausführliche Möglichkeit, dies zu tun, oder ist dies die richtige und bevorzugte Art, einen Stream durch ein Trennzeichen aufzuteilen?

Antwort

1

Nachdem ich dieselbe Frage in der Akka User's Group gestellt habe, habe ich einige Vorschläge von Endre Varga und Viktor Klang (https://groups.google.com/forum/#!topic/akka-user/YsnwIAjQ3EE) bekommen. Ich endete mit Endre Vorschlag einer Transformer und dann mit der transform Methode auf der Flow. Eine leicht modifizierte Version von meinem vorherigen Beispiel ist unten enthalten:

import akka.actor.ActorSystem 
import akka.stream.{ FlowMaterializer, MaterializerSettings } 
import akka.stream.scaladsl.Flow 
import scala.util.{ Failure, Success } 
import akka.util.ByteString 
import akka.stream.Transformer 
import akka.util.ByteStringBuilder 

object BasicTransformation { 

    def main(args: Array[String]): Unit = { 
    implicit val system = ActorSystem("Sys")       
    implicit val mater = FlowMaterializer(MaterializerSettings()) 

    val data = List(
     ByteString("Lorem Ipsum is"), 
     ByteString(" simply.Dummy text of.The prin"), 
     ByteString("ting.And typesetting industry.") 
    ) 
    Flow(data).transform(new PeriodDelimitedTransformer).foreach(println(_)) 
    } 
} 

mit der Definition von PeriodDelimitedTransformer ist folgende:

class PeriodDelimitedTransformer extends Transformer[ByteString,String]{ 
    val buffer = new ByteStringBuilder 

    def onNext(msg:ByteString) = {  
    val msgString = msg.utf8String 
    val delimIndex = msgString.indexOf('.') 
    if (delimIndex == -1){ 
     buffer.append(msg) 
     List.empty 
    } 
    else{ 
     val parts = msgString.split("\\.") 
     val endsWithDelim = msgString.endsWith(".") 

     buffer.putBytes(parts.head.getBytes()) 
     val currentPiece = buffer.result.utf8String    
     val otherPieces = parts.tail.dropRight(1).toList 

     buffer.clear 
     val lastPart = 
     if (endsWithDelim){ 
      List(parts.last) 
     } 
     else{ 
      buffer.putBytes(parts.last.getBytes()) 
      List.empty 
     }   


     val result = currentPiece :: otherPieces ::: lastPart 
     result 
    } 

    } 
} 

einige der Komplexität meiner bisherigen Lösung So oben in diesen Transformer gerollt , aber das scheint der beste Ansatz zu sein. In meiner ursprünglichen Lösung wird der Stream in mehrere Sub-Streams aufgeteilt und das ist nicht wirklich das, was ich wollte.

10

Es sieht so aus, als ob die API kürzlich um akka.stream.scaladsl.Framing erweitert wurde. Die Dokumentation enthält auch eine example wie man es benutzt. In Bezug auf Ihre Frage:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Framing, Source} 
import akka.util.ByteString 
import com.typesafe.config.ConfigFactory 

object TcpDelimiterBasedMessaging extends App { 
    object chunks { 
    val first = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.") 
    val second = ByteString("More text.delimited by.a period.") 
    } 

    implicit val system = ActorSystem("delimiter-based-messaging", ConfigFactory.defaultReference()) 
    implicit val dispatcher = system.dispatcher 
    implicit val materializer = ActorMaterializer() 

    Source(chunks.first :: chunks.second :: Nil) 
    .via(Framing.delimiter(ByteString("."), Int.MaxValue)) 
    .map(_.utf8String) 
    .runForeach(println) 
    .onComplete(_ => system.terminate()) 
} 

erzeugt die folgende Ausgabe: Lorem Ipsum is simply Dummy text of the printing And typesetting industry More text delimited by a period

+0

Perfect! Dies sollte die akzeptierte Antwort sein. Beachten Sie auch, dass es über Chunks funktioniert.Versuchen Sie es mit: 'val first = ByteString ("Lorem Ipsum ist simply.Dummy Text des printing.And Satz")' 'val Sekunde = ByteString (“ industry.More text.delimited by.a Zeitraum. ")" –

0

denke ich Andreys Verwendung von Framing die beste Lösung für Ihre Frage, aber ich hatte ein ähnliches Problem und fand Framing zu begrenzt sein. Ich habe stattdessen statefulMapConcat verwendet, was es Ihnen erlaubt, Ihren Input ByteString mit beliebigen Regeln zu gruppieren. Hier ist der Code für Ihre Frage, falls es hilft jemand:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Flow, Source} 
import akka.util.ByteString 

object BasicTransformation extends App { 

    implicit val system = ActorSystem("Sys") 
    implicit val materializer = ActorMaterializer() 
    implicit val dispatcher = system.dispatcher 
    val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.") 

    val grouping = Flow[Byte].statefulMapConcat {() => 
    var bytes = ByteString() 
    byt => 
     if (byt == '.') { 
     val string = bytes.utf8String 
     bytes = ByteString() 
     List(string) 
     } else { 
     bytes :+= byt 
     Nil 
     } 
    } 

    Source(data).via(grouping).runForeach(println).onComplete(_ => system.terminate()) 
} 

Welche produziert: Lorem Ipsum is simply Dummy text of the printing And typesetting industry