2016-05-16 10 views
1

Ich habe überall nach dem Namen jeder Datei gesucht, die Spark Streaming in der Methode fileStream() aufruft. Es gibt einige Teillösungen in Java, aber ich konnte kein Scala-Beispiel finden. Es gibt auch nicht vollständige Empfehlungen mit FileInputFormat, das nicht klar ist, wie zu verwenden ist. Jeder Scala-Beispielcode würde sehr geschätzt werden.Wie finde ich den Dateinamen in Spark Streaming mit FileStream() in Scala?

Antwort

3

Dies ist, wie ich dieses Problem gelöst durch Ortung und eine Reihe von Antworten auf ähnliche Fragen kombiniert:

def fileNameFilter(path: Path): Boolean = { 
    if (path.getName().contains("COPYING")) { 
     logger.info("*** ignoring incomplete file: " + path.getName()) 
     return false 
    } else { 
     return true 
    } 
} 

def deleteFile(sc: SparkContext, fileName: String): Unit = { 
    val filePath = new Path(fileName) 
    val fs = FileSystem.get(new Configuration()) 
    if (fs.isDirectory(filePath)) { 
     fs.listStatus(filePath).foreach((status) => { 
      fs.delete(status.getPath(), true) 
     }) 
    } else { 
     fs.delete(filePath, true) 
    } 
} 

val ssc = new StreamingContext(sc, Seconds(5)) 
val mfStream = ssc.fileStream[LongWritable,Text,TextInputFormat](pathToMyFiles, x=>fileNameFilter(x), true) 
mfStream.foreachRDD(rdd => { 
....some business logic 
if (!rdd.partitions.isEmpty) { 
    regExp.findAllMatchIn(rdd.toDebugString).foreach(name => { 
    logger.info("Deleting processed File(s): " + name.toString) 
    deleteFile(sc, name.toString) 
}) 
} 

}) 

Hope this andere Menschen mit ähnlichen Bedürfnissen helfen ...