2012-04-27 13 views
7

So ist die Play2.0 Enumeratee page zeigt ein Beispiel der Verwendung eines der &> oder through Methode zu ändern ein Enumerator[String] in ein Enumerator[Int] ein enumeratee zu Chunk einen Enumerator schreiben:Wie entlang verschiedener Grenzen

val toInt: Enumeratee[String,Int] = Enumeratee.map[String]{ s => s.toInt } 
val ints: Enumerator[Int] = strings &> toInt 

Es ist auch Ein Enumeratee.grouped Enumeratee zum Erstellen eines Enumerators von Chunks aus einzelnen Elementen. Das schien gut zu funktionieren.

Aber was ich sehe, ist, dass die übliche Eingabe in Form von Array[Byte] sein würde (die von Enumerator.fromFile und Enumerator.fromStream zurückgegeben wird). In diesem Sinne würde ich gerne diese Array[Byte] Eingänge nehmen und sie zu einem Enumerator[String] machen, zum Beispiel wenn jeder String eine Zeile ist (terminiert durch einen '\n'). Die Grenzen für die Linien und die Array[Byte] Elemente stimmen normalerweise nicht überein. Wie schreibe ich einen Enumerator, der die Chunked-Arrays in Chunked-Strings konvertieren kann?

Der Zweck besteht darin, diese Zeilen zurück zum Browser zu chunken, da jedes Array[Byte] verfügbar wird, und die übrig gebliebenen Bytes zu behalten, die nicht Teil einer vollständigen Zeile waren, bis der nächste Eingabeklotz kommt.

Im Idealfall würde ich gerne ein Verfahren haben, die eine iter: Iteratee[Array[Byte], T] gegeben und ein Enumerator[Array[Byte]] geben Sie mir eine Enumerator[T] zurück, wo meine T Elemente von iter analysiert wurden.

Zusätzliche Informationen: Ich hatte ein wenig Zeit, um meinen Code aufzuräumen und hier ist ein konkretes Beispiel für das, was ich versuche zu tun. Ich habe folgende iteratees, die die nächste Zeile erkennen:

import play.api.libs.iteratee._ 
type AB = Array[Byte] 

def takeWhile(pred: Byte => Boolean): Iteratee[AB, AB] = { 
    def step(e: Input[AB], acc: AB): Iteratee[AB, AB] = e match { 
    case Input.EOF => Done(acc, Input.EOF) 
    case Input.Empty => Cont(step(_, acc)) 
    case Input.El(arr) => 
     val (taking, rest) = arr.span(pred) 
     if (rest.length > 0) Done(acC++ taking, Input.El(rest)) 
     else Cont(step(_, acC++ taking)) 
    } 
    Cont(step(_, Array())) 
} 

val line = for { 
    bytes <- takeWhile(b => !(b == '\n' || b == '\r')) 
    _  <- takeWhile(b => b == '\n' || b == '\r') 
} yield bytes 

Und was ich möchte ist so etwas zu tun:

Ok.stream(Enumerator.fromFile(filename) &> chunkBy(line)).as("text/plain") 

Antwort

5

https://github.com/playframework/Play20/commit/f979006a7e2c1c08ca56ee0bae67b5463ee099c1#L3R131 Hat etwas ähnliches wie das, was Sie tun. Ich habe mich gruppiert, um mich um die restlichen Eingaben zu kümmern. Der Code sieht im Grunde wie:

val upToNewLine = 
    Traversable.splitOnceAt[String,Char](_ != '\n') &>> 
    Iteratee.consume() 

Enumeratee.grouped(upToNewLine) 

Auch muß ich wiederholt in der gleichen Art und Weise

+0

Kühle beheben. Es fühlte sich an, als ob "gruppiert" hätte tun sollen, was ich wollte. – huynhjl

2

Hier ist, was ich nach einigen Stunden des Experimentierens haben. Ich hoffe, dass jemand eine elegantere Umsetzung finden kann, da ich meiner kaum folgen kann. Hier

def chunkBy(chunker: Iteratee[AB, AB]) = new Enumeratee[AB, AB] { 
    def applyOn[A](inner: Iteratee[AB, A]): Iteratee[AB, Iteratee[AB, A]] = { 
    def step(e: Input[AB], in: Iteratee[AB, A], leftover: Input[AB]): 
      Iteratee[AB, Iteratee[AB, A]] = { 
     e match { 
     case Input.EOF => 
      // if we have a leftover and it's a chunk, then output it 
      leftover match { 
      case Input.EOF | Input.Empty => Done(in, leftover) 
      case Input.El(_) => 
       val lastChunk = Iteratee.flatten(Enumerator.enumInput(leftover) 
       >>> Enumerator.eof |>> chunker) 
       lastChunk.pureFlatFold(
       done = { (chunk, rest) => 
        val nextIn = Iteratee.flatten(Enumerator(chunk) |>> in) 
        nextIn.pureFlatFold(
        done = (a, e2) => Done(nextIn, e2), 
        // nothing more will come 
        cont = k => Done(nextIn, Input.EOF), 
        error = (msg, e2) => Error(msg, e2)) 
       }, 
       // not enough content to get a chunk, so drop content 
       cont = k => Done(in, Input.EOF), 
       error = (msg, e2) => Error(msg, e2)) 
      } 
     case Input.Empty => Cont(step(_, in, leftover)) 
     case Input.El(arr) => 
      // feed through chunker 
      val iChunks = Iteratee.flatten(
      Enumerator.enumInput(leftover) 
       >>> Enumerator(arr) 
       >>> Enumerator.eof // to extract the leftover 
       |>> repeat(chunker)) 
      iChunks.pureFlatFold(
      done = { (chunks, rest) => 
       // we have our chunks, feed them to the inner iteratee 
       val nextIn = Iteratee.flatten(Enumerator(chunks: _*) |>> in) 
       nextIn.pureFlatFold(
       done = (a, e2) => Done(nextIn, e2), 
       // inner iteratee needs more data 
       cont = k => Cont(step(_: Input[AB], nextIn, 
        // we have to ignore the EOF we fed to repeat 
        if (rest == Input.EOF) Input.Empty else rest)), 
       error = (msg, e2) => Error(msg, e2)) 
      }, 
      // not enough content to get a chunk, continue 
      cont = k => Cont(step(_: Input[AB], in, leftover)), 
      error = (msg, e2) => Error(msg, e2)) 
     } 
    } 
    Cont(step(_, inner, Input.Empty)) 
    } 
} 

ist die Definition zu meinen benutzerdefinierten repeat:

// withhold the last chunk so that it may be concatenated with the next one 
def repeat(chunker: Iteratee[AB, AB]) = { 
    def loop(e: Input[AB], ch: Iteratee[AB, AB], acc: Vector[AB], 
     leftover: Input[AB]): Iteratee[AB, Vector[AB]] = e match { 
    case Input.EOF => ch.pureFlatFold(
     done = (a, e) => Done(acc, leftover), 
     cont = k => k(Input.EOF).pureFlatFold(
     done = (a, e) => Done(acc, Input.El(a)), 
     cont = k => sys.error("divergent iter"), 
     error = (msg, e) => Error(msg, e)), 
     error = (msg, e) => Error(msg, e)) 
    case Input.Empty => Cont(loop(_, ch, acc, leftover)) 
    case Input.El(_) => 
     val i = Iteratee.flatten(Enumerator.enumInput(leftover) 
      >>> Enumerator.enumInput(e) |>> ch) 
     i.pureFlatFold(
     done = (a, e) => loop(e, chunker, acc :+ a, Input.Empty), 
     cont = k => Cont(loop(_, i, acc, Input.Empty)), 
     error = (msg, e) => Error(msg, e)) 
    } 
    Cont(loop(_: Input[AB], chunker, Vector(), Input.Empty)) 
} 

Dies funktioniert auf einigen Proben, darunter diese:

val source = Enumerator(
    "bippy".getBytes, 
    "foo\n\rbar\n\r\n\rbaz\nb".getBytes, 
    "azam\ntoto\n\n".getBytes) 
Ok.stream(source 
    &> chunkBy(line) 
    &> Enumeratee.map(l => l ++ ".\n".getBytes) 
).as("text/plain") 

Welche druckt:

bippyfoo. 
bar. 
baz. 
bazam. 
toto.