Ich habe einige Zwischendaten, die ich in HDFS und lokal gespeichert werden muss. Ich benutze Spark 1.6. In HDFS als Zwischenform bekomme ich Daten in /output/testDummy/part-00000
und /output/testDummy/part-00001
. Ich möchte diese Partitionen lokal mit Java/Scala speichern, so dass ich sie als /users/home/indexes/index.nt
(durch Zusammenführen von beiden in lokalen) oder /users/home/indexes/index-0000.nt
und /home/indexes/index-0001.nt
getrennt speichern konnte.Speichern Sie eine Funke RDD mit mapPartition mit Iterator
Hier ist mein Code: Hinweis: testDummy ist das gleiche wie Test, Ausgabe ist mit zwei Partitionen. Ich möchte sie separat oder kombiniert aber lokal mit index.nt
Datei speichern. Ich speichere lieber getrennt in zwei Datenknoten. Ich verwende einen Cluster und übergebe einen Spark-Job auf YARN. Ich habe auch einige Kommentare hinzugefügt, wie oft und welche Daten ich bekomme. Wie könnte ich tun? Jede Hilfe wird geschätzt.
val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy")
println("testDummy done") //1 time print
def savesData(iterator: Iterator[(String)]): Iterator[(String)] = {
println("Inside savesData") // now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2
println("iter size"+iterator.size) // 2 735 2 735 values
val filenamesWithExtension = outputPath + "/index.nt"
println("filenamesWithExtension "+filenamesWithExtension.length) //4 times
var list = List[(String)]()
val fileWritter = new FileWriter(filenamesWithExtension,true)
val bufferWritter = new BufferedWriter(fileWritter)
while (iterator.hasNext){ //iterator.hasNext is false
println("inside iterator") //0 times
val dat = iterator.next()
println("datadata "+iterator.next())
bufferWritter.write(dat + "\n")
bufferWritter.flush()
println("index files written")
val dataElements = dat.split(" ")
println("dataElements") //0
list = list.::(dataElements(0))
list = list.::(dataElements(1))
list = list.::(dataElements(2))
}
bufferWritter.close() //closing
println("savesData method end") //4 times when coal=2
list.iterator
}
println("before saving data into local") //1
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData)
println("testRDD partitions "+test.getNumPartitions) //2
println("testRDD size "+test.collect().length) //0
println("after saving data into local") //1
PS: Ich folgte, this und this aber nicht genau gleiche, was ich suche, habe ich irgendwie, aber nicht immer alles in index.nt
Scala hat die Welt ein bisschen besser gemacht, indem wir 'list.: :(dataElements (2))' äquivalent zu 'dataElements (2) :: list' machen, also mach Scala nicht traurig und benutze diese Syntax bei zumindest wenn die Methoden operatorähnlich sind. Übrigens wäre hier wahrscheinlich ein 'ListBuffer' besser geeignet – Dici