2016-08-03 13 views
2

Ich muss die verteilte Berechnung auf Spark DataFrame fortsetzen, wobei einige willkürliche (nicht SQL-) Logik auf Brocken von DataFrame aufgerufen wird. ich getan habe:Spark DataFrame mapPartitions

def some_func(df_chunk): 
    pan_df = df_chunk.toPandas() 
    #whatever logic here 

df = sqlContext.read.parquet(...) 
result = df.mapPartitions(some_func) 

Unfortunatelly es führt zu:

AttributeError: 'itertools.chain' object has no attribute 'toPandas'

ich erwartet hatte Funke Dataframe Objekt innerhalb jeder Karte Aufruf zu haben, stattdessen habe ich 'itertools.chain'. Warum? Und wie kann man das überwinden?

+2

PySpark itertools.chain wird unter Verwendung von Daten an die mapPartition passieren und so sind vorbei Sie das Objekt an die Funktion, die es nicht erkennt. –

Antwort

2

Versuchen Sie folgendes:

>>> columns = df.columns 
>>> df.rdd.mapPartitions(lambda iter: [pd.DataFrame(list(iter), columns=columns)])