In Spark habe ich mehrere Dateien von s3 mit sc.binaryFiles heruntergeladen. Die resultierende RDD hat den Schlüssel als Dateiname und der Wert hat den Inhalt der Datei. Ich habe den Dateiinhalt dekomprimiert, CSV geparst und in einen Datenrahmen konvertiert. So, jetzt habe ich ein PairRDD [String, DataFrame]. Das Problem, das ich habe, ist, dass ich die Datei zu HDFS unter Verwendung des Schlüssels als der Dateiname speichern und den Wert als Parkettdatei speichern will, die einen überschreibt, wenn es bereits existiert. Das habe ich bis jetzt erreicht.Wie speichere ich eine Datei in einem Spark PairRDD mit dem Schlüssel als Dateiname und dem Wert als Inhalt?
val files = sc.binaryFiles(lFiles.mkString(","), 250).mapValues(stream => sc.parallelize(readZipStream(new ZipInputStream(stream.open))))
val tables = files.mapValues(file => {
val header = file.first.split(",")
val schema = StructType(header.map(fieldName => StructField(fieldName, StringType, true)))
val lines = file.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }.flatMap(x => x.split("\n"))
val rowRDD = lines.map(x => Row.fromSeq(x.split(",")))
sqlContext.createDataFrame(rowRDD, schema)
})
Wenn Sie einen Rat haben, lassen Sie es mich bitte wissen. Ich würde es schätzen.
Danke, Ben
naive Ansatz: Wenn Ihr Schlüssel Kardinalität sind niedrig, Sie können sie sammeln, iterieren auf sie Filter auf diesen Schlüssel dann schreiben Sie es in die Festplatte mit dem Pfad gleich Schlüssel. – eliasah