2016-05-27 4 views
7

Ich habe eine einfache Datenrahmen wie folgt aus:Pivot String Spalte auf Pyspark Datenrahmen

rdd = sc.parallelize(
    [ 
     (0, "A", 223,"201603", "PORT"), 
     (0, "A", 22,"201602", "PORT"), 
     (0, "A", 422,"201601", "DOCK"), 
     (1,"B", 3213,"201602", "DOCK"), 
     (1,"B", 3213,"201601", "PORT"), 
     (2,"C", 2321,"201601", "DOCK") 
    ] 
) 
df_data = sqlContext.createDataFrame(rdd, ["id","type", "cost", "date", "ship"]) 

df_data.show() 
+---+----+----+------+----+ 
| id|type|cost| date|ship| 
+---+----+----+------+----+ 
| 0| A| 223|201603|PORT| 
| 0| A| 22|201602|PORT| 
| 0| A| 422|201601|DOCK| 
| 1| B|3213|201602|DOCK| 
| 1| B|3213|201601|PORT| 
| 2| C|2321|201601|DOCK| 
+---+----+----+------+----+ 

und ich brauche es nach dem Datum zu schwenken:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("cost").show() 

+---+----+------+------+------+ 
| id|type|201601|201602|201603| 
+---+----+------+------+------+ 
| 2| C|2321.0| null| null| 
| 0| A| 422.0| 22.0| 223.0| 
| 1| B|3213.0|3213.0| null| 
+---+----+------+------+------+ 

Alles funktioniert wie erwartet. Aber jetzt muss ich es schwenken und eine nicht-numerische Spalte zu erhalten:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("ship").show() 

und natürlich würde ich eine Ausnahme erhalten:

AnalysisException: u'"ship" is not a numeric column. Aggregation function can only be applied on a numeric column.;' 

Ich mag würde etwas auf der Linie der

erzeugen

Ist das mit pivot möglich?

Antwort

10

Unter der Annahme, dass (id |type | date) Kombinationen sind einzigartig und Ihr einziges Ziel ist schwenkbar und nicht die Aggregation können Sie first (oder jede andere Funktion nicht auf numerische Werte beschränkt) verwenden:

from pyspark.sql.functions import first 

(df_data 
    .groupby(df_data.id, df_data.type) 
    .pivot("date") 
    .agg(first("ship")) 
    .show()) 

## +---+----+------+------+------+ 
## | id|type|201601|201602|201603| 
## +---+----+------+------+------+ 
## | 2| C| DOCK| null| null| 
## | 0| A| DOCK| PORT| PORT| 
## | 1| B| PORT| DOCK| null| 
## +---+----+------+------+------+ 

Wenn diese Annahmen nicht korrigieren‘ Ich muss Ihre Daten vorab aggregieren. Zum Beispiel für die gängigsten ship Werte:

from pyspark.sql.functions import max, struct 

(df_data 
    .groupby("id", "type", "date", "ship") 
    .count() 
    .groupby("id", "type") 
    .pivot("date") 
    .agg(max(struct("count", "ship"))) 
    .show()) 

## +---+----+--------+--------+--------+ 
## | id|type| 201601| 201602| 201603| 
## +---+----+--------+--------+--------+ 
## | 2| C|[1,DOCK]| null| null| 
## | 0| A|[1,DOCK]|[1,PORT]|[1,PORT]| 
## | 1| B|[1,PORT]|[1,DOCK]| null| 
## +---+----+--------+--------+--------+