2016-06-10 7 views
3

Mit einem Datenrahmen wie diese,Füllen Pyspark Datenrahmen Spalte Nullwert mit Mittelwert aus derselben Spalte

rdd_2 = sc.parallelize([(0,10,223,"201601"), (0,10,83,"2016032"),(1,20,None,"201602"),(1,20,3003,"201601"), (1,20,None,"201603"), (2,40, 2321,"201601"), (2,30, 10,"201602"),(2,61, None,"201601")]) 

df_data = sqlContext.createDataFrame(rdd_2, ["id", "type", "cost", "date"]) 
df_data.show() 

+---+----+----+-------+ 
| id|type|cost| date| 
+---+----+----+-------+ 
| 0| 10| 223| 201601| 
| 0| 10| 83|2016032| 
| 1| 20|null| 201602| 
| 1| 20|3003| 201601| 
| 1| 20|null| 201603| 
| 2| 40|2321| 201601| 
| 2| 30| 10| 201602| 
| 2| 61|null| 201601| 
+---+----+----+-------+ 

Ich brauche die Nullwert mit dem Mittelwert der vorhandenen Werte zu füllen, wobei das erwartete Ergebnis

seinem
+---+----+----+-------+ 
| id|type|cost| date| 
+---+----+----+-------+ 
| 0| 10| 223| 201601| 
| 0| 10| 83|2016032| 
| 1| 20|1128| 201602| 
| 1| 20|3003| 201601| 
| 1| 20|1128| 201603| 
| 2| 40|2321| 201601| 
| 2| 30| 10| 201602| 
| 2| 61|1128| 201601| 
+---+----+----+-------+ 

wobei 1128 der Durchschnitt der vorhandenen Werte ist. Ich muss das für mehrere Spalten tun.

Mein aktueller Ansatz ist na.fill zu verwenden:

fill_values = {column: df_data.agg({column:"mean"}).flatMap(list).collect()[0] for column in df_data.columns if column not in ['date','id']} 
df_data = df_data.na.fill(fill_values) 

+---+----+----+-------+ 
| id|type|cost| date| 
+---+----+----+-------+ 
| 0| 10| 223| 201601| 
| 0| 10| 83|2016032| 
| 1| 20|1128| 201602| 
| 1| 20|3003| 201601| 
| 1| 20|1128| 201603| 
| 2| 40|2321| 201601| 
| 2| 30| 10| 201602| 
| 2| 61|1128| 201601| 
+---+----+----+-------+ 

Aber das ist sehr umständlich. Irgendwelche Ideen?

+0

Hat das getan. Ich war nur überrascht über die Anzahl der Reifen, die ich durchmachen musste, um so etwas zu machen. – Ivan

Antwort

8

Nun, eine oder andere Weise, Sie zu haben:

  • berechnen Statistiken
  • füllen die freien Räume

Es ziemlich Grenzen, was Sie wirklich hier verbessern können diese Frage nicht zu erwähnen, würde passt besser zu CodeReview. Still:

  • ersetzen flatMap(list).collect()[0] mit first()[0] oder Struktur
  • berechnen alle Werte mit einer einzigen Aktion
  • Verwendung Einbau- Methoden Wörterbuch

Das Endergebnis könnte dies gerne extrahieren Auspacken:

def fill_with_mean(df, exclude=set()): 
    stats = df.agg(*(
     avg(c).alias(c) for c in df.columns if c not in exclude 
    )) 
    return df.na.fill(stats.first().asDict()) 

fill_with_mean(df_data, ["id", "date"]) 

In Spark 2.2 oder später können Sie auch Imputer verwenden. Siehe Replace missing values with mean - Spark Dataframe.

+0

Ich bekomme 'name 'avg' ist nicht definiert in spark 2.0.x :( – Kevad

+1

@Kevad importieren pyspark.sql.functions als fn, dann verwenden Sie fn.avg –

+0

Hat jemand diese Funktion verbessert? Nehmen Sie viel zu viel Computing Zeit auf meiner Seite!? :) –