2016-04-05 2 views
0

In Spark habe ich mehrere Dateien von s3 mit sc.binaryFiles heruntergeladen. Die resultierende RDD hat den Schlüssel als Dateiname und der Wert hat den Inhalt der Datei. Ich habe den Dateiinhalt dekomprimiert, CSV geparst und in einen Datenrahmen konvertiert. So, jetzt habe ich ein PairRDD [String, DataFrame]. Das Problem, das ich habe, ist, dass ich die Datei zu HDFS unter Verwendung des Schlüssels als der Dateiname speichern und den Wert als Parkettdatei speichern will, die einen überschreibt, wenn es bereits existiert. Das habe ich bis jetzt erreicht.Wie speichere ich eine Datei in einem Spark PairRDD mit dem Schlüssel als Dateiname und dem Wert als Inhalt?

val files = sc.binaryFiles(lFiles.mkString(","), 250).mapValues(stream => sc.parallelize(readZipStream(new ZipInputStream(stream.open)))) 
val tables = files.mapValues(file => { 
    val header = file.first.split(",") 
    val schema = StructType(header.map(fieldName => StructField(fieldName, StringType, true))) 
    val lines = file.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }.flatMap(x => x.split("\n")) 
    val rowRDD = lines.map(x => Row.fromSeq(x.split(","))) 
    sqlContext.createDataFrame(rowRDD, schema) 
}) 

Wenn Sie einen Rat haben, lassen Sie es mich bitte wissen. Ich würde es schätzen.

Danke, Ben

+0

naive Ansatz: Wenn Ihr Schlüssel Kardinalität sind niedrig, Sie können sie sammeln, iterieren auf sie Filter auf diesen Schlüssel dann schreiben Sie es in die Festplatte mit dem Pfad gleich Schlüssel. – eliasah

Antwort

0

die Möglichkeit, Dateien zu HDFS in Funken zu sparen, ist das gleiche zu hadoop. Sie müssen also eine Klasse erstellen, die MultipleTextOutputFormat erstreckt, in benutzerdefinierten Klasse können Sie Ausgabe-Dateinamen definieren yourself.the Beispiel ist unten:

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { 
    override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = { 
     "realtime-" + new SimpleDateFormat("yyyyMMddHHmm").format(new Date()) + "00-" + name 
    } 
} 

der gerufene Code ist unten:

RDD.rddToPairRDDFunctions(rdd.map { case (key, list) => 
    (NullWritable.get, key) 
}).saveAsHadoopFile(input, classOf[NullWritable], classOf[String], classOf[RDDMultipleTextOutputFormat]) 
+0

Funktioniert das tatsächlich mit S3 Native FS ohne HDFS? Ich frage mich, wann die Datei tatsächlich auf s3 hochgeladen wird, wahrscheinlich wenn der Job beendet ist? Weil die letzten X-Datensätze zu allen X-Dateien gehören können ... Also kann nichts zu s3 hochgeladen werden, bevor der letzte Datensatz verarbeitet wird, oder? – lisak