2016-07-30 26 views
0

Ich bin ziemlich neu in Scala/Spark und ich wurde in das tiefe Ende geworfen. Ich habe mich seit einigen Wochen bemüht, eine Lösung für ein scheinbar einfaches Problem auf Scala 2.11.8 zu finden, konnte aber keine gute Lösung dafür finden. Ich habe eine große Datenbank im CSV-Format in der Nähe von 150 GB, mit vielen Null-Werten, die basierend auf den Werten der einzelnen Spalten reduziert und gereinigt werden müssen.Bedingte Zuordnung von Zeilen in einer CSV-Datei in Scala/Spark, um eine weitere CSV-Datei zu erzeugen

Das Schema der ursprünglichen CSV-Datei ist wie folgt:

  • Spalte 1: Doppel
  • Columnn 2: Integer
  • Spalte 3: Doppel
  • Spalte 4: Doppel
  • Columnn 5: Ganzzahl
  • Spalte 6: Doppel
  • Spalten 7: GanzzahlSo

, möchte ich bedingt zur Karte durch alle Reihen der CSV-Datei und exportieren Sie die Ergebnisse auf eine andere CSV-Datei mit den folgenden Bedingungen für jede Zeile:

  1. Wenn der Wert für Spalte 4 ist nicht null, dann sollten die Werte für die Spalten 4, 5, 6 und 7 dieser Zeile als ein Array namens lastValuesOf4to7 gespeichert werden. (Wenn das Element in Spalte 4 nicht null ist, sind die Spalten 1, 2 und 3 null und können ignoriert werden.)

  2. Wenn der Wert von Spalte 3 nicht null ist, werden die Werte von Spalte 1, 2 und 3 und die vier Elemente aus dem Array lastValuesOf4to7, wie oben beschrieben, sollten als neue Zeile in eine andere CSV-Datei namens condensed.csv exportiert werden. (In dem Datensatz, wenn das Element in der Spalte 3 nicht null ist, dann Spalten 4, 5, 6 & 7 sind null und kann ignoriert werden)

Also am Ende soll ich eine CSV-Datei erhält kondensierter genannt. CSV, die 7 Spalten hat.

Ich habe versucht, den folgenden Code in Scala verwendet haben, aber nicht in der Lage gewesen, weitere Fortschritte:

import scala.io.Source 

object structuringData { 
    def main(args: Array[String]) { 

    val data = Source.fromFile("/path/to/file.csv") 

    var lastValuesOf4to7 = Array("0","0","0","0") 

    val lines = data.getLines // Get the lines of the file 

    val splitLine = lines.map(s => s.split(',')).toArray // This gives an out of memory error since the original file is huge. 



    data.close 
    } 
} 

Wie Sie aus dem obigen Code sehen können, ich habe es in ein Array zu bewegen versucht, aber haben Ich kann nicht weiterarbeiten, da ich nicht jede Zeile einzeln verarbeiten kann.

Ich bin ziemlich sicher, dass es eine einfache Lösung für die Verarbeitung von CSV-Dateien auf Scala/Spark sein muss.

Antwort

0

Danke für die Antwort. Es ist mir gelungen, mit Bash Script selbst eine Lösung zu erstellen. Ich musste zuerst mit einer leeren condensed.csv Datei beginnen. Mein Code zeigt, wie einfach es war, dies zu erreichen:

#!/bin/bash 
OLDIFS=$IFS 
IFS="," 
last1=0 
last2=0 
last3=0 
last4=0 
while read f1 f2 f3 f4 f5 f6 f7 
do 
    if [[ $f4 != "" ]]; 
    then 
     last1=$f4 
     last2=$f5 
     last3=$f6 
     last4=$f7 

    elif [[ $f3 != "" ]]; 
    then 
     echo "$f1,$f2,$f3,$last1,$last2,$last3,$last4" >> path/to/condensed.csv 
    fi 

done < $1 
IFS=$OLDIFS 

Wenn das Skript mit dem Namen extractcsv.sh gespeichert wird, dann sollte es mit dem folgenden Format ausgeführt werden:

$ ./extractcsv.sh path/to/original/file.csv 

Dies geht nur zu bestätige meine Beobachtung, dass ETL bei Bash einfacher ist als bei Scala. Vielen Dank für Ihre Hilfe.

1

Verwenden Sie das Spark-CSV-Paket und verwenden Sie dann die Sql-Abfrage, um die Daten abzufragen und die Filter gemäß Ihrem Anwendungsfall zu erstellen und dann am Ende zu exportieren.

Wenn Sie Spark 2.0.0 verwenden, dann wird Spark-CSV in Spark-SQL vorhanden sein, oder wenn Sie eine alte Version verwenden, fügen Sie die Abhängigkeit entsprechend hinzu.

Hier finden Sie einen Link zum spark-csv hier.

Sie können auch am Beispiel hier: http://blog.madhukaraphatak.com/analysing-csv-data-in-spark/

+0

Vielen Dank für Ihre Antwort, Shivansh. Ich bin noch nicht mit Spark-SQL vertraut und muss das herausfinden. Ich verwende Spark 1.6.1 für Hadoop 2.6. –

+0

@Moose: Ich habe meine Antwort mit dem Link zum Beispiel aktualisiert. Wenn Sie die Antwort gut genug finden, bitte upvote und akzeptieren Sie! Vielen Dank :) –