versuchen, etwas wie folgt aus:
graph.edges.filter(_.srcId == x).map(e => (e.dstId, null)).join(
graph.collectNeighborIds(EdgeDirection.Either)
).flatMap{t => t._2._2}.collect.toSet
Wenn Sie als diese tiefer gehen wollen, würde ich so etwas wie die Pregel API verwenden. Im Wesentlichen können Sie damit wiederholt Nachrichten von Knoten zu Knoten senden und die Ergebnisse aggregieren.
Edit: Pregel Lösung
Ich habe endlich die die Iterationen auf eigene Faust zu stoppen. Bearbeitungen unten. Vor diesem Diagramm:
graph.vertices.collect
res46: Array[(org.apache.spark.graphx.VertexId, Array[Long])] = Array((4,Array()), (8,Array()), (1,Array()), (9,Array()), (5,Array()), (6,Array()), (2,Array()), (3,Array()), (7,Array()))
graph.edges.collect
res47: Array[org.apache.spark.graphx.Edge[Double]] = Array(Edge(1,2,0.0), Edge(2,3,0.0), Edge(3,4,0.0), Edge(5,6,0.0), Edge(6,7,0.0), Edge(7,8,0.0), Edge(8,9,0.0), Edge(4,2,0.0), Edge(6,9,0.0), Edge(7,9,0.0))
Wir werden Nachrichten vom Typ senden Array[Long]
- ein Array aller VertexIds
von angeschlossenen Knoten. Nachrichten werden stromaufwärts gehen - die dst
sendet die src
ihre VertexId
zusammen mit allen anderen nachgeschalteten VertexIds
. Wenn der Upstream-Knoten die Verbindung bereits kennt, wird keine Nachricht gesendet. Schließlich weiß jeder Knoten über jeden verbundenen Knoten und es werden keine weiteren Nachrichten gesendet.
Zuerst definieren wir unsere vprog
. Nach der Dokumentation:
das benutzerdefinierte Vertex-Programm, das auf jedem Scheitelpunkt verläuft und empfängt die eingehende Nachricht und berechnet einen neuen Scheitelwert. Bei der ersten Iteration wird das Vertex-Programm an allen Scheitelpunkten aufgerufen und übergibt die Standardmeldung . Bei nachfolgenden Iterationen wird das Vertex-Programm nur an den Knoten aufgerufen, die Nachrichten empfangen.
def vprog(id: VertexId, orig: Array[Long], newly: Array[Long]) : Array[Long] = {
(orig ++ newly).toSet.toArray
}
Dann definieren wir unsere sendMsg
- bearbeitet:src
vertauscht & dst
einen Benutzer bereitgestellte Funktion, die aus Kanten von Vertices angewendet wird, die Nachrichten in der aktuellen Iteration empfangenen
def sendMsg(trip: EdgeTriplet[Array[Long],Double]) : Iterator[(VertexId, Array[Long])] = {
if (trip.srcAttr.intersect(trip.dstAttr ++ Array(trip.dstId)).length != (trip.dstAttr ++ Array(trip.dstId)).toSet.size) {
Iterator((trip.srcId, (Array(trip.dstId) ++ trip.dstAttr).toSet.toArray))
} else Iterator.empty }
Next unser mergeMsg
:
ein Benutzer bereitgestellte Funktion, die zwei eingehende Nachrichten vom Typ nimmt A und verschmilzt sie zu einer einzigen Nachricht vom Typ A. Diese Funktion muss kommutativ und assoziativ sein und idealerweise die Größe A sollte nicht erhöhen.
Leider werden wir die Regel im letzten Satz brechen oben:
def mergeMsg(a: Array[Long], b: Array[Long]) : Array[Long] = {
(a ++ b).toSet.toArray
}
Dann führen wir pregel
- bearbeitet: entfernt maxIterations
, standardmäßig Int.MaxValue
val result = graph.pregel(Array[Long]())(vprog, sendMsg, mergeMsg)
Und Sie können die Ergebnisse betrachten:
result.vertices.collect
res48: Array[(org.apache.spark.graphx.VertexId, Array[Long])] = Array((4,Array(4, 2, 3)), (8,Array(8, 9)), (1,Array(1, 2, 3, 4)), (9,Array(9)), (5,Array(5, 6, 9, 7, 8)), (6,Array(6, 7, 9, 8)), (2,Array(2, 3, 4)), (3,Array(3, 4, 2)), (7,Array(7, 8, 9)))
Gut. Das ist überhaupt kein einfaches Problem. Es gibt eine aggregateMessages-API, die Sie sich ansehen sollten, und einen Algorithmus namens "Pregel", den Sie verwenden könnten. Im Wesentlichen können Sie damit wiederholt Nachrichten von Knoten zu Knoten senden und die Ergebnisse aggregieren. Es ist schwierig, dies in einem Kommentar zu erklären - aber stellen Sie sich vor, dass Knoten 7 in der ersten Iteration des Algorithmus (8,9) an Knoten 2 senden würde und Knoten 8 (10,11) an Knoten 7 senden würde In der zweiten Iteration würde der Knoten 7 (10, 11) an den Knoten 2 senden. So haben Sie zwei Wiederholungen von Pregel, die Sie beantworten müssen. –
Sicher David .. ich werde damit arbeiten ... und werde Sie aktualisieren ... Danke – Devndra
Also, großartig - jetzt akzeptiere und upvote meine Antwort !! –