2016-04-01 9 views
2

Ich habe Kanal Stream, die ich in HDFS über Funken speichern möchte. Im Folgenden finden Sie Funken Code, den ichFlume + Spark - Speichern von DStream in HDFS

object FlumePull { 
    def main(args: Array[String]) { 
    if (args.length < 2) { 
     System.err.println(
     "Usage: FlumePollingEventCount <host> <port>") 
     System.exit(1) 
    } 

    val batchInterval = Milliseconds(60000) 
    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount") 
    val ssc = new StreamingContext(sparkConf, batchInterval) 
    val stream = FlumeUtils.createPollingStream(ssc, "localhost", 9999) 

    stream.map(x => x + "!!!!") 
      .saveAsTextFiles("/user/root/spark/flume_Map_", "_Mapout") 

    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

leite Als ich meine spsark Streaming-Job starten, wird es ausgegeben speichert in HDFS aber Ausgang ist so etwas wie dieses:

[[email protected] ~]# hadoop fs -cat /user/root/spark/flume_Map_-1459450380000._Mapout/part-00000 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 
[email protected]!!!! 

Es speichert Gerinne Ereignis statt Daten aus Flume. Wie erhält man Daten daraus?

Dank

Antwort

0

Sie müssen die zugrunde liegenden Puffer aus der SparkFlumeEvent und speichern extrahieren. Zum Beispiel, wenn Ihr Ereigniskörper ein String:

stream.map(x => new String(x.event.getBody.array) + "!!!!") 
     .saveAsTextFiles("/user/root/spark/flume_Map_", "_Mapout") 
ist