Wie eliasah wies darauf hin, foreach
(und foreachRDD
) keinen Wert zurückgeben; sie sind nur für Nebenwirkungen. Wenn Sie etwas zurückgeben möchten, benötigen Sie map
. Basierend auf Ihre zweite Lösung:
val rmse = output.map(rdd => new RegressionMetrics(rdd).rootMeanSquaredError)
Es sieht besser aus, wenn Sie eine kleine Funktion für sie machen:
val getRmse = (rdd: RDD) => new RegressionMetrics(rdd).rootMeanSquaredError
val rmse = output.map(getRmse)
leer RDDs Ignorieren,
val rmse = output.filter(_.nonEmpty).map(getRmse)
Hier ist genau die gleiche Reihenfolge wie ein Verständnis für das Verständnis. Es ist nur syntaktischer Zucker für Karte, flatMap und Filter, aber ich dachte, es war viel einfacher zu verstehen, wenn ich zum ersten Mal Scala lernte:
val rmse = for {
rdd <- output
if (rdd.nonEmpty)
} yield new RegressionMetrics(rdd).rootMeanSquaredError
Und hier ist eine Funktion, die Fehler, wie Ihr erster Versuch Summieren:
def calculateRmse(output: DStream[(Double, Double)]): Double = {
val getRmse = (rdd: RDD) => new RegressionMetrics(rdd).rootMeanSquaredError
output.filter(_.nonEmpty).map(getRmse).reduce(_+_)
}
Die Beschwerde des Compilers über nonEmpty
ist tatsächlich ein Problem mit DStreams filter
Methode. Anstatt auf den RDDs im DStream zu operieren, arbeitet filter
mit den Doppelpaaren (Double, Double)
, die von Ihrem DStream-Typparameter vorgegeben werden.
Ich weiß nicht genug über Spark zu sagen, es ist ein Fehler, aber es ist sehr seltsam. Filter
und die meisten anderen Operationen über Sammlungen sind normalerweise defined in terms of foreach, aber DStream implementiert diese Funktionen, ohne die gleiche Konvention zu befolgen; Die veraltete Methode foreach
und die aktuelle foreachRDD
werden beide über die RDDs des Streams betrieben, aber its other higher-order methods don't.
Also meine Methode wird nicht funktionieren. DSTREAM hat wahrscheinlich einen guten Grund für die seltsamen Wesen (Leistung zu tun?) Hier ist wahrscheinlich schlecht, wie es mit foreach
zu tun:
def calculateRmse(ds: DStream[(Double, Double)]): Double = {
var totalError: Double = 0
def getRmse(rdd:RDD[(Double, Double)]): Double = new RegressionMetrics(rdd).rootMeanSquaredError
ds.foreachRDD((rdd:RDD[(Double, Double)]) => if (!rdd.isEmpty) totalError += getRmse(rdd))
totalError
}
Aber es funktioniert!
@Yuval Itzchakov: Ich möchte quadratischen Mittelfehler für die Streaming-Daten berechnen. Vielleicht ist die Art, wie ich an diese Aufgabe herangehe, falsch. Wenn ja, würde ich gerne den korrekten Weg kennen, vorausgesetzt, dass die Eingabedaten vom Typ DStream [(Double, Double)] sind. – Klue
Ich bin mir immer noch nicht sicher, was Sie tun. – eliasah
@eliasah: DStream enthält Doppelpaare, z. ((5.0, 5.2), (5.1, 5.15) ...) Angenommen, das erste Element in dem Paar ist ein tatsächlicher Wert, während das zweite Element ein vorhergesagter Wert ist. Was ich tun muss, ist, einen Fehler zwischen tatsächlichen und vorhergesagten Werten zu berechnen, indem ich die mittlere quadratische Fehler (RMSE) Metrik verwendet. Wenn ich neue Daten im Streaming bekomme, sollte RMSE sich offensichtlich ändern (d. H. Es sollte mit meiner Funktion "calculateRMSE" neu berechnet werden). Ist das unmöglich? – Klue