2016-04-21 18 views

Antwort

3

Diese effizienter durchgeführt werden könnte, wenn es gebaut-in wurde, aber

def partition[A](stream: DStream[A])(pred: A => Boolean) { 
    val stream1 = stream.map(x => (x, pred(x)).cache() 
    val good = stream1.filter(_._2).map(_._1) 
    val bad = stream1.filter(!_._2).map(_._1) 
    (good, bad) 
} 

Hinweis cache() ist erforderlich, um sicherzustellen, dass stream1 nur einmal berechnet wird; Wenn pred einfach genug ist und stream bereits zwischengespeichert ist, sollte nur (stream.filter(pred), stream.filter(x => !pred(x))) schneller sein.

+0

Aber der DStream enthält zuerst gefilterte und ungefilterte Elemente. Ich möchte das gefilterte und ungefilterte gleichzeitig zurückgeben! –

+0

Sie meinen also nicht "ungefiltert", Sie meinen "gefiltert mit dem entgegengesetzten Prädikat"? I.e. eine Funktion, die wie "Partition" auf Scala-Sammlungen funktioniert? –

+0

Oh ich denke schon! Es ist "mit dem entgegengesetzten Prädikat gefiltert" –