2014-07-01 6 views
9

Ich versuche, das folgende Problem mit pyspark zu lösen. Ich habe eine Datei auf hdfs im Format, das ein Dump der Nachschlagetabelle ist.Erstellen eines großen Wörterbuchs in pyspark

Ich möchte dies in Python-Wörterbuch in Pyspark laden und für einen anderen Zweck verwenden. Also habe ich versucht zu tun:

table = {} 
def populateDict(line): 
    (k,v) = line.split(",", 1) 
    table[k] = v 

kvfile = sc.textFile("pathtofile") 
kvfile.foreach(populateDict) 

Ich fand, dass Tabellenvariable nicht geändert wird. Also, gibt es eine Möglichkeit, eine große inhemory hashtable im Funken zu erstellen?

Antwort

4

foreach ist eine verteilte Berechnung, so dass Sie nicht erwarten können, dass sie eine nur im Treiber sichtbare Datenstruktur ändert. Was du willst ist. Diese

kv.map(line => { line.split(" ") match { 
    case Array(k,v) => (k,v) 
    case _ => ("","") 
}.collectAsMap() 

ist in scala, aber Sie bekommen die Idee, die wichtige Funktion collectAsMap() ist, die eine Karte für den Fahrer zurückgibt.

Wenn Ihre Daten sehr groß sind, können Sie ein PairRDD als Karte verwenden. Erste Karte Paare

kv.map(line => { line.split(" ") match { 
     case Array(k,v) => (k,v) 
     case _ => ("","") 
    } 

dann können Sie mit rdd.lookup("key") zugreifen, die eine Folge von Werten kehrt mit dem Schlüssel zugeordnet ist, auch wenn dies auf jeden Fall nicht so effizient wie andere verteilte KV speichert sein wird, als Funke nicht wirklich gebaut wird Das.

+0

Cool danke. Bedeutet dies, dass die Karte in den Speicher des Fahrers passen muss? Oder ist es noch verteilt? – Kamal

+0

@Kamal ja es muss in mem passen. U könnte das Paar rdd als Nachschlagetabelle verwenden. Auch an eine Lösung mit akkumulierbar gedacht, wird bald – aaronman

+0

in Ordnung bringen. Ich suchte nach einer verteilten Karte in Spark. Sieht so aus als wäre es nicht möglich! – Kamal

1

Für Effizienz finden Sie unter: sortByKey() and lookup()

Lookup (key):

Return die Liste der Werte in der RDD für Schlüssel-Taste. Diese Operation wird effizient ausgeführt, wenn der RDD einen bekannten Partitionierer hat, indem er nur die Partition durchsucht, der der Schlüssel zugeordnet ist.

Die RDD wird von sortByKey() (see: OrderedRDD) und effizient gesucht während lookup() Anrufe neu partitioniert werden. In Code, etwas wie,

kvfile = sc.textFile("pathtofile") 
sorted_kv = kvfile.flatMap(lambda x: x.split("," , 1)).sortByKey() 

sorted_kv.lookup('key1').take(10) 

wird den Trick sowohl als RDD und effizient.