Ich versuche, verschiedene Möglichkeiten zum Aggregieren meiner Daten zu vergleichen.Spark: Wie übersetzt count (distinct (value)) in die Dataframe-APIs
Dies ist meine Eingangsdaten mit 2 Elementen (Seite, Besucher):
(PAG1,V1)
(PAG1,V1)
(PAG2,V1)
(PAG2,V2)
(PAG2,V1)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG2,V2)
(PAG1,V3)
mit einem SQL-Befehl Arbeiten in Spark-SQL mit diesem Code:
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Log(p._1,p._2)).toDF()
logs.registerTempTable("logs")
val sqlResult= sqlContext.sql(
"""select page
,count(distinct visitor) as visitor
from logs
group by page
""")
val result = sqlResult.map(x=>(x(0).toString,x(1).toString))
result.foreach(println)
ich diese Ausgabe:
(PAG1,3) // PAG1 has been visited by 3 different visitors
(PAG2,2) // PAG2 has been visited by 2 different visitors
Nun möchte ich das gleiche Ergebnis mit Dataframes und thiers API erhalten, aber ich kann nicht das gleiche erhalten Ausgang:
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Coppia(p._1,p._2)).toDF()
val result = log.select("page","visitor").groupBy("page").count().distinct
result.foreach(println)
In der Tat, das ist, was ich als Ausgabe erhalten:
[PAG1,8] // just the simple page count for every page
[PAG2,4]
Es ist wahrscheinlich etwas dumm, aber ich kann es jetzt nicht sehen.
Vielen Dank im Voraus!
FF
ich diesen Fehler -> nicht gefunden: Wert CountDistinct –
es ist eine Methode, bei 'org.apache.spark.sql .functions', importiere das :), edit done. –
mit intelliJ Ich muss den Befehl agg/countDistinct wie diese .agg (org.apache.spark.sql.functions.countDistinct ("visitor")) schreiben, weil ich selbst org.apache.spark.sql importiert habe. Funktionen es gibt mir immer noch den gleichen Fehler ... trotzdem funktioniert das, aber ich bekomme nur die Besucherspalte und keine Seitenspalte ([2], [3]) ... was fehlt mir? –