2016-08-07 42 views
3

ich einige DSTREAM in Spark-Scala haben und ich möchte es dann oben N. Das Problem nehmen sortieren ist, dass wenn ich versuche, es zu laufen bekomme ich NotSerializableException und die Ausnahmemeldung sagt:Sortieren einer DSTREAM und unter Erste N

Dies liegt daran, dass auf das DStream-Objekt innerhalb der Closure verwiesen wird.

Das Problem ist, dass ich weiß nicht, wie es zu lösen:

Hier ist mein Versuch:

package com.badrit.realtime 

import java.util.Date 

import com.badrit.drivers.UnlimitedSpaceTimeDriver 
import com.badrit.model.{CellBuilder, DataReader, Trip} 
import com.badrit.utility.Printer 
import org.apache.spark.SparkConf 
import org.apache.spark.rdd.RDD 
import org.apache.spark.streaming.dstream.{DStream, InputDStream} 
import org.apache.spark.streaming.{Duration, Milliseconds, StreamingContext} 

import scala.collection.mutable 

object StreamingDriver { 
    val appName: String = "HotSpotRealTime" 
    val hostName = "localhost" 
    val port = 5050 
    val constrains = UnlimitedSpaceTimeDriver.constrains; 
    var streamingRate = 1; 
    var windowSize = 8; 
    var slidingInterval = 2; 
    val cellBuilder = new CellBuilder(constrains) 
    val inputFilePath = "/home/ahmedelgamal/Downloads/green_tripdata_2015-02.csv" 

    def prepareTestData(sparkStreamCtx: StreamingContext): InputDStream[Trip] = { 

     val sparkCtx = sparkStreamCtx.sparkContext  
     val textFile: RDD[String] = sparkCtx.textFile(inputFilePath) 
     val data: RDD[Trip] = new DataReader().getTrips(textFile) 
     val groupedData = data.filter(_.pickup.date.before(new Date(2015, 1, 2, 0, 0, 0))) 
      .groupBy(trip => trip.pickup.date.getMinutes).sortBy(_._1).map(_._2).collect() 

     printf("Grouped Data Count is " + groupedData.length) 
     var dataQueue: mutable.Queue[RDD[Trip]] = mutable.Queue.empty; 

     groupedData.foreach(trips => dataQueue += sparkCtx.makeRDD(trips.toArray)) 
     printf("\n\nTest Queue size is " + dataQueue.size) 


     groupedData.zipWithIndex.foreach { case (trips: Iterable[Trip], index: Int) => { 
      println("Items List " + index) 


      val passengers: Array[Int] = trips.map(_.passengers).toArray 
      val cnt = passengers.length 
      println("Sum is " + passengers.sum) 
      println("Cnt is " + cnt) 

      val passengersRdd = sparkCtx.parallelize(passengers) 
      println("Mean " + passengersRdd.mean()) 
      println("Stdv" + passengersRdd.stdev()) 

     } 
     } 
     sparkStreamCtx.queueStream(dataQueue, true) 
    } 


    def cellCreator(trip: Trip) = cellBuilder.cellForCarStop(trip.pickup) 

    def main(args: Array[String]) { 
     if (args.length < 1) { 
      streamingRate = 1; 
      windowSize = 3 //2 hours 60 * 60 * 1000L 
      slidingInterval = 2 //0.5 hour 60 * 60 * 1000L 
     } 
     else { 
      streamingRate = args(0).toInt; 
      windowSize = args(1).toInt 
      slidingInterval = args(2).toInt 
     } 

     val sparkConf = new SparkConf().setAppName(appName).setMaster("local[*]") 
     val sparkStreamCtx = new StreamingContext(sparkConf, Milliseconds(streamingRate)) 
     sparkStreamCtx.sparkContext.setLogLevel("ERROR") 
     sparkStreamCtx.checkpoint("/tmp") 

     val data: InputDStream[Trip] = prepareTestData(sparkStreamCtx) 
     val dataWindow = data.window(new Duration(windowSize), new Duration(slidingInterval)) 

     //my main problem lies in the following line 
     val newDataWindow = dataWindow.transform(rdd => sparkStreamCtx.sparkContext.parallelize(rdd.take(10))) 
     newDataWindow.print 

     sparkStreamCtx.start() 
     sparkStreamCtx.awaitTerminationOrTimeout(1000) 

    } 
} 

Ich habe keine andere Möglichkeiten, etwas dagegen eine DSTREAM zu sortieren und erhalten es ist eher mein Top N als mein Weg.

+1

Warum verwenden Sie 'sparkStreamCtx.sparkContext.parallelize' in' transform'? Warum machst du nicht einfach 'transform (rdd => rdd.take (10))'? –

+0

Ursache '.Take' gibt ein Array nicht eine rdd zurück und' .transform' muss eine rdd als Eingabe nehmen und gibt rdd zurück (im Falle, dass 'sparkStreamCtx.sparkContext.parallelize' nicht verwendet wird, ist das zurückgegebene Array nicht rdd). –

Antwort

2

Sie können die Transformationsmethode im DStream-Objekt verwenden, dann die Eingabe-RDD sortieren und n Elemente davon in eine Liste aufnehmen und dann die ursprüngliche RDD filtern, die in dieser Liste enthalten sein soll.

val n = 10 
val topN = result.transform(rdd =>{ 
    val list = rdd.sortBy(_._1).take(n) 
    rdd.filter(list.contains) 
}) 
topN.print