Ich habe Kanal Stream, die ich in HDFS über Funken speichern möchte. Im Folgenden finden Sie Funken Code, den ichFlume + Spark - Speichern von DStream in HDFS
object FlumePull {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println(
"Usage: FlumePollingEventCount <host> <port>")
System.exit(1)
}
val batchInterval = Milliseconds(60000)
val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
val ssc = new StreamingContext(sparkConf, batchInterval)
val stream = FlumeUtils.createPollingStream(ssc, "localhost", 9999)
stream.map(x => x + "!!!!")
.saveAsTextFiles("/user/root/spark/flume_Map_", "_Mapout")
ssc.start()
ssc.awaitTermination()
}
}
leite Als ich meine spsark Streaming-Job starten, wird es ausgegeben speichert in HDFS aber Ausgang ist so etwas wie dieses:
[[email protected] ~]# hadoop fs -cat /user/root/spark/flume_Map_-1459450380000._Mapout/part-00000
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
[email protected]!!!!
Es speichert Gerinne Ereignis statt Daten aus Flume. Wie erhält man Daten daraus?
Dank