Sie verschiedene Methoden auf dem RDD aufrufen, die Funktionen als Parameter akzeptieren.
// set up an example -- an RDD of arrays
val sparkConf = new SparkConf().setMaster("local").setAppName("Example")
val sc = new SparkContext(sparkConf)
val testData = Array(Array(1,2,3), Array(4,5,6,7,8))
val testRDD = sc.parallelize(testData, 2)
// Print the RDD of arrays.
testRDD.collect().foreach(a => println(a.size))
// Use map() to create an RDD with the array sizes.
val countRDD = testRDD.map(a => a.size)
// Print the elements of this new RDD.
countRDD.collect().foreach(a => println(a))
// Use filter() to create an RDD with just the longer arrays.
val bigRDD = testRDD.filter(a => a.size > 3)
// Print each remaining array.
bigRDD.collect().foreach(a => {
a.foreach(e => print(e + " "))
println()
})
}
Beachten Sie, dass die Funktionen, die Sie ein einzelnes Element RDD als Eingabe akzeptieren schreiben und Daten von einem gewissen einheitlichen Typ zurückgeben, so dass Sie eine RDD des letzteren Typs erstellen. Zum Beispiel ist countRDD
ein RDD[Int]
, während bigRDD
immer noch ein RDD[Array[Int]]
ist.
Es wird wahrscheinlich verlockend sein, irgendwann eine foreach
schreiben, die einige andere Daten ändert, aber Sie sollten aus den Gründen in this question and answer widerstehen.
Edit: Versuchen Sie nicht, große RDD
s
Mehrere Leser über die Verwendung von collect()
und println()
, um ihre Ergebnisse zu sehen, wie im Beispiel oben gefragt haben, zu drucken. Dies funktioniert natürlich nur, wenn Sie in einem interaktiven Modus wie dem Spark REPL (read-eval-print-loop) arbeiten. Am besten rufen Sie collect()
auf dem RDD auf, um ein sequentielles Array für den ordnungsgemäßen Druck zu erhalten. Aber collect()
kann zu viele Daten zurückbringen und auf jeden Fall zu viel gedruckt werden. Hier sind einige alternative Möglichkeiten, Einblick in Ihre RDD
s zu erhalten, wenn sie sind groß:
RDD.take()
: Das gibt Ihnen die Feinsteuerung über die Anzahl der Elemente, die Sie erhalten, aber nicht, woher sie kamen - definiert als die "erste", das ist ein Konzept, das durch verschiedene andere Fragen und Antworten hier behandelt wird.
// take() returns an Array so no need to collect()
myHugeRDD.take(20).foreach(a => println(a))
RDD.sample()
: Auf diese Weise können Sie (grob) den Anteil der Ergebnisse steuern Sie erhalten, ob mit einer Stichprobe verwendet Ersatz und auch optional die Zufallszahl Samen.
// sample() does return an RDD so you may still want to collect()
myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a))
RDD.takeSample()
: Dies ist ein Hybrid: mit Stichproben, die Sie steuern können, aber beide lassen Sie die genaue Anzahl der Ergebnisse festlegen und eine Array
zurück.
// takeSample() returns an Array so no need to collect()
myHugeRDD.takeSample(true, 20).foreach(a => println(a))
RDD.count()
: Manchmal kommt die beste Einsicht aus, wie viele Elemente, die Sie mit am Ende - ich dieses erste oft tun.
println(myHugeRDD.count())
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD – Malcolm