In einer Funktion gibt es eine Möglichkeit, zwei DStreams nach der Verwendung von filter
zurückzugeben? Wenn ich zum Beispiel eine DStream
filtere, werden die gefilterten in einer DStream
gespeichert und die ungefilterten werden in einer anderen DStream
gespeichert.Wie kann ich zwei DStreams in einer Funktion nach der Filtertransformation im Spark-Streaming zurückgeben?
2
A
Antwort
3
Diese effizienter durchgeführt werden könnte, wenn es gebaut-in wurde, aber
def partition[A](stream: DStream[A])(pred: A => Boolean) {
val stream1 = stream.map(x => (x, pred(x)).cache()
val good = stream1.filter(_._2).map(_._1)
val bad = stream1.filter(!_._2).map(_._1)
(good, bad)
}
Hinweis cache()
ist erforderlich, um sicherzustellen, dass stream1
nur einmal berechnet wird; Wenn pred
einfach genug ist und stream
bereits zwischengespeichert ist, sollte nur (stream.filter(pred), stream.filter(x => !pred(x)))
schneller sein.
Aber der DStream enthält zuerst gefilterte und ungefilterte Elemente. Ich möchte das gefilterte und ungefilterte gleichzeitig zurückgeben! –
Sie meinen also nicht "ungefiltert", Sie meinen "gefiltert mit dem entgegengesetzten Prädikat"? I.e. eine Funktion, die wie "Partition" auf Scala-Sammlungen funktioniert? –
Oh ich denke schon! Es ist "mit dem entgegengesetzten Prädikat gefiltert" –