2016-06-22 11 views
0

Ich benutze graphx api in einem iterativen Algorithmus. Obwohl I have carefully cache/ unpersist rdd, and take care of the vertices partition num. Die Zeitkosten scheinen immer noch pro Runde in einem linearen Trend zuzunehmen. Die vereinfachte Version meiner Code, wie folgend, und es wird das gleiche Problem:Spark Graphx: Zeit Die Kosten steigen pro Runde linear linear

import org.apache.log4j.{Level, Logger} 
import org.apache.spark.graphx.Graph 
import org.apache.spark.graphx.util.GraphGenerators 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 

import scala.collection.mutable.ArrayBuffer 


object ComputingTimeProblem extends App { 

    Logger.getLogger("org").setLevel(Level.ERROR) 
    Logger.getLogger("akka").setLevel(Level.ERROR) 
    val conf = new SparkConf().setMaster("local[1]").setAppName("test") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    var graph = GraphGenerators 
     .logNormalGraph(sc, 15000).mapVertices((_, _) => 1d) 
     .cache 
    graph.vertices.take(10).foreach(println) 

    val maxIter = 50 

    var preGraph: Graph[Double, Int] = null 
    var allTime: ArrayBuffer[Double] = ArrayBuffer() 
    for (i <- 1 to maxIter) { 

     val begin = System.currentTimeMillis() 

     preGraph = graph 

     val vertices2 = graph.triplets.map(tri => (tri.srcId, tri.dstAttr)).reduceByKey(_ + _) 
     graph = graph.joinVertices(vertices2)((vid, left, right) => left + right).cache 
     graph.vertices.take(10) 

     preGraph.unpersist() 

     val end = System.currentTimeMillis() 

     val duration = (end - begin)/(60 * 1000d) 
     allTime += duration 
     println(s"Round ${i} Time Cost: %.4f min, Vertices Partition Num: %d".format(
      duration, graph.vertices.getNumPartitions)) 
    } 

    graph.vertices.take(10).foreach(println) 

    val avgTime = allTime.sum/allTime.size 
    println(s"Average Time = ${avgTime}") 

    val timeCostDiffs = for (i <- 1 until maxIter) yield (allTime(i) - allTime(i - 1)) 
    timeCostDiffs 
     .zipWithIndex 
     .map(x => "Round %d to %d, Time Cost Diff: %.4f min".format(x._2+1, x._2 + 2, x._1)) 
     .foreach(println) 

    println("tc\n"+allTime.mkString("\n")) 
} 

Zeitkostentrend folgend enter image description here

Ich habe nicht den Index des Diagrammobjekt geändert, und die graphx würde beitreten die Vertices durch leftZipJoin Methode, die nicht shuffling erfordert, also warum die Zeitkosten noch pro Runde erhöhen. Kann jemand einige konstruktive Optionen geben, danke ?!

Antwort

1

Es ist immer noch eine Linie Problem, ich habe gerade gefunden. Graph Objekt hat zwei RDD: Vertex RDD und Edge RDD. Im obigen Code habe ich gerade den Vertex rdd materialisiert, nicht den Rand rdd. Also wird jede Runde die vorherigen Kanten agagin neu berechnet. Also, materialisieren beide RDD mit Objekt Triplets wird das Problem zu lösen, wie folgt:

import org.apache.log4j.{Level, Logger} 
import org.apache.spark.graphx.Graph 
import org.apache.spark.graphx.util.GraphGenerators 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 

import scala.collection.mutable.ArrayBuffer 


object ComputingTimeProblem extends App { 

    Logger.getLogger("org").setLevel(Level.ERROR) 
    Logger.getLogger("akka").setLevel(Level.ERROR) 
    val conf = new SparkConf().setMaster("local[1]").setAppName("test") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    var graph = GraphGenerators 
     .logNormalGraph(sc, 15000).mapVertices((_, _) => 1d) 
     //  .partitionBy(PartitionStrategy.RandomVertexCut,8) 
     .cache 
    graph.vertices.take(10).foreach(println) 

    val maxIter = 50 

    var preGraph: Graph[Double, Int] = null 
    var allTime: ArrayBuffer[Double] = ArrayBuffer() 
    for (i <- 1 to maxIter) { 
     val begin = System.currentTimeMillis() 

     preGraph = graph 

     val vertices2 = graph.triplets.map(tri => (tri.srcId, tri.dstAttr)).reduceByKey(_ + _) 
     graph = graph.joinVertices(vertices2)((vid, left, right) => left + right).cache 
     graph.triplets.take(10) // here materialize both vertex and edge rdd 
     // graph.vertices.take(10) 

     preGraph.unpersist() 

     val end = System.currentTimeMillis() 

     val duration = (end - begin)/(60 * 1000d) 
     allTime += duration 
     println(s"Round ${i} Time Cost: %.4f min, Vertices Partition Num: %d".format(
      duration, graph.vertices.getNumPartitions)) 
    } 

    graph.vertices.take(10).foreach(println) 

    val avgTime = allTime.sum/allTime.size 
    println(s"Average Time = ${avgTime}") 

    val timeCostDiffs = for (i <- 1 until maxIter) yield (allTime(i) - allTime(i - 1)) 
    timeCostDiffs 
     .zipWithIndex 
     .map(x => "Round %d to %d, Time Cost Diff: %.4f min".format(x._2 + 1, x._2 + 2, x._1)) 
     .foreach(println) 


    println("tc\n" + allTime.mkString("\n")) 

}