Ich habe überall nach dem Namen jeder Datei gesucht, die Spark Streaming in der Methode fileStream() aufruft. Es gibt einige Teillösungen in Java, aber ich konnte kein Scala-Beispiel finden. Es gibt auch nicht vollständige Empfehlungen mit FileInputFormat, das nicht klar ist, wie zu verwenden ist. Jeder Scala-Beispielcode würde sehr geschätzt werden.Wie finde ich den Dateinamen in Spark Streaming mit FileStream() in Scala?
1
A
Antwort
3
Dies ist, wie ich dieses Problem gelöst durch Ortung und eine Reihe von Antworten auf ähnliche Fragen kombiniert:
def fileNameFilter(path: Path): Boolean = {
if (path.getName().contains("COPYING")) {
logger.info("*** ignoring incomplete file: " + path.getName())
return false
} else {
return true
}
}
def deleteFile(sc: SparkContext, fileName: String): Unit = {
val filePath = new Path(fileName)
val fs = FileSystem.get(new Configuration())
if (fs.isDirectory(filePath)) {
fs.listStatus(filePath).foreach((status) => {
fs.delete(status.getPath(), true)
})
} else {
fs.delete(filePath, true)
}
}
val ssc = new StreamingContext(sc, Seconds(5))
val mfStream = ssc.fileStream[LongWritable,Text,TextInputFormat](pathToMyFiles, x=>fileNameFilter(x), true)
mfStream.foreachRDD(rdd => {
....some business logic
if (!rdd.partitions.isEmpty) {
regExp.findAllMatchIn(rdd.toDebugString).foreach(name => {
logger.info("Deleting processed File(s): " + name.toString)
deleteFile(sc, name.toString)
})
}
})
Hope this andere Menschen mit ähnlichen Bedürfnissen helfen ...