2016-08-04 27 views
0

Ich versuche, eine Funktion auf alle Elemente einer Spalte in einem Spark-Datenrahmen in Scala anzuwenden. Der Eingang ist ein String, der aussieht wie „{count: 10}“, und ich möchte nur den Int Teil zurückzukehren - in diesem Beispiel 10. Ich habe dies auf einem Spielzeug Beispiel tun können:Anwenden einer Kartenfunktion auf alle Elemente der Spalte in einem Spark-Datenrahmen

val x = List("{\"count\": 107}", "{\"count\": 9}", "{\"count\": 456}")  
val _list = x.map(x => x.substring(10,x.length-1).toInt) 

aber wenn ich versuche, einen UDF auf meinen Datenrahmen anzuwenden bekomme ich einen Fehler:

val getCounts: String => Int = _.substring(10,x.length-1).toInt 
import org.apache.spark.sql.functions.udf 
val myUDF = udf(getCounts) 

df.withColumn("post_shares_int", myUDF('post_shares)).show 

Fehlerausgang:

org.apache.spark.SparkException: Task not serializable 

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2060) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706) 
    at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:56) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:187) 
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) 
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174) 
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499) 
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
.... 

Jede Hilfe, wie dies zu tun, wäre sehr geschätzt.

Antwort

1

Vergessen Sie die benutzerdefinierte UDF, gibt es bereits eine Funktion zur Verfügung für diese Aufgabe, nämlich regexp_extract die here dokumentiert

df.withColumn(
    "post_shares_int", 
    regexp_extract(df("post_shares"), '^{\\w+:(\\d+)}$', 1) 
).show 

der Kommentar unten folgend, ist es am besten get_json_object zu verwenden, die pars json strings

df.withColumn(
    "post_shares_int", 
    get_json_object(df("post_shares"), '$.count') 
).show 
+0

Sie sind Regex extrahieren a JSON-String ... Sollten Sie den JSON nicht einfach analysieren? –

+0

@ cricket_007 du hast absolut recht, ich war mir dieser funktion nicht bewusst, schön! – cheseaux

+0

@ Feynman27 Lösung bearbeitet – cheseaux