2016-05-07 14 views
0

Ich bin sehr neu in Spark und kenne mich nicht wirklich mit den Grundlagen aus, ich bin einfach hinein gesprungen, um ein Problem zu lösen. Die Lösung für das Problem besteht darin, ein Diagramm (mit GraphX) zu erstellen, in dem Kanten ein Zeichenfolgenattribut haben. Ein Benutzer möchte dieses Diagramm möglicherweise abfragen, und ich behandle die Abfragen, indem nur diejenigen Kanten herausgefiltert werden, die das Zeichenfolgenattribut haben, das der Suchanfrage des Benutzers entspricht.Wie funktioniert die Filterfunktion von Spark auf GraphX-Kanten?

Jetzt hat mein Diagramm mehr als 16 Millionen Kanten; Es dauert mehr als 10 Minuten, um das Diagramm zu erstellen, wenn ich alle 8 Kerne meines Computers verwende. Wenn ich jedoch diese Grafik abfrage (wie ich oben erwähnt habe), bekomme ich die Ergebnisse sofort (zu meiner angenehmen Überraschung).

Also, meine Frage ist, wie genau sucht der Filterbetrieb nach meinen abgefragten Kanten? Sieht es sie iterativ an? Werden die Kanten auf mehreren Kernen gesucht und es scheint nur sehr schnell? Oder ist irgendeine Art von Hashing beteiligt?

Hier ist ein Beispiel, wie ich Filter verwende: Mygraph.edges.filter (_. Attr (0) .equals ("cat")), was bedeutet, dass ich Kanten mit dem Attribut "cat." " in ihnen. Wie werden die Kanten gesucht?

Antwort

2

Wie können die Filterergebnisse sofort angezeigt werden?

Das Ausführen der Anweisung erfolgt so schnell, weil die Filterung nicht ausgeführt wird. Spark verwendet eine faule Auswertung: Es führt keine Transformationen durch, bis Sie eine Aktion ausführen, bei der die Ergebnisse tatsächlich erfasst werden. Der Aufruf einer Transformationsmethode wie filter erstellt nur eine neue RDD, die diese Transformation und ihr Ergebnis darstellt. Sie erhalten eine Aktion wie collect oder count durchführen müssen, um tatsächlich es ausgeführt:

def myGraph: Graph = ??? 

// No filtering actually happens yet here, the results aren't needed yet so Spark is lazy and doesn't do anything 
val filteredEdges = myGraph.edges.filter() 

// Counting how many edges are left requires the results to actually be instantiated, so this fires off the actual filtering 
println(filteredEdges.count) 

// Actually gathering all results also requires the filtering to be done 
val collectedFilteredEdges = filteredEdges.collect 

Beachten Sie, dass die Filterergebnisse in diesen Beispielen nicht zwischen gespeichert in: aufgrund der Trägheit der Filterung für beide wiederholt Aktionen. Um diese Duplizierung zu vermeiden, sollten Sie sich die Spark-Caching-Funktionalität ansehen, nachdem Sie die Details zu Transformationen und Aktionen gelesen haben und was Spark hinter der Szene tatsächlich leistet: https://spark.apache.org/docs/latest/programming-guide.html#rdd-operations.

Wie genau sucht die Filteroperation nach meinen abgefragten Kanten (wenn ich eine Aktion ausführe)?

in Spark GraphX ​​die Kanten sind in einer RDD des Typs EdgeRDD[ED] gespeichert, wobei ist der Typ Ihrer Kante Attribut, in Ihrem Fall String. Diese spezielle RDD führt einige spezielle Optimierungen im Hintergrund aus, aber für Ihre Zwecke verhält sie sich wie ihre Superklasse RDD[Edge[ED]] und das Filtern erfolgt wie das Filtern einer beliebigen RDD: Sie durchläuft alle Elemente und wendet das gegebene Prädikat auf jede an. Eine RDD ist jedoch in eine Anzahl von Partitionen aufgeteilt und Spark wird mehrere Partitionen parallel filtern; In Ihrem Fall, in dem Sie scheinen, Spark lokal auszuführen, wird es so viele parallel tun wie die Anzahl der Kerne, die Sie haben, oder wie viel Sie explizit mit --master local[4] zum Beispiel angegeben haben.

Die RDD mit Kanten ist partitioniert basierend auf der PartitionStrategy, die gesetzt ist, zum Beispiel, wenn Sie Ihren Graphen mit Graph.fromEdgeTuples erstellen oder partitionBy in Ihrem Graphen aufrufen. Alle Strategien basieren jedoch auf den Scheitelpunkten der Kante. Sie verfügen daher über keinerlei Kenntnisse über Ihr Attribut und haben daher keinen Einfluss auf Ihre Filterungsoperation, außer vielleicht für eine unsymmetrische Netzwerklast, wenn Sie sie auf einem Cluster ausführen würden. Die Kanten von cat landen in der gleichen Partition/Executor und Sie tun eine collect oder einige Shuffle-Operation. Weitere Informationen dazu, wie Graphen dargestellt und partitioniert werden, finden Sie unter GraphX docs on Vertex and Edge RDDs.

+0

Ich dachte nicht, dass ich etwas zeigen müsste, was ich getan habe. Ich möchte nur wissen, wie die Kanten gesucht werden, wenn ich einen Filter verwende. Ich meine, wenn ich Mygraph.edges.filter (_. Attr (0) .equals ("cat")) anrufe, wie werden die Kanten, die das Attribut "cat" haben, gesucht? – CMWasiq

+1

Ja, sagt Spark faule Bewertung sagt nichts darüber, wie Werte partitioniert und verteilt werden, wenn der Filter tatsächlich angewendet wird. –

+0

Gut genug, ich konzentrierte mich auf den anfänglichen Auslöser der Frage: dass er sofortige Rückkehr sah, die jetzt mit dem hinzugefügten Anruf zeigt, dass es überhaupt keine Filterung gab. Probieren Sie 'Mygraph.edges.filter (_. Attr (0) .equals (" cat ")). Count()' um zu sehen, wie lange die Filterung tatsächlich dauert. Ich habe jetzt einen schnellen Schnitt mit der grundlegenden Antwort für diesen Fall gemacht, ich kann etwas später in die Tiefe gehen. – sgvd