2016-07-20 4 views
2

Ich habe derzeit einen DataFrame von Doppel mit ungefähr 20% der Daten, die Nullwerte sind. Ich möchte die Pearson-Korrelation einer Spalte mit jeder anderen Spalte berechnen und die Spalten-IDs der oberen 10 Spalten im DataFrame zurückgeben.Spark: Computing-Korrelationen eines DataFrame mit fehlenden Werten

Ich möchte Nullen mit paarweiser Löschung ausfiltern, ähnlich wie die Option pairwise.complete.obs von R in seiner Pearson-Korrelationsfunktion. Das heißt, wenn einer der beiden Vektoren in einer Korrelationsberechnung eine Null bei einem Index aufweist, möchte ich diese Zeile aus beiden Vektoren entfernen.

ich zur Zeit folgendes:

val df = ... //my DataFrame 
val cols = df.columns 
df.registerTempTable("dataset") 
val target = "Row1" 
val mapped = cols.map {colId => 
    val results = sqlContext.sql(s"SELECT ${target}, ${colId} FROM dataset WHERE (${colId} IS NOT NULL AND ${target} IS NOT NULL)") 
    (results.stat.corr(colId, target) , colId) 
    }.sortWith(_._1 > _._1).take(11).map(_._2) 

Diese sehr langsam läuft, wie jede einzelne Karte Iteration seinen eigenen Job. Gibt es eine Möglichkeit, dies effizient zu tun, vielleicht Statistics.corr im Mllib Verwendung gemäß this SO Question (Spark 1.6 Pearson Correlation)

Antwort

1

Es Funktionen auf DataFrame „na“ ist: DataFrameNaFunctions API Sie in der gleichen Art und Weise arbeiten DataFramStatFunctions tun. Sie können die Zeilen fallen eine Null in eines Ihrer beiden Datenrahmen Spalten mit folgender Syntax enthalten:

myDataFrame.na.drop("any", target, colId) 

wenn Sie Zeilen löschen möchten null eine der Spalten dann enthält es ist: myDataFrame.na.drop ("any")

Wenn Sie den Datenrahmen auf die zwei Spalten beschränken, die Ihnen zuerst wichtig sind, können Sie die zweite Methode verwenden und verbose vermeiden!

Als solcher Code werden würde:

val df = ??? //my DataFrame 
val cols = df.columns 
val target = "Row1" 
val mapped = cols.map {colId => 
    val resultDF = df.select(target, colId).na.drop("any") 
    (resultDF.stat.corr(target, colId) , colId) 
    }.sortWith(_._1 > _._1).take(11).map(_._2) 

Hope this Ihnen hilft.

+0

Danke für die Antwort Tristan! Während DataFrameNaFunctions die Lesbarkeit meines Codes verbessert, bietet es nicht die Beschleunigung, nach der ich gesucht habe. Zur Zeit iteriere ich über einen verteilten Datenrahmen (df) mit einer nicht verteilten Datenquelle (cols). Im Idealfall möchte ich eine Transformation in meinem DataFrame verwenden, die eine columnId auf einen Korrelationswert abbildet, um die Parallelisierung zu maximieren –