Der einfachste Weg ist, zu schreiben über den Datenrahmen des RDD zur Karte und verwendet mkString:
df.rdd.map(x=>x.mkString(","))
Als Spark 1.5 (oder schon vorher) df.map(r=>r.mkString(","))
würde das gleiche tun Wenn Sie CSV-Escaping möchten, können Sie Apache Commons lang dafür verwenden. z.B. hier ist der Code, den wir
def DfToTextFile(path: String,
df: DataFrame,
delimiter: String = ",",
csvEscape: Boolean = true,
partitions: Int = 1,
compress: Boolean = true,
header: Option[String] = None,
maxColumnLength: Option[Int] = None) = {
def trimColumnLength(c: String) = {
val col = maxColumnLength match {
case None => c
case Some(len: Int) => c.take(len)
}
if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
}
def rowToString(r: Row) = {
val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters
st.split("~-~").map(trimColumnLength).mkString(delimiter)
}
def addHeader(r: RDD[String]) = {
val rdd = for (h <- header;
if partitions == 1; //headers only supported for single partitions
tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
rdd.getOrElse(r)
}
val rdd = df.map(rowToString).repartition(partitions)
val headerRdd = addHeader(rdd)
if (compress)
headerRdd.saveAsTextFile(path, classOf[GzipCodec])
else
headerRdd.saveAsTextFile(path)
}
Ich habe das Koalesce-Ding ausprobiert, das du erwähnt hast. Es erstellt ein Verzeichnis am angegebenen Pfad mit einer "Teil" -Datei und einer Datei namens "_SUCCESS". Weißt du einen Weg, um tatsächlich nur die eine Datei zu bekommen? –
Nein, ich denke es gibt keinen Weg es zu tun. – sag