2014-04-28 4 views
11

Ich möchte alle Textdateien in einem Hadoop-Verzeichnis durchlaufen und alle Vorkommen des Worts "error" zählen. Gibt es eine Möglichkeit, eine hadoop fs -ls /users/ubuntu/ zu tun, um alle Dateien in einem Verzeichnis mit der Apache Spark Scala API aufzulisten?Verwenden Sie Spark, um alle Dateien in einem Hadoop HDFS-Verzeichnis aufzulisten?

Aus dem gegebenen first example, der Funken Zusammenhang scheint nur Zugriff auf Dateien einzeln durch so etwas wie:

val file = spark.textFile("hdfs://target_load_file.txt") 

In meinem Problem, ich weiß nicht, wie viele noch die Namen der Dateien im HDFS Ordner vorher . Guckte auf die spark context docs aber konnte diese Art von Funktionalität nicht finden.

Antwort

14

Sie können einen Platzhalter verwenden:

val errorCount = sc.textFile("hdfs://some-directory/*") 
        .flatMap(_.split(" ")).filter(_ == "error").count 
+0

Was passiert, wenn ich hat den Namen der Datei, in der der Fehler aufgetreten berichten wollte? –

+2

Verwenden Sie 'sc.wholeTextFiles'. Siehe http://stackoverflow.com/questions/29521665/how-to-map-filenames-to-rdd-using-sc-textfiles3n-bucket-csv für so ziemlich diese Frage. –

1
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} 
import scala.collection.mutable.Stack 


val fs = FileSystem.get(sc.hadoopConfiguration) 
var dirs = Stack[String]() 
val files = scala.collection.mutable.ListBuffer.empty[String] 
val fs = FileSystem.get(sc.hadoopConfiguration) 

dirs.push("/user/username/") 

while(!dirs.isEmpty){ 
    val status = fs.listStatus(new Path(dirs.pop())) 
    status.foreach(x=> if(x.isDirectory) dirs.push(x.getPath.toString) else 
    files+= x.getPath.toString) 
} 
files.foreach(println)