3

Also ich versuche, einen parallelen Algorithmus für Prims-Algorithmus zu schreiben, aber ich kann nicht ganz herausfinden, wie es mit Spark Graphx zu tun. Ich habe ziemlich nach Ressourcen gesucht, aber es gibt nicht viele Beispiele für die Implementierung von Algorithmen für den kürzesten Pfad in graphx. Ich denke, ich muss Divide and Conquer verwenden und das Diagramm in Untergraphen aufteilen und dann ihre MST zusammenführen.Wie Parallel Prims Algorithmus in Graphx

Graphx Ressource: http://ampcamp.berkeley.edu/big-data-mini-course/graph-analytics-with-graphx.html#the-property-graph

Parallel Prims Ressource: https://www8.cs.umu.se/kurser/5DV050/VT10/handouts/F10.pdf

Code:

import org.apache.spark._ 
import org.apache.log4j.Logger 
import org.apache.log4j.Level 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 
import org.apache.spark.graphx._ 
import org.apache.spark.rdd.RDD 
import org.apache.spark.graphx.util._ 

object ParallelPrims { 
    Logger.getLogger("org").setLevel(Level.OFF) 
    Logger.getLogger("akka").setLevel(Level.OFF) 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setAppName("Parallel Prims").setMaster("local") 
    val sc = new SparkContext(conf) 
    val logFile = "NodeData.txt" 

    val logData = sc.textFile(logFile, 2).cache() 
    // Splitting off header node 
    val headerAndRows = logData.map(line => line.split(",").map(_.trim)) 
    val header = headerAndRows.first 
    val data = headerAndRows.filter(_(0) != header(0)) 
    // Parse number of Nodes and Edges from header 
    val numNodes = header(0).toInt 
    val numEdges = header(1).toInt 

    val vertexArray = new Array[(Long, String)](numNodes) 

    val edgeArray = new Array[Edge[Int]](numEdges) 
    // Create vertex array 
    var count = 0 
    for (count <- 0 to numNodes - 1) { 
     vertexArray(count) = (count.toLong + 1, ("v" + (count + 1)).toString()) 
    } 
    count = 0 
    val rrdarr = data.take(data.count.toInt) 
    // Create edge array 
    for (count <- 0 to (numEdges - 1)) { 
     val line = rrdarr(count) 
     val cols = line.toList 
     val edge = Edge(cols(0).toLong, cols(1).toLong, cols(2).toInt) 
     edgeArray(count) = Edge(cols(0).toLong, cols(1).toLong, cols(2).toInt) 
    } 
    // Creating graphx graph 
    val vertexRDD: RDD[(Long, (String))] = sc.parallelize(vertexArray) 
    val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray) 

    val graph: Graph[String, Int] = Graph(vertexRDD, edgeRDD) 

    graph.triplets.take(6).foreach(println) 

    } 

} 

NodeData.txt

4,6 
1,2,5 
1,3,8 
1,4,4 
2,3,8 
2,4,7 
3,4,1 

Ausgabe

((1,v1),(2,v2),5) 
((1,v1),(3,v3),8) 
((1,v1),(4,v4),4) 
((2,v2),(3,v3),8) 
((2,v2),(4,v4),7) 
((3,v3),(4,v4),1) 
+0

Ihr Zuordnungshandout beschreibt den parallelen Algorithmus. Was hast du versucht, das umzusetzen, und wo steckst du fest? SO sollten Sie nicht irgendwo Leute bitten, Ihren Code zu schreiben. –

+0

Danke für die Antwort, der Implementierungsteil ist, wo ich feststecke. Ich denke, dass ich eine Nachbarschaftsaggregation verwenden muss, aber die graphx-Beispiele, die ich finde, erklären nicht, wie das verwendet werden kann, um den kürzesten Weg zu finden. [link] (https://spark.apache.org/docs/0.9.1/graphx-programming-guide.html#map-reduce-triplets-mapreducetriplets) –

+0

Ich glaube nicht, dass Sie viele bekommen werden Antworten, bis Sie uns zeigen, was Sie versucht haben. * Natürlich * der Implementierungsteil ist, wo du feststeckst, darum geht es in der Aufgabe, und die Leute hier sind nicht sehr scharf darauf, deine Aufgabe nur für dich zu beantworten. Und (meiner Meinung nach) kann ich nicht sehen, wie es dir hilft, jemand anderen dazu zu bringen, es zu schreiben. –

Antwort

0

Hier ist meine Version von Prims-Algorithmus.

var graph : Graph [String, Int] = ... 

// just empty RDD for MST 
var MST = sc.parallelize(Array[EdgeTriplet[Int, Int]]()) 

// pick random vertex from graph 
var Vt: RDD[VertexId] = sc.parallelize(Array(graph.pickRandomVertex)) 

// do until all vertices is in Vt set 
val vcount = graph.vertices.count 
while (Vt.count < vcount) { 

    // rdd to make inner joins 
    val hVt = Vt.map(x => (x, x)) 

    // add key to make inner join 
    val bySrc = graph.triplets.map(triplet => (triplet.srcId, triplet)) 

    // add key to make inner join 
    val byDst = graph.triplets.map(triplet => (triplet.dstId, triplet)) 

    // all triplet where source vertex is in Vt 
    val bySrcJoined = bySrc.join(hVt).map(_._2._1) 

    // all triplet where destinaiton vertex is in Vt 
    val byDstJoined = byDst.join(hVt).map(_._2._1) 

    // sum previous two rdds and substract all triplets where both source and destination vertex in Vt 
    val candidates = bySrcJoined.union(byDstJoined).subtract(byDstJoined.intersection(bySrcJoined)) 

    // find triplet with least weight 
    val triplet = candidates.sortBy(triplet => triplet.attr).first 

    // add triplet to MST 
    MST = MST.union(sc.parallelize(Array(triplet))) 

    // find out whether we should add source or destinaiton vertex to Vt 
    if (!Vt.filter(x => x == triplet.srcId).isEmpty) { 
    Vt = Vt.union(sc.parallelize(Array(triplet.dstId))) 
    } else { 
    Vt = Vt.union(sc.parallelize(Array(triplet.srcId))) 
    } 
} 

// final minimum spanning tree 
MST.collect.foreach(p => println(p.srcId + " " + p.attr + " " + p.dstId)) 
+0

Danke! Ihr Code läuft parallel und gibt mir eine minimale Spannweite. Wie schnell läuft es auf Ihrem Cluster? Ich habe Probleme mit der Verarbeitung von Datensätzen mit mehr als 30 Knoten. Selbst nachdem ich die Speicherzuweisung für den Job erhöht habe, bekomme ich immer noch 'java.lang.OutOfMemoryError: GC overhead limit überschritten ' Hier ist, was ich derzeit habe, werde ich den Code in meiner Frage aktualisieren, nachdem ich herausfinden, was die verursacht Speicherprobleme. https://gist.github.com/gigglesbw4/f25fbec2b6873015d6bebec0014431b1 –

+0

für mich dein Beispiel mit 50 Vertices funktioniert ... aber es geht ziemlich lang – Hlib

+0

Mit einem einfachen Zähler anstatt Aufruf 'Vt.Count 'Aktion jedes Mal durch die Schleife, könnte einen der möglichen Engpässe entfernen. –