ich einige DSTREAM in Spark-Scala haben und ich möchte es dann oben N. Das Problem nehmen sortieren ist, dass wenn ich versuche, es zu laufen bekomme ich NotSerializableException
und die Ausnahmemeldung sagt:Sortieren einer DSTREAM und unter Erste N
Dies liegt daran, dass auf das DStream-Objekt innerhalb der Closure verwiesen wird.
Das Problem ist, dass ich weiß nicht, wie es zu lösen:
Hier ist mein Versuch:
package com.badrit.realtime
import java.util.Date
import com.badrit.drivers.UnlimitedSpaceTimeDriver
import com.badrit.model.{CellBuilder, DataReader, Trip}
import com.badrit.utility.Printer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Duration, Milliseconds, StreamingContext}
import scala.collection.mutable
object StreamingDriver {
val appName: String = "HotSpotRealTime"
val hostName = "localhost"
val port = 5050
val constrains = UnlimitedSpaceTimeDriver.constrains;
var streamingRate = 1;
var windowSize = 8;
var slidingInterval = 2;
val cellBuilder = new CellBuilder(constrains)
val inputFilePath = "/home/ahmedelgamal/Downloads/green_tripdata_2015-02.csv"
def prepareTestData(sparkStreamCtx: StreamingContext): InputDStream[Trip] = {
val sparkCtx = sparkStreamCtx.sparkContext
val textFile: RDD[String] = sparkCtx.textFile(inputFilePath)
val data: RDD[Trip] = new DataReader().getTrips(textFile)
val groupedData = data.filter(_.pickup.date.before(new Date(2015, 1, 2, 0, 0, 0)))
.groupBy(trip => trip.pickup.date.getMinutes).sortBy(_._1).map(_._2).collect()
printf("Grouped Data Count is " + groupedData.length)
var dataQueue: mutable.Queue[RDD[Trip]] = mutable.Queue.empty;
groupedData.foreach(trips => dataQueue += sparkCtx.makeRDD(trips.toArray))
printf("\n\nTest Queue size is " + dataQueue.size)
groupedData.zipWithIndex.foreach { case (trips: Iterable[Trip], index: Int) => {
println("Items List " + index)
val passengers: Array[Int] = trips.map(_.passengers).toArray
val cnt = passengers.length
println("Sum is " + passengers.sum)
println("Cnt is " + cnt)
val passengersRdd = sparkCtx.parallelize(passengers)
println("Mean " + passengersRdd.mean())
println("Stdv" + passengersRdd.stdev())
}
}
sparkStreamCtx.queueStream(dataQueue, true)
}
def cellCreator(trip: Trip) = cellBuilder.cellForCarStop(trip.pickup)
def main(args: Array[String]) {
if (args.length < 1) {
streamingRate = 1;
windowSize = 3 //2 hours 60 * 60 * 1000L
slidingInterval = 2 //0.5 hour 60 * 60 * 1000L
}
else {
streamingRate = args(0).toInt;
windowSize = args(1).toInt
slidingInterval = args(2).toInt
}
val sparkConf = new SparkConf().setAppName(appName).setMaster("local[*]")
val sparkStreamCtx = new StreamingContext(sparkConf, Milliseconds(streamingRate))
sparkStreamCtx.sparkContext.setLogLevel("ERROR")
sparkStreamCtx.checkpoint("/tmp")
val data: InputDStream[Trip] = prepareTestData(sparkStreamCtx)
val dataWindow = data.window(new Duration(windowSize), new Duration(slidingInterval))
//my main problem lies in the following line
val newDataWindow = dataWindow.transform(rdd => sparkStreamCtx.sparkContext.parallelize(rdd.take(10)))
newDataWindow.print
sparkStreamCtx.start()
sparkStreamCtx.awaitTerminationOrTimeout(1000)
}
}
Ich habe keine andere Möglichkeiten, etwas dagegen eine DSTREAM zu sortieren und erhalten es ist eher mein Top N als mein Weg.
Warum verwenden Sie 'sparkStreamCtx.sparkContext.parallelize' in' transform'? Warum machst du nicht einfach 'transform (rdd => rdd.take (10))'? –
Ursache '.Take' gibt ein Array nicht eine rdd zurück und' .transform' muss eine rdd als Eingabe nehmen und gibt rdd zurück (im Falle, dass 'sparkStreamCtx.sparkContext.parallelize' nicht verwendet wird, ist das zurückgegebene Array nicht rdd). –