2016-05-24 8 views
8

Ich habe ein Spark-Datenframe mit folgender Struktur. Das bodyText_token hat die Tokens (verarbeitete Wörter). Und ich habe eine verschachtelte Liste der definierten SchlüsselwörterÜbergeben einer Datenrahmen Spalte und externe Liste zu udf unter mitColumn

root 
|-- id: string (nullable = true) 
|-- body: string (nullable = true) 
|-- bodyText_token: array (nullable = true) 

keyword_list=['union','workers','strike','pay','rally','free','immigration',], 
['farmer','plants','fruits','workers'],['outside','field','party','clothes','fashions']] 

ich überprüfen musste, wie viele Token unter jedem Keyword-Liste fallen und das Ergebnis als eine neue Spalte des vorhandenen Datenrahmen hinzufügen. ZB: wenn tokens =["become", "farmer","rally","workers","student"] wird das Ergebnis -> [1,2,0]

Die folgende Funktion funktionierte wie erwartet.

def label_maker_topic(tokens,topic_words): 
    twt_list = [] 
    for i in range(0, len(topic_words)): 
     count = 0 
     #print(topic_words[i]) 
     for tkn in tokens: 
      if tkn in topic_words[i]: 
       count += 1 
     twt_list.append(count) 

    return twt_list 

Ich habe udf unter withColumn verwendet, um auf die Funktion zuzugreifen, und ich bekomme einen Fehler. Ich denke, es geht darum, eine externe Liste an ein udf zu übergeben. Gibt es eine Möglichkeit, die externe Liste und die Datafram-Spalte an ein udf weiterzuleiten und meinem Dataframe eine neue Spalte hinzuzufügen?

topicWord = udf(label_maker_topic,StringType()) 
myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token,keyword_list)) 

Antwort

20

Die sauberste Lösung ist zusätzliche Argumente Schließung passieren mit:

def make_topic_word(topic_words): 
    return udf(lambda c: label_maker_topic(c, topic_words)) 

df = sc.parallelize([(["union"],)]).toDF(["tokens"]) 

(df.withColumn("topics", make_topic_word(keyword_list)(col("tokens"))) 
    .show()) 

Dies keine Änderungen in keyword_list oder die Funktion mit UDF wickeln erfordert. Sie können diese Methode auch verwenden, um ein beliebiges Objekt zu übergeben. Dies kann verwendet werden, um beispielsweise eine Liste von sets für effiziente Suchvorgänge zu übergeben.

Wenn Sie Ihre aktuelle UDF verwenden möchten, und übergeben topic_words direkt werden Sie es auf eine Spalte wörtliche erste konvertieren müssen:

from pyspark.sql.functions import array, lit 

ks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list]) 
df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show() 

Je nach Daten und Anforderungen es können alternative, effizientere Lösungen, die keine UDFs (explode + aggregate + collapse) oder Lookups (Hashing + Vektoroperationen) benötigen.

7

Folgende funktioniert gut, wo ein externer Parameter kann auf die UDF übergeben werden

topicWord=udf(lambda tkn: label_maker_topic(tkn,topic_words),StringType()) 
myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token)) 
+0

Dies funktioniert (ein gezwickt Code jemand helfen), aber ich würde mit dieser vorsichtig sein, denn die UDF die 'topic_words haben 'Wert im Moment der UDF wurde definiert. Das Ändern der 'topic_words' und das erneute Verwenden des udf wird später nicht funktionieren - es wird immer noch den Wert von' topic_words' zum Zeitpunkt der Definition des udf verwenden. – CHP