Ich benutze graphx api in einem iterativen Algorithmus. Obwohl I have carefully cache/ unpersist rdd, and take care of the vertices partition num. Die Zeitkosten scheinen immer noch pro Runde in einem linearen Trend zuzunehmen. Die vereinfachte Version meiner Code, wie folgend, und es wird das gleiche Problem:Spark Graphx: Zeit Die Kosten steigen pro Runde linear linear
import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
object ComputingTimeProblem extends App {
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
val conf = new SparkConf().setMaster("local[1]").setAppName("test")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var graph = GraphGenerators
.logNormalGraph(sc, 15000).mapVertices((_, _) => 1d)
.cache
graph.vertices.take(10).foreach(println)
val maxIter = 50
var preGraph: Graph[Double, Int] = null
var allTime: ArrayBuffer[Double] = ArrayBuffer()
for (i <- 1 to maxIter) {
val begin = System.currentTimeMillis()
preGraph = graph
val vertices2 = graph.triplets.map(tri => (tri.srcId, tri.dstAttr)).reduceByKey(_ + _)
graph = graph.joinVertices(vertices2)((vid, left, right) => left + right).cache
graph.vertices.take(10)
preGraph.unpersist()
val end = System.currentTimeMillis()
val duration = (end - begin)/(60 * 1000d)
allTime += duration
println(s"Round ${i} Time Cost: %.4f min, Vertices Partition Num: %d".format(
duration, graph.vertices.getNumPartitions))
}
graph.vertices.take(10).foreach(println)
val avgTime = allTime.sum/allTime.size
println(s"Average Time = ${avgTime}")
val timeCostDiffs = for (i <- 1 until maxIter) yield (allTime(i) - allTime(i - 1))
timeCostDiffs
.zipWithIndex
.map(x => "Round %d to %d, Time Cost Diff: %.4f min".format(x._2+1, x._2 + 2, x._1))
.foreach(println)
println("tc\n"+allTime.mkString("\n"))
}
Ich habe nicht den Index des Diagrammobjekt geändert, und die graphx würde beitreten die Vertices durch leftZipJoin Methode, die nicht shuffling erfordert, also warum die Zeitkosten noch pro Runde erhöhen. Kann jemand einige konstruktive Optionen geben, danke ?!