2016-05-02 17 views
1

Hier ist meine Funktion, die quadratischen Fehler berechnet. Die letzte Zeile kann jedoch wegen des Fehlers Type mismatch issue (expected: Double, actual: Unit) nicht übersetzt werden. Ich habe viele verschiedene Wege versucht, um dieses Problem zu lösen, aber immer noch ohne Erfolg. Irgendwelche Ideen?Wie zu lösen Typ Mismatch Problem (erwartet: Doppelt, tatsächlich: Einheit)

def calculateRMSE(output: DStream[(Double, Double)]): Double = { 
     val summse = output.foreachRDD { rdd => 
      rdd.map { 
       case pair: (Double, Double) => 
       val err = math.abs(pair._1 - pair._2); 
       err*err 
      }.reduce(_ + _) 
     } 
     // math.sqrt(summse) HOW TO APPLY SQRT HERE? 
    } 
+0

@Yuval Itzchakov: Ich möchte quadratischen Mittelfehler für die Streaming-Daten berechnen. Vielleicht ist die Art, wie ich an diese Aufgabe herangehe, falsch. Wenn ja, würde ich gerne den korrekten Weg kennen, vorausgesetzt, dass die Eingabedaten vom Typ DStream [(Double, Double)] sind. – Klue

+0

Ich bin mir immer noch nicht sicher, was Sie tun. – eliasah

+0

@eliasah: DStream enthält Doppelpaare, z. ((5.0, 5.2), (5.1, 5.15) ...) Angenommen, das erste Element in dem Paar ist ein tatsächlicher Wert, während das zweite Element ein vorhergesagter Wert ist. Was ich tun muss, ist, einen Fehler zwischen tatsächlichen und vorhergesagten Werten zu berechnen, indem ich die mittlere quadratische Fehler (RMSE) Metrik verwendet. Wenn ich neue Daten im Streaming bekomme, sollte RMSE sich offensichtlich ändern (d. H. Es sollte mit meiner Funktion "calculateRMSE" neu berechnet werden). Ist das unmöglich? – Klue

Antwort

2

Wie eliasah wies darauf hin, foreach (und foreachRDD) keinen Wert zurückgeben; sie sind nur für Nebenwirkungen. Wenn Sie etwas zurückgeben möchten, benötigen Sie map. Basierend auf Ihre zweite Lösung:

val rmse = output.map(rdd => new RegressionMetrics(rdd).rootMeanSquaredError) 

Es sieht besser aus, wenn Sie eine kleine Funktion für sie machen:

val getRmse = (rdd: RDD) => new RegressionMetrics(rdd).rootMeanSquaredError 

val rmse = output.map(getRmse) 

leer RDDs Ignorieren,

val rmse = output.filter(_.nonEmpty).map(getRmse) 

Hier ist genau die gleiche Reihenfolge wie ein Verständnis für das Verständnis. Es ist nur syntaktischer Zucker für Karte, flatMap und Filter, aber ich dachte, es war viel einfacher zu verstehen, wenn ich zum ersten Mal Scala lernte:

val rmse = for { 
    rdd <- output 
    if (rdd.nonEmpty) 
} yield new RegressionMetrics(rdd).rootMeanSquaredError 

Und hier ist eine Funktion, die Fehler, wie Ihr erster Versuch Summieren:

def calculateRmse(output: DStream[(Double, Double)]): Double = { 

val getRmse = (rdd: RDD) => new RegressionMetrics(rdd).rootMeanSquaredError 

output.filter(_.nonEmpty).map(getRmse).reduce(_+_) 
} 

Die Beschwerde des Compilers über nonEmpty ist tatsächlich ein Problem mit DStreams filter Methode. Anstatt auf den RDDs im DStream zu operieren, arbeitet filter mit den Doppelpaaren (Double, Double), die von Ihrem DStream-Typparameter vorgegeben werden.

Ich weiß nicht genug über Spark zu sagen, es ist ein Fehler, aber es ist sehr seltsam. Filter und die meisten anderen Operationen über Sammlungen sind normalerweise defined in terms of foreach, aber DStream implementiert diese Funktionen, ohne die gleiche Konvention zu befolgen; Die veraltete Methode foreach und die aktuelle foreachRDD werden beide über die RDDs des Streams betrieben, aber its other higher-order methods don't.

Also meine Methode wird nicht funktionieren. DSTREAM hat wahrscheinlich einen guten Grund für die seltsamen Wesen (Leistung zu tun?) Hier ist wahrscheinlich schlecht, wie es mit foreach zu tun:

def calculateRmse(ds: DStream[(Double, Double)]): Double = { 

    var totalError: Double = 0 

    def getRmse(rdd:RDD[(Double, Double)]): Double = new RegressionMetrics(rdd).rootMeanSquaredError 

    ds.foreachRDD((rdd:RDD[(Double, Double)]) => if (!rdd.isEmpty) totalError += getRmse(rdd)) 

    totalError 
} 

Aber es funktioniert!

+0

Danke. Die Funktion 'calculateRMSE' kompiliert nicht - sie sagt' Kann das Symbol rdd, + und nonEmpty nicht auflösen'. Weißt du, warum das passiert? – Klue

+0

Im Fall von 'nonEmpty' steht auch' Type mismatch, expected ((Double, Double)) => Boolean, actual ((Double, Double)) => Any'. – Klue

+0

Whoops. Ich habe einen Fehler in 'getRmse' gemacht, Klammern sollten nur um die linke Hälfte der Funktion herumliegen. Aber die Probleme mit 'rdd' und' +' sind etwas schlechter. Ich werde meine Antwort bearbeiten. –

0

ich es geschafft, diese Aufgabe zu tun, wie folgt:

import org.apache.spark.mllib.evaluation.RegressionMetrics 

output.foreachRDD { rdd => 
    if (!rdd.isEmpty) 
    { 
     val metrics = new RegressionMetrics(rdd) 
     val rmse = metrics.rootMeanSquaredError 
     println("RMSE: " + rmse) 
    } 
} 
+0

Haben Sie zumindest verstanden, was Ihr Fehler war? – eliasah

+0

Ja, ich hoffe es :) 'foreachRDD' gibt keinen Wert zurück. Ich sollte zwar MSE über verschiedene RDDs ansammeln und dann RMSE schätzen. Aber der Ansatz ist anders, wie in der Antwort gezeigt. – Klue

+0

Sie tun immer noch nichts mit dem RMSE außer Drucken – eliasah