Also ich versuche, einen parallelen Algorithmus für Prims-Algorithmus zu schreiben, aber ich kann nicht ganz herausfinden, wie es mit Spark Graphx zu tun. Ich habe ziemlich nach Ressourcen gesucht, aber es gibt nicht viele Beispiele für die Implementierung von Algorithmen für den kürzesten Pfad in graphx. Ich denke, ich muss Divide and Conquer verwenden und das Diagramm in Untergraphen aufteilen und dann ihre MST zusammenführen.Wie Parallel Prims Algorithmus in Graphx
Graphx Ressource: http://ampcamp.berkeley.edu/big-data-mini-course/graph-analytics-with-graphx.html#the-property-graph
Parallel Prims Ressource: https://www8.cs.umu.se/kurser/5DV050/VT10/handouts/F10.pdf
Code:
import org.apache.spark._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.util._
object ParallelPrims {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Parallel Prims").setMaster("local")
val sc = new SparkContext(conf)
val logFile = "NodeData.txt"
val logData = sc.textFile(logFile, 2).cache()
// Splitting off header node
val headerAndRows = logData.map(line => line.split(",").map(_.trim))
val header = headerAndRows.first
val data = headerAndRows.filter(_(0) != header(0))
// Parse number of Nodes and Edges from header
val numNodes = header(0).toInt
val numEdges = header(1).toInt
val vertexArray = new Array[(Long, String)](numNodes)
val edgeArray = new Array[Edge[Int]](numEdges)
// Create vertex array
var count = 0
for (count <- 0 to numNodes - 1) {
vertexArray(count) = (count.toLong + 1, ("v" + (count + 1)).toString())
}
count = 0
val rrdarr = data.take(data.count.toInt)
// Create edge array
for (count <- 0 to (numEdges - 1)) {
val line = rrdarr(count)
val cols = line.toList
val edge = Edge(cols(0).toLong, cols(1).toLong, cols(2).toInt)
edgeArray(count) = Edge(cols(0).toLong, cols(1).toLong, cols(2).toInt)
}
// Creating graphx graph
val vertexRDD: RDD[(Long, (String))] = sc.parallelize(vertexArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
val graph: Graph[String, Int] = Graph(vertexRDD, edgeRDD)
graph.triplets.take(6).foreach(println)
}
}
NodeData.txt
4,6
1,2,5
1,3,8
1,4,4
2,3,8
2,4,7
3,4,1
Ausgabe
((1,v1),(2,v2),5)
((1,v1),(3,v3),8)
((1,v1),(4,v4),4)
((2,v2),(3,v3),8)
((2,v2),(4,v4),7)
((3,v3),(4,v4),1)
Ihr Zuordnungshandout beschreibt den parallelen Algorithmus. Was hast du versucht, das umzusetzen, und wo steckst du fest? SO sollten Sie nicht irgendwo Leute bitten, Ihren Code zu schreiben. –
Danke für die Antwort, der Implementierungsteil ist, wo ich feststecke. Ich denke, dass ich eine Nachbarschaftsaggregation verwenden muss, aber die graphx-Beispiele, die ich finde, erklären nicht, wie das verwendet werden kann, um den kürzesten Weg zu finden. [link] (https://spark.apache.org/docs/0.9.1/graphx-programming-guide.html#map-reduce-triplets-mapreducetriplets) –
Ich glaube nicht, dass Sie viele bekommen werden Antworten, bis Sie uns zeigen, was Sie versucht haben. * Natürlich * der Implementierungsteil ist, wo du feststeckst, darum geht es in der Aufgabe, und die Leute hier sind nicht sehr scharf darauf, deine Aufgabe nur für dich zu beantworten. Und (meiner Meinung nach) kann ich nicht sehen, wie es dir hilft, jemand anderen dazu zu bringen, es zu schreiben. –