2014-11-04 6 views
11

Also versuche ich, Spark mit Python (Pyspark) zu lernen. Ich möchte wissen, wie die Funktion mapPartitions funktioniert. Das ist es, was Input benötigt und was Output gibt. Ich konnte kein richtiges Beispiel aus dem Internet finden. Nehmen wir an, ich habe ein RDD-Objekt mit Listen, wie unten.Wie funktioniert die pyspark mapPartitions-Funktion?

[ [1, 2, 3], [3, 2, 4], [5, 2, 7] ] 

Und ich mag Element 2 aus allen Listen entfernen, wie würde ich erreichen, dass mapPartitions verwenden.

Antwort

17

mapPartition sollte als eine Kartenoperation über Partitionen und nicht über die Elemente der Partition betrachtet werden. Die Eingabe ist die Menge der aktuellen Partitionen, deren Ausgabe eine weitere Gruppe von Partitionen sein wird.

Die Funktion, die Sie Karte passieren ein einzelnes Element des RDD nehmen muss

Die Funktion übergeben Sie mapPartition eine iterable Ihrer RDD Typen festgelegt werden und zurückkehren und iterable eines anderen oder gleichen Typs.

In Ihrem Fall Sie wollen wahrscheinlich nur etwas zu tun, wie

def filterOut2(line): 
    return [x for x in line if x != 2] 

filtered_lists = data.map(filterOut2) 

, wenn Sie es

def filterOut2FromPartion(list_of_lists): 
    final_iterator = [] 
    for sub_list in list_of_lists: 
    final_iterator.append([x for x in sub_list if x != 2]) 
    return iter(final_iterator) 

filtered_lists = data.mapPartition(filterOut2FromPartion) 
+0

Warum gibst du nichts zurück in filterOut2FromPartition f Salbung. Zweitens, ist ein Keyword in Python endgültig? Ich denke du solltest final.iterator = [] anstelle von final_iterator sagen. – MetallicPriest

+0

Die Probleme behoben – bearrito

+0

Ich habe versucht, dies zu implementieren, aber ich bekomme den Fehler "Liste Objekt ist kein Iterator". Außerdem glaube ich, als du [x für x in Zeile, wenn x! = 2] geschrieben hast, meinst du [x für x in der Liste, wenn x! = 2]. Ich habe dort die Liste benutzt. – MetallicPriest

18

Es ist einfacher zu benutzen mapPartitions mit einer Generatorfunktion mit der yield wäre verwenden mapPartition wollte Syntax:

def filter_out_2(partition): 
    for element in partition: 
     if element != 2: 
      yield element 

filtered_lists = data.mapPartition(filter_out_2) 
+0

Ist das schneller als nur eine Liste zurückgeben? – cgreen

+1

@cgreen Die Partition enthält alle Ihre Daten. Ich bin mir nicht sicher, ob Sie alle Ihre Daten in eine Liste laden möchten. Generatoren werden gegenüber Listen bevorzugt, wenn Sie über Daten iterieren. – Narek

+0

@cgreen Generatoren benötigen weniger Speicher, da sie jedes Element nach Bedarf generieren, anstatt zunächst eine ganze Liste von Objekten erstellen zu müssen. Es verbraucht also definitiv weniger Speicher und ist daher wahrscheinlich schneller. [Hier ist eine gute Erklärung der Generatoren in Python] (https://medium.freecodecamp.org/python-list-comprehensions-vs-generator-expressions-cef70ccb49db). –