2016-08-05 65 views
1

Ich bin neu zu funken. Ich befolge einige der grundlegenden Beispiele in der Dokumentation.Beste Vorgehensweise zum Durchschleifen einer CSV-Datei in Spark

Ich habe eine CSV-Datei wie folgt aus: (eine vereinfachte Version, die wirkliche hat fast 40.000 Zeilen)

date,category 
19900108,apples 
19900108,apples 
19900308,peaches 
19900408,peaches 
19900508,pears 
19910108,pears 
19910108,peaches 
19910308,apples 
19910408,apples 
19910508,apples 
19920108,pears 
19920108,peaches 
19920308,apples 
19920408,peaches 
19920508,pears 

Dieses Bit von scala Code funktioniert gut für die Kategorie zählen beträgt

val textFile = sc.textFile("sample.csv") 
textFile.filter(line => line.contains("1990")).filter(line =>line.contains("peaches")).count() 
textFile.filter(line => line.contains("1990")).filter(line => line.contains("apples")).count() 
textFile.filter(line => line.contains("1990")).filter(line => line.contains("pears")).count() 

Was ist der beste Ansatz zum Durchlaufen jeder Zeile, Hinzufügen von Kategorien Summen nach Jahr, so dass ich am Ende schreibe eine CSV-Datei wie folgt:

date,apples,peaches,pears 
1990,2,2,1 
1991,3,1,1 
1992,1,2,2 
date,apples,peaches,pears 
1990,2,2,1 
1991,3,1,1 
1992,1,2,2 

Jede Hilfe wäre willkommen.

+2

Mögliche Duplikat [Pivot Spark-Dataframe] (http://stackoverflow.com/questions/30244910/pivot-spark-dataframe) – zero323

Antwort

1
//Create Spark SQL Context  
val sqlContext = new SQLContext(sc) 

//read csv 
var df = sqlContext.read.format("com.databricks.spark.csv") 
    .option("header", "true") 
    .option("inferSchema", "true") 
    .load("sample.csv") 
df = df.withColumn("year", df.col("date").substr(0,4)) 
df = df.groupBy("year").pivot("category").agg("category"->"count") 
df.withColumn("total", df.col("apples").+(df.col("peaches")).+(df.col("pears"))).show() 

//Dependency required: 
<dependency> 
     <groupId>com.databricks</groupId> 
     <artifactId>spark-csv_2.10</artifactId> 
     <version>1.4.0</version> 
</dependency> 
+0

[Spark-2.0] (https://spark.apache.org/releases /spark-release-2-0-0.html#new-features) wurde vor kurzem veröffentlicht, es hat jetzt native CSV-Unterstützung :) – NikoNyrh

+0

Danke VenkatN für Ihre Antwort. Ich war in den letzten Tagen unter dem Wetter, also konnte ich mir das früher nicht ansehen. Als ich dieses Skript, das ich in dieser Zeile eine "Fehler Neuzuordnung val" Nachricht erhalte ausführen: df = df.withColumn (. "Jahr", df.col ("date") substr (0,4)) Mit Spark 2.0 bedeutet das, dass ich nicht das CSC-Paket von Databricks haben muss? – ronmac

+0

Ich erklärte "df" als var statt val aus dem gleichen Grund. Also, um "Fehler Neuzuweisung Val" zu vermeiden, müssen Sie das gleiche tun, oder Sie können eine neue Variable initialisieren, anstatt neu zu df wie: 'val df2 = df.withColumn (" Jahr ", df.col (" Datum ") .substr (0,4))' 'df2.groupBy (" Jahr "). pivot (" Kategorie "). agg (" Kategorie "->" count "). show()' – VenkatN