2016-03-28 17 views
1

ich eine Funktion für jede Datei in einem Verzeichnis und dessen Unter anwenden möchten, wie folgt:scala iteratee rekursiv verarbeiten Dateien und Unterverzeichnisse

def applyRecursively(dir: String, fn: (File) => Any) { 
    def listAndProcess(dir: File) { 
     dir.listFiles match { 
     case null => out.println("exception: dir cannot be listed: " + dir.getPath); List[File]() 
     case files => files.toList.sortBy(_.getName).foreach(file => { 
      fn(file) 
      if (!java.nio.file.Files.isSymbolicLink(file.toPath) && file.isDirectory) listAndProcess(file) 
     }) 
     } 
    } 
    listAndProcess(new File(dir)) 
    } 

    def exampleFn(file: File) { println(s"processing $file") } 

    applyRecursively(dir, exampleFn) 

das funktioniert. Die Frage hier ist, wie ich diesen Code mit scala Iteratees umgestalten könnte. so etwas wie dieses:

val en = Enumerator.generateM(...) // ??? 
val it: Iteratee[File, Unit] = Iteratee.foreach(exampleFn) 
val res = en.run(it) 
res.onSuccess { case x => println("DONE") } 
+0

Suchen Sie wirklich, wie Sie den 'Enumerator' erstellen, die die Dateiliste erzeugt? –

+0

Ja, das ist es. –

Antwort

2

Es nicht erfasst alle Ihre Anforderungen aber das kann Ihnen den Einstieg

object ExampleEnumerator { 
    import scala.concurrent.ExecutionContext.Implicits.global 

    def exampleFn(file: File) { println(s"processing $file") } 

    def listFiles(dir: File): Enumerator[File] = { 
    val files = Option(dir.listFiles).toList.flatten.sortBy(_.getName) 

    Enumerator(dir) andThen Enumerator(files :_*).flatMap(listFiles) 
    } 

    def main(args: Array[String]) { 
    import scala.concurrent.duration._ 

    val dir = "." 
    val en: Enumerator[File] = listFiles(new File(dir)) 
    val it: Iteratee[File, Unit] = Iteratee.foreach(exampleFn) 
    val res = en.run(it) 
    res.onSuccess { case x => println("DONE") } 

    Await.result(res, 10.seconds) 
    } 
} 
+0

Waw, das ist eine schöne Lösung! –

+0

Vielen Dank für Ihre Kommentare. @ m-z Ich habe doppelt überprüft und listFiles wird aufgerufen, wenn der Iteratee mehr Elemente benötigt. Ich sehe also nicht, warum die gesamte Auflistung in den Speicher geladen werden sollte. Bitte erkläre mir, ob mir etwas fehlt. –

+0

Ah, du hast Recht. Ich habe falsch gelesen, was Sie an "Enumerator.apply" weitergeleitet haben. Ich werde es mir genauer ansehen. –

3

Sie Enumerator.unfold dafür verwenden können. Die Signatur ist:

def unfold[S, E](s: S)(f: (S) => Option[(S, E)])(implicit ec: ExecutionContext): Enumerator[E] 

Die Idee ist, dass Sie mit einem Wert vom Typ S, zu starten und dann eine Funktion, um es anzuwenden, die eine Option[(S, E)] zurückgibt. Ein Wert von None bedeutet, dass Enumerator EOF erreicht hat. A Some enthält ein weiteres S zum Entfalten, und der nächste Wert Enumerator[E] wird generiert. In Ihrem Beispiel können Sie mit einem Array[File] (dem Anfangsverzeichnis) beginnen, den ersten Wert von Array nehmen und prüfen, ob es sich um eine Datei oder ein Verzeichnis handelt. Wenn es nur eine Datei ist, geben Sie den Schwanz des Array mit dem File zusammengetupft zurück. Wenn die File ein Verzeichnis ist, erhalten Sie die Dateiauflistung und fügen Sie sie an den Anfang der Array. Dann werden die nächsten Schritte in unfold weiterhin die enthaltenen Dateien verarbeiten.

Sie am Ende mit etwas wie folgt aus:

def list(dir: File)(implicit ec: ExecutionContext): Enumerator[File] = { 
    Enumerator.unfold(Array(dir)) { listing => 
    listing.headOption.map { file => 
     if(!java.nio.file.Files.isSymbolicLink(file.toPath) && file.isDirectory) 
     (file.listFiles.sortBy(f => (f.isDirectory, f.getName)) ++ listing.tail) -> file 
     else 
     listing.tail -> file 
    } 
    } 
} 

ich eine zusätzliche Art von isDirectory hinzugefügt erste nicht-Verzeichnisse zu priorisieren. Dies bedeutet, dass, wenn Verzeichnisinhalte zum Array hinzugefügt werden, die Dateien zuerst verbraucht werden, bevor weitere Inhalte hinzugefügt werden. Dies verhindert, dass sich der Speicherbedarf aufgrund der Rekursivität schnell ausdehnt.

Wenn Sie möchten, dass die Verzeichnisse aus dem endgültigen Enumerator entfernt werden, können Sie verwenden, um das zu tun. Sie werden am Ende mit etwas wie:

list(dir) &> Enumeratee.filter(!_.isDirectory) |>> Iteratee.foreach(fn) 
+0

wow, ich verstehe diesen Code immer noch nicht (die 'list' Funktion wird nur einmal aufgerufen, nicht rekursiv), aber es funktioniert. lassen Sie mich etwas Zeit, um es zu studieren! –

+0

cool, ich habe deine Antwort verstanden. Ich habe eine Antwort mit etwas Logging gepostet, um anderen zu helfen, es zu verstehen. Ich wünschte, ich könnte die Antwort von Jonas Anso und Ihre als akzeptiert markieren, aber stackoverflow erlaubt dies nicht; und er antwortete zuerst. –

2

Dies ergänzt nur die große Antwort von m-w mit einiger Protokollierung es zu helfen, zu verstehen.

$ cd /david/test 
$ find . 
. 
./file1 
./file2 
./file3d 
./file3d/file1 
./file3d/file2 
./file4 

java:

import play.api.libs.iteratee._ 
import java.io.File 
import scala.concurrent.Await 
import scala.concurrent.duration.Duration 

object ExampleEnumerator3 { 
    import scala.concurrent.ExecutionContext.Implicits.global 

    def exampleFn(file: File) { println(s"processing $file") } 

    def list(dir: File): Enumerator[File] = { 
    println(s"list $dir") 
    val initialInput: List[File] = List(dir) 
    Enumerator.unfold(initialInput) { (input: List[File]) => 
     val next: Option[(List[File], File)] = input.headOption.map { file => 
     if(file.isDirectory) { 
      (file.listFiles.toList.sortBy(_.getName) ++ input.tail) -> file 
     } else { 
      input.tail -> file 
     } 
     } 
     next match { 
     case Some(dn) => print(s"value to unfold: $input\n next value to unfold: ${dn._1}\n next input: ${dn._2}\n") 
     case None => print(s"value to unfold: $input\n finished unfold\n") 
     } 
     next 
    } 
    } 

    def main(args: Array[String]) { 
    val dir = new File("/david/test") 
    val res = list(dir).run(Iteratee.foreach(exampleFn)) 
    Await.result(res, Duration.Inf) 
    } 
} 

log:

list /david/test 
value to unfold: List(/david/test) 
    next value to unfold: List(/david/test/file1, /david/test/file2, /david/test/file3d, /david/test/file4) 
    next input: /david/test 
processing /david/test 
value to unfold: List(/david/test/file1, /david/test/file2, /david/test/file3d, /david/test/file4) 
    next value to unfold: List(/david/test/file2, /david/test/file3d, /david/test/file4) 
    next input: /david/test/file1 
processing /david/test/file1 
value to unfold: List(/david/test/file2, /david/test/file3d, /david/test/file4) 
    next value to unfold: List(/david/test/file3d, /david/test/file4) 
    next input: /david/test/file2 
processing /david/test/file2 
value to unfold: List(/david/test/file3d, /david/test/file4) 
    next value to unfold: List(/david/test/file3d/file1, /david/test/file3d/file2, /david/test/file4) 
    next input: /david/test/file3d 
processing /david/test/file3d 
value to unfold: List(/david/test/file3d/file1, /david/test/file3d/file2, /david/test/file4) 
    next value to unfold: List(/david/test/file3d/file2, /david/test/file4) 
    next input: /david/test/file3d/file1 
processing /david/test/file3d/file1 
value to unfold: List(/david/test/file3d/file2, /david/test/file4) 
    next value to unfold: List(/david/test/file4) 
    next input: /david/test/file3d/file2 
processing /david/test/file3d/file2 
value to unfold: List(/david/test/file4) 
    next value to unfold: List() 
    next input: /david/test/file4 
processing /david/test/file4 
value to unfold: List() 
    finished unfold 
2

Dies ergänzt nur die große Antwort von @JonasAnso mit einigen Protokollierung es zu helfen, zu verstehen.

$ cd /david/test 
$ find . 
. 
./file1 
./file2 
./file3d 
./file3d/file1 
./file3d/file2 
./file4 

java:

import play.api.libs.iteratee._ 
import java.io.File 
import scala.concurrent.Await 
import scala.concurrent.duration.Duration 

object ExampleEnumerator2b { 
    import scala.concurrent.ExecutionContext.Implicits.global 

    def exampleFn(file: File) { println(s"processing $file") } 

    def listFiles(dir: File): Enumerator[File] = { 
    println(s"listFiles. START: $dir") 

    if (dir.isDirectory) { 
     val files = dir.listFiles.toList.sortBy(_.getName) 
     Enumerator(dir) andThen Enumerator(files :_*).flatMap(listFiles) 
    } else { 
     Enumerator(dir) 
    } 
    } 

    def main(args: Array[String]) { 
    val dir = new File("/david/test2") 
    val res = listFiles(dir).run(Iteratee.foreach(exampleFn)) 
    Await.result(res, Duration.Inf) 
    } 
} 

log:

listFiles. START: /david/test 
processing /david/test 
listFiles. START: /david/test/file1 
processing /david/test/file1 
listFiles. START: /david/test/file2 
processing /david/test/file2 
listFiles. START: /david/test/file3d 
processing /david/test/file3d 
listFiles. START: /david/test/file3d/file1 
processing /david/test/file3d/file1 
listFiles. START: /david/test/file3d/file2 
processing /david/test/file3d/file2 
listFiles. START: /david/test/file4 
processing /david/test/file4