2016-04-14 9 views
1

Ich möchte meinen Dataframe mit ausgewählten Spalten sortieren, indem Sie sie von Stringtype auf Prederred-Typ und Prederred-Reihenfolge umwandeln. Aber selbst eine einfache Umwandlung einer Spalte funktioniert nicht und gibt diese Ausnahme. Ich stelle den Beispielcode hier zur Verfügung.Scala + Spark + Dataframe-Ausnahme Wenn ich versuche, eine Spalte dynamisch zu werfen und Sortierreihenfolge zuzuweisen

val conf = new SparkConf().setAppName("Sparkify").setMaster("local[*]") 
    val sparkContext =new SparkContext(conf) 
    val sqlContext = new SQLContext(sparkContext) 
    var df = sqlContext.read 
     .format("com.databricks.spark.csv") 
     .option("header", "true") 
     .load("example-data.csv") 
// val colsToSort= List("age") 
    df = df.sort(df.col("age").cast(IntegerType).desc) 
    df.show() 
    sparkContext.stop() 

Die einfache csv sieht wie folgt aus

+-----+---+---+ 
| name|sex|age| 
+-----+---+---+ 
|Alice| f| 34| 
| Bob| m| 63| 
|Alice| f| 14| 
| Bob| m| 6| 
+-----+---+---+ 

Die detaillierte Ausnahme-Stack.

Exception in thread "main" org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: unresolvedalias(cast(cast(age#2 as decimal(20,0)) as int)) 
    at org.apache.spark.sql.catalyst.analysis.UnresolvedAlias.dataType(unresolved.scala:295) 
    at org.apache.spark.sql.catalyst.expressions.SortOrder.dataType(SortOrder.scala:49) 
    at org.apache.spark.sql.catalyst.expressions.SortOrder.checkInputDataTypes(SortOrder.scala:42) 
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:62) 
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319) 
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:318) 
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:107) 
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:117) 
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:121) 

Was mache ich falsch? Oder was ist die beste Möglichkeit, einen Sortierausdruck basierend auf mehreren Spalten mit Typ Casting und Reihenfolge dynamisch zu deklarieren.

Kann mir bitte jemand helfen?

Antwort

0

Sie können die neue Spalte nicht innerhalb der sort-Funktion erstellen - Sie müssen es vor der sort tun. Versuchen Sie so etwas wie dieses stattdessen:

df.withColumn("age", $"age".cast(IntegerType)).sort($"age".desc) 

Wenn Sie eine Variable für den Spaltennamen verwenden möchten, versuchen Sie dies:

val colName = "age" 
df.withColumn(colName, col(colName).cast(IntegerType)).sort(col(colName).desc) 

Beachten Sie, dass, es sei denn, Sie sind die Verbindung von zwei Tabellen mit demselben genannten Spalte , können Sie im Allgemeinen die df. vor col(...) verlassen. Man könnte sogar die sort Spalte erstellen, bevor Sie die df erstellen und es funktioniert gut:

val sortCol = col("age").desc 
val df = ... 
df.sort(sortCol) 

Auf diese Weise mehrere die gleiche sort problemlos anwenden können DataFrames

Wenn Sie mehr als eine Spalte werfen wollen , könnten Sie einfach tun:

df.withColumn(...).withColumn(...).withColumn(...) 

Wo jeder der withColumn ein gegossener Ausdruck ist. Oder Sie tun können:

df.select($"age".cast(IntegerType), $"otherCol".cast(DoubleType), ...) 
+0

Lassen Sie uns sagen, wenn ich den ersten Ansatz bin, ist es trotzdem kann ich dies tun für eine variable Zeichenfolge, die den Namen der Spalte hält eher String? Das implizite funktioniert nicht mit Variablen, sowie "var colName =" alter " df.withColumn (colName, df.col (colName) .cast (IntegerType)). Sort (df.col (colName) .desc) '. Kannst du mir bitte dabei helfen? – NehaM

+0

Das funktioniert perfekt für mich: 'val colName =" age "; df.withColumn (colName, df.col (colName) .cast (IntegerType)) ' –

+0

Warten Sie einen Fehler mit der' sort'. Versuchen Sie Folgendes: 'val colName =" alter "; df.withColumn (colName, df.col (colName) .cast (IntegerType)). sort (col (colName) .desc) '- Beachten Sie nicht' df.col (colName) ', sondern' col (colName) '. –