2014-09-18 5 views
18

Ich benutze den folgenden Befehl, um eine RDD mit einer Reihe von Arrays mit 2 Strings ["Dateiname", "Inhalt"] füllen.Wie wiederhole ich RDDs in Apache Spark (Scala)

Jetzt möchte ich über jedes dieser Vorkommen iterieren, um etwas mit jedem Dateinamen und Inhalt zu tun.

val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*") 

Ich kann jedoch keine Dokumentation finden, wie dies zu tun ist.

Also, was ich will, ist dies:

foreach occurrence-in-the-rdd{ 
    //do stuff with the array found on loccation n of the RDD 
} 
+0

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD – Malcolm

Antwort

6

Die grundlegenden Operationen sind in Funken map und filter.

val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") } 

die txtRDD wird nun nur Dateien enthalten, die die Endung „.txt“

haben und wenn Sie diese Dateien zu Wort zählen möchten, können Sie sagen

//split the documents into words in one long list 
val words = txtRDD flatMap { case (id,text) => text.split("\\s+") } 
// give each word a count of 1 
val wordT = words map (x => (x,1)) 
//sum up the counts for each word 
val wordCount = wordsT reduceByKey((a, b) => a + b) 

Sie wollen mapPartitions verwenden Wenn Sie eine kostspielige Initialisierung durchführen müssen, müssen Sie das ausführen - zum Beispiel, wenn Sie mit einer Bibliothek wie den Stanford coreNLP-Werkzeugen die Erkennung von benannten Entitäten durchführen möchten.

Master map, filter, flatMap und reduce, und Sie sind auf dem besten Weg, Spark zu meistern.

25

Sie verschiedene Methoden auf dem RDD aufrufen, die Funktionen als Parameter akzeptieren.

// set up an example -- an RDD of arrays 
val sparkConf = new SparkConf().setMaster("local").setAppName("Example") 
val sc = new SparkContext(sparkConf) 
val testData = Array(Array(1,2,3), Array(4,5,6,7,8)) 
val testRDD = sc.parallelize(testData, 2) 

// Print the RDD of arrays. 
testRDD.collect().foreach(a => println(a.size)) 

// Use map() to create an RDD with the array sizes. 
val countRDD = testRDD.map(a => a.size) 

// Print the elements of this new RDD. 
countRDD.collect().foreach(a => println(a)) 

// Use filter() to create an RDD with just the longer arrays. 
val bigRDD = testRDD.filter(a => a.size > 3) 

// Print each remaining array. 
bigRDD.collect().foreach(a => { 
    a.foreach(e => print(e + " ")) 
    println() 
    }) 
} 

Beachten Sie, dass die Funktionen, die Sie ein einzelnes Element RDD als Eingabe akzeptieren schreiben und Daten von einem gewissen einheitlichen Typ zurückgeben, so dass Sie eine RDD des letzteren Typs erstellen. Zum Beispiel ist countRDD ein RDD[Int], während bigRDD immer noch ein RDD[Array[Int]] ist.

Es wird wahrscheinlich verlockend sein, irgendwann eine foreach schreiben, die einige andere Daten ändert, aber Sie sollten aus den Gründen in this question and answer widerstehen.

Edit: Versuchen Sie nicht, große RDD s

Mehrere Leser über die Verwendung von collect() und println(), um ihre Ergebnisse zu sehen, wie im Beispiel oben gefragt haben, zu drucken. Dies funktioniert natürlich nur, wenn Sie in einem interaktiven Modus wie dem Spark REPL (read-eval-print-loop) arbeiten. Am besten rufen Sie collect() auf dem RDD auf, um ein sequentielles Array für den ordnungsgemäßen Druck zu erhalten. Aber collect() kann zu viele Daten zurückbringen und auf jeden Fall zu viel gedruckt werden. Hier sind einige alternative Möglichkeiten, Einblick in Ihre RDD s zu erhalten, wenn sie sind groß:

  1. RDD.take(): Das gibt Ihnen die Feinsteuerung über die Anzahl der Elemente, die Sie erhalten, aber nicht, woher sie kamen - definiert als die "erste", das ist ein Konzept, das durch verschiedene andere Fragen und Antworten hier behandelt wird.

    // take() returns an Array so no need to collect() 
    myHugeRDD.take(20).foreach(a => println(a)) 
    
  2. RDD.sample(): Auf diese Weise können Sie (grob) den Anteil der Ergebnisse steuern Sie erhalten, ob mit einer Stichprobe verwendet Ersatz und auch optional die Zufallszahl Samen.

    // sample() does return an RDD so you may still want to collect() 
    myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a)) 
    
  3. RDD.takeSample(): Dies ist ein Hybrid: mit Stichproben, die Sie steuern können, aber beide lassen Sie die genaue Anzahl der Ergebnisse festlegen und eine Array zurück.

    // takeSample() returns an Array so no need to collect() 
    myHugeRDD.takeSample(true, 20).foreach(a => println(a)) 
    
  4. RDD.count(): Manchmal kommt die beste Einsicht aus, wie viele Elemente, die Sie mit am Ende - ich dieses erste oft tun.

    println(myHugeRDD.count())  
    
+0

Gibt es nicht das Risiko, dass die verschiedenen Arbeiter in einigen unkoordinierten Drucken werden Mode, ihre Linien vermischend? Wenn es trotzdem funktioniert, ist das dokumentiert? – EOL

+1

@EOL Ja, ich war schlampig - ich habe gerade meine Antwort bearbeitet, um 'collect()' Anrufe hinzuzufügen, wenn 'RDD's gedruckt werden. –

+0

Nice, zur Verfügung gestellt komplettes Beispiel sogar zeigt Setup von sc! – JimLohse

2

Ich würde versuchen, die Verwendung einer Partition Mapping-Funktion machen.Der folgende Code zeigt, wie ein gesamter RDD-Datensatz in einer Schleife verarbeitet werden kann, so dass jede Eingabe die gleiche Funktion durchläuft. Ich fürchte, ich habe kein Wissen über Scala, also ist alles, was ich zu bieten habe, java code. Es sollte jedoch nicht sehr schwierig sein, es in Scala zu übersetzen.

JavaRDD<String> res = file.mapPartitions(new FlatMapFunction <Iterator<String> ,String>(){ 
     @Override 
     public Iterable<String> call(Iterator <String> t) throws Exception { 

      ArrayList<String[]> tmpRes = new ArrayList <>(); 
      String[] fillData = new String[2]; 

      fillData[0] = "filename"; 
      fillData[1] = "content"; 

      while(t.hasNext()){ 
       tmpRes.add(fillData); 
      } 

      return Arrays.asList(tmpRes); 
     } 

}).cache(); 
+0

Wird das tatsächlich parallel ausgeführt? Ich dachte immer wenn eine for oder while-Schleife in einer anonymen Funktion in Spark aufgerufen wird, wird sie sequentiell in einem einzigen Executor ausgeführt – JimLohse

1

was die wholeTextFiles Rückkehr ist ein Paar RDD:

def wholeTextFiles (path: String, minPartitions: Int): RDD [(String, String)]

ein Verzeichnis von Textdateien Lesen von HDFS, ein lokales Dateisystem (auf allen Knoten verfügbar) oder jeder von Hadoop unterstützte Dateisystem-URI. Jede Datei wird als einzelner Datensatz gelesen und in einem Schlüssel/Wert-Paar zurückgegeben, wobei der Schlüssel der Pfad jeder Datei ist und der Wert der Inhalt jeder Datei ist.

Hier ist ein Beispiel für das Lesen der Dateien auf einem lokalen Pfad, dann Drucken aller Dateinamen und Inhalte.

val conf = new SparkConf().setAppName("scala-test").setMaster("local") 
val sc = new SparkContext(conf) 
sc.wholeTextFiles("file:///Users/leon/Documents/test/") 
    .collect 
    .foreach(t => println(t._1 + ":" + t._2)); 

das Ergebnis:

file:/Users/leon/Documents/test/1.txt:{"name":"tom","age":12} 

file:/Users/leon/Documents/test/2.txt:{"name":"john","age":22} 

file:/Users/leon/Documents/test/3.txt:{"name":"leon","age":18} 

oder das Paar RDD Umwandlung in eine RDD ersten

sc.wholeTextFiles("file:///Users/leon/Documents/test/") 
    .map(t => t._2) 
    .collect 
    .foreach { x => println(x)} 

das Ergebnis:

{"name":"tom","age":12} 

{"name":"john","age":22} 

{"name":"leon","age":18} 

Und ich denke, wholeTextFiles nachgiebiger ist für kleine Dateien.