2015-05-13 9 views
17

Ich versuche, verschiedene Möglichkeiten zum Aggregieren meiner Daten zu vergleichen.Spark: Wie übersetzt count (distinct (value)) in die Dataframe-APIs

Dies ist meine Eingangsdaten mit 2 Elementen (Seite, Besucher):

(PAG1,V1) 
(PAG1,V1) 
(PAG2,V1) 
(PAG2,V2) 
(PAG2,V1) 
(PAG1,V1) 
(PAG1,V2) 
(PAG1,V1) 
(PAG1,V2) 
(PAG1,V1) 
(PAG2,V2) 
(PAG1,V3) 

mit einem SQL-Befehl Arbeiten in Spark-SQL mit diesem Code:

import sqlContext.implicits._ 
case class Log(page: String, visitor: String) 
val logs = data.map(p => Log(p._1,p._2)).toDF() 
logs.registerTempTable("logs") 
val sqlResult= sqlContext.sql(
           """select page 
             ,count(distinct visitor) as visitor 
            from logs 
           group by page 
           """) 
val result = sqlResult.map(x=>(x(0).toString,x(1).toString)) 
result.foreach(println) 

ich diese Ausgabe:

(PAG1,3) // PAG1 has been visited by 3 different visitors 
(PAG2,2) // PAG2 has been visited by 2 different visitors 

Nun möchte ich das gleiche Ergebnis mit Dataframes und thiers API erhalten, aber ich kann nicht das gleiche erhalten Ausgang:

import sqlContext.implicits._ 
case class Log(page: String, visitor: String) 
val logs = data.map(p => Coppia(p._1,p._2)).toDF() 
val result = log.select("page","visitor").groupBy("page").count().distinct 
result.foreach(println) 

In der Tat, das ist, was ich als Ausgabe erhalten:

[PAG1,8] // just the simple page count for every page 
[PAG2,4] 

Es ist wahrscheinlich etwas dumm, aber ich kann es jetzt nicht sehen.

Vielen Dank im Voraus!

FF

Antwort

36

Was Sie brauchen, ist die Datenrahmen Aggregationsfunktion countDistinct:

import sqlContext.implicits._ 
import org.apache.spark.sql.functions._ 

case class Log(page: String, visitor: String) 

val logs = data.map(p => Coppia(p._1,p._2)) 
      .toDF() 

val result = log.select("page","visitor") 
      .groupBy('page) 
      .agg('page, countDistinct('visitor)) 

result.foreach(println) 
+2

ich diesen Fehler -> nicht gefunden: Wert CountDistinct –

+1

es ist eine Methode, bei 'org.apache.spark.sql .functions', importiere das :), edit done. –

+0

mit intelliJ Ich muss den Befehl agg/countDistinct wie diese .agg (org.apache.spark.sql.functions.countDistinct ("visitor")) schreiben, weil ich selbst org.apache.spark.sql importiert habe. Funktionen es gibt mir immer noch den gleichen Fehler ... trotzdem funktioniert das, aber ich bekomme nur die Besucherspalte und keine Seitenspalte ([2], [3]) ... was fehlt mir? –