2016-07-04 8 views
1

Ich habe ein Graphframe Objekt: g und einem Objekt RDD: Kandidat: ein Ergebnis, RDD mit Schlüssel, Wert-PaareWie schreibt man eine Transformationsfunktion, um RDD mit Bezug auf ein Graphframe-Objekt zu transformieren?

g = GraphFrame(v,e) 
candidates_rdd.collect() 
# [Row(source=u'a', target=u'b'), 
# Row(source=u'a', target=u'c'), 
# Row(source=u'e', target=u'a')] 

Ich möchte von „source“ einen Pfad berechnen, zu „Ziel“ in candidates_rdd und erzeugen ((source, target), path_list) mit der ersten Suche von graphframe browth, wobei path_list eine Liste von Pfaden von Quelle zu Ziel ist.

Beispiel Ausgänge:

(('a','b'),['a-c-b','a-d-e-b']), 
(('f','c'),[]), 
(('a',d'),['a-b-e-d'] 

schrieb ich die unten Funktion:

def bfs_(row):  
    arg1 = "id = '" + row.source + "'" 
    arg2 = "id = '" + row.target + "'"   
    return ((row.source, row.target), g.bfs(arg1,arg2).rdd) 

results = candidates_rdd.map(bfs_) 

ich diesen Fehler habe:

Py4JError: An error occurred while calling o274.__getnewargs__. Trace: 
py4j.Py4JException: Method __getnewargs__([]) does not exist 

habe ich versucht, die Grafik global oder Broadcast zu machen funktioniert auch nicht.

Könnte mir jemand dabei helfen?

Vielen Dank !!

Antwort

1

TL; DR Es ist nicht möglich.

Spark unterstützt verschachtelte Operationen wie diese nicht. Die äußere Schleife muss nicht verteilt sein:

>>> [g.bfs(arg1, arg2) for arg1, arg2 in candidates_rdd.collect()]