2013-06-30 3 views
6

Gibt es eine einfache Möglichkeit, Scala Parallelsammlungen zu verwenden, ohne eine vollständige Sammlung in den Speicher zu laden?Parallelsammlung Verarbeitung von Daten größer als Speichergröße

Zum Beispiel habe ich eine große Sammlung und ich möchte eine bestimmte Operation (falten) parallel nur auf einem kleinen Stück, das in den Speicher passt, als auf einem anderen Stück und so weiter, und schließlich rekombinieren Ergebnisse aus alle Stücke.

Ich weiß, dass Schauspieler verwendet werden könnten, aber es wäre wirklich nett, Par-Sammlungen zu verwenden.

Ich habe eine Lösung geschrieben, aber es ist nicht schön:

def split[A](list: Iterable[A], chunkSize: Int): Iterable[Iterable[A]] = { 
    new Iterator[Iterable[A]] { 
     var rest = list 
     def hasNext = !rest.isEmpty 
     def next = { 
     val chunk = rest.take(chunkSize) 
     rest = rest.drop(chunkSize) 
     chunk 
     } 
    }.toIterable 
    }            

    def foldPar[A](acc: A)(list: Iterable[A], chunkSize: Int, combine: ((A, A) => A)): A = { 
    val chunks: Iterable[Iterable[A]] = split(list, chunkSize) 
    def combineChunk: ((A,Iterable[A]) => A) = { case (res, entries) => entries.par.fold(res)(combine) } 
    chunks.foldLeft(acc)(combineChunk) 
    }            

    val chunkSize = 10000000       
    val x = 1 to chunkSize*10     

    def sum: ((Int,Int) => Int) = {case (acc,n) => acc + n } 

    foldPar(0)(x,chunkSize,sum) 
+1

Ich würde sagen, dass hier richtig Berechnungsmodell * Karte reduzieren * sein wird (und somit könnte es sein [Funken] (http://spark-project.org/examples/)), nicht Schauspieler an sich. –

+0

Formal - ja, aber die Verarbeitungszeit ist in diesem Fall nicht sinnvoll, also ist es völlig in Ordnung, auf einer einzigen Maschine zu laufen. –

Antwort

4

Ihre Idee ist sehr gepflegt und es ist schade, es keine solche Funktion bereits verfügbar ist (AFAIK).

Ich habe nur Ihre Idee in ein bisschen kürzer Code umformuliert. Erstens glaube ich, dass es zum parallelen Falten nützlich ist, das Konzept von monoid zu verwenden - es ist eine Struktur mit einer assoziativen Operation und einem Nullelement. Die Assoziativität ist wichtig, weil wir die Reihenfolge nicht kennen, in der wir Ergebnisse kombinieren, die parallel berechnet werden. Und das Null-Element ist wichtig, damit wir Berechnungen in Blöcke aufteilen und jeden von der Null falten können. Es gibt aber nichts Neues, es ist genau das, was fold für die Scala-Kollektionen erwartet.

// The function defined by Monoid's apply must be associative 
// and zero its identity element. 
trait Monoid[A] 
    extends Function2[A,A,A] 
{ 
    val zero: A 
} 

nächstes Scala Iterator s haben bereits eine nützliche Methode, die den grouped(Int): GroupedIterator[Seq[A]] Iterator in feste Größe Sequenzen Scheiben schneiden. Es ist Ihrem split ziemlich ähnlich. Dies ermöglicht es uns, die Eingabe in Blöcke fester Größe zu schneiden und dann Scala parallel Erhebungsmethoden auf sie anzuwenden sind:

def parFold[A](c: Iterator[A], blockSize: Int)(implicit monoid: Monoid[A]): A = 
    c.grouped(blockSize).map(_.par.fold(monoid.zero)(monoid)) 
         .fold(monoid.zero)(monoid); 

Wir falten jeden Block den parallelen Sammlungen Framework und dann (ohne Parallelisierung) kombinieren, um die Zwischenergebnisse.

Ein Beispiel:

// Example: 
object SumMonoid extends Monoid[Long] { 
    override val zero: Long = 0; 
    override def apply(x: Long, y: Long) = x + y; 
} 
val it = Iterator.range(1, 10000001).map(_.toLong) 
println(parFold(it, 100000)(SumMonoid)); 
+0

Nette Verwendung von Monoid, weiß es noch nie zuvor. Was die gruppierte Methode anbelangt, so hatte ich Zweifel, dass sie ganze Sachen in den Speicher laden könnte, aber es stellte sich heraus, dass dies nicht der Fall ist. –

+0

Ich werde Ihre Lösung ein wenig später testen, aber es scheint, dass es funktionieren sollte und es ist viel prägnanter. Danke vielmals! –

+0

@MikhailGolubtsov Bitte lassen Sie mich wissen, wie Ihre Tests gehen, bin ich auch neugierig. Ich habe selbst nur sehr grundlegende Tests gemacht. –