2016-06-26 6 views
2

Ich habe einige Zwischendaten, die ich in HDFS und lokal gespeichert werden muss. Ich benutze Spark 1.6. In HDFS als Zwischenform bekomme ich Daten in /output/testDummy/part-00000 und /output/testDummy/part-00001. Ich möchte diese Partitionen lokal mit Java/Scala speichern, so dass ich sie als /users/home/indexes/index.nt (durch Zusammenführen von beiden in lokalen) oder /users/home/indexes/index-0000.nt und /home/indexes/index-0001.nt getrennt speichern konnte.Speichern Sie eine Funke RDD mit mapPartition mit Iterator

Hier ist mein Code: Hinweis: testDummy ist das gleiche wie Test, Ausgabe ist mit zwei Partitionen. Ich möchte sie separat oder kombiniert aber lokal mit index.nt Datei speichern. Ich speichere lieber getrennt in zwei Datenknoten. Ich verwende einen Cluster und übergebe einen Spark-Job auf YARN. Ich habe auch einige Kommentare hinzugefügt, wie oft und welche Daten ich bekomme. Wie könnte ich tun? Jede Hilfe wird geschätzt.

val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy") 
println("testDummy done") //1 time print 

def savesData(iterator: Iterator[(String)]): Iterator[(String)] = { 
    println("Inside savesData")         // now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2 
    println("iter size"+iterator.size)       // 2 735 2 735 values 
    val filenamesWithExtension = outputPath + "/index.nt" 
    println("filenamesWithExtension "+filenamesWithExtension.length) //4 times 
    var list = List[(String)]() 

    val fileWritter = new FileWriter(filenamesWithExtension,true) 
    val bufferWritter = new BufferedWriter(fileWritter) 

    while (iterator.hasNext){      //iterator.hasNext is false 
     println("inside iterator")     //0 times 
     val dat = iterator.next() 
     println("datadata "+iterator.next()) 

     bufferWritter.write(dat + "\n") 
     bufferWritter.flush() 
     println("index files written") 

     val dataElements = dat.split(" ") 
     println("dataElements")         //0 
     list = list.::(dataElements(0)) 
     list = list.::(dataElements(1)) 
     list = list.::(dataElements(2)) 
    } 
    bufferWritter.close() //closing 
    println("savesData method end")       //4 times when coal=2 
    list.iterator 
} 

println("before saving data into local")        //1 
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData) 
println("testRDD partitions "+test.getNumPartitions)        //2 
println("testRDD size "+test.collect().length)        //0 
println("after saving data into local") //1 

PS: Ich folgte, this und this aber nicht genau gleiche, was ich suche, habe ich irgendwie, aber nicht immer alles in index.nt

+2

Scala hat die Welt ein bisschen besser gemacht, indem wir 'list.: :(dataElements (2))' äquivalent zu 'dataElements (2) :: list' machen, also mach Scala nicht traurig und benutze diese Syntax bei zumindest wenn die Methoden operatorähnlich sind. Übrigens wäre hier wahrscheinlich ein 'ListBuffer' besser geeignet – Dici

Antwort

5

Ein paar Dinge:

  • Rufen Sie nie Iterator.size, wenn Sie Daten später verwenden möchten. Iterators sind TraversableOnce. Die einzige Möglichkeit, die Größe Iterator zu berechnen, besteht darin, alle Elemente zu durchlaufen, und danach müssen keine Daten mehr gelesen werden.
  • Verwenden Sie keine Transformationen wie mapPartitions für Nebenwirkungen. Wenn Sie eine Art von E/A-Aktionen wie foreach/foreachPartition ausführen möchten. Es ist eine schlechte Übung und garantiert nicht, dass ein gegebenes Stück Code nur einmal ausgeführt wird.
  • Lokaler Pfad in Aktion oder Transformationen ist ein lokaler Pfad eines bestimmten Arbeiters. Wenn Sie direkt auf den Client-Rechner schreiben möchten, sollten Sie die Daten zuerst mit collect oder toLocalIterator abrufen. Es könnte jedoch besser sein, auf verteilten Speicher zu schreiben und Daten später zu holen.
0

Java 7 bietet die Möglichkeit, Verzeichnisse zu überwachen.

https://docs.oracle.com/javase/tutorial/essential/io/notification.html

Die Idee ist, eine Uhr-Dienst zu erstellen, registrieren Sie es mit dem Verzeichnis von Interesse (erwähnen die Ereignisse Ihres Interesses, wie Datei Erstellen, Löschen, etc.), nicht sehen, werden Sie Wenn Sie über Ereignisse wie Erstellen, Löschen usw. informiert werden, können Sie die gewünschten Maßnahmen ergreifen.

Sie müssen, wo anwendbar, auf Java hdfs api stark angewiesen sein.

Führen Sie das Programm im Hintergrund aus, da es auf Ereignisse für immer wartet. (Sie können Logik schreiben, um zu beenden, nachdem Sie tun, was Sie wollen)

Auf der anderen Seite wird auch Shell-Skripting helfen.

Achten Sie beim Lesen von Dateien auf das Kohärenzmodell des hdfs-Dateisystems.

Hoffe das hilft mit einer Idee.