2016-08-03 14 views
2

Ich habe eine TSV-Datei, wobei die erste Zeile der Header ist. Ich möchte ein JavaPairRDD aus dieser Datei erstellen. Derzeit mache ich so mit dem folgenden Code:Die beste Möglichkeit, TSV-Datei mit Apache Spark in Java zu lesen

TsvParser tsvParser = new TsvParser(new TsvParserSettings()); 
List<String[]> allRows; 
List<String> headerRow; 
try (BufferedReader reader = new BufferedReader(new FileReader(myFile))) { 
     allRows = tsvParser.parseAll((reader)); 
     //Removes the header row 
     headerRow = Arrays.asList(allRows.remove(0)); 
    } 
JavaPairRDD<String, MyObject> myObjectRDD = javaSparkContext 
      .parallelize(allRows) 
      .mapToPair(row -> new Tuple2<>(row[0], myObjectFromArray(row))); 

Ich frage mich, ob es eine Möglichkeit, die javaSparkContext zu haben war direkt die Datei lesen und verarbeiten, anstatt den Betrieb in zwei Teile aufgeteilt wird.

EDIT: Dies ist kein Duplikat von How do I convert csv file to rdd, denn ich suche eine Antwort in Java, nicht in Scala.

+0

verwenden müssen Mögliches Duplikat von [Wie konvertiere ich die csv-Datei in rdd] (http: // stackove rflow.com/questions/22299427/how-do-i-convert-csv-file-to-rdd) – zero323

+0

Es ist kein Duplikat, weil ich in Java eine Antwort suche, nicht in Scala. – alexgbelov

Antwort

3

Verwendung https://github.com/databricks/spark-csv

import org.apache.spark.sql.SQLContext 

SQLContext sqlContext = new SQLContext(sc); 
DataFrame df = sqlContext.read() 
    .format("com.databricks.spark.csv") 
    .option("inferSchema", "true") 
    .option("header", "true") 
    .option("delimiter","\t") 
    .load("cars.csv"); 

df.select("year", "model").write() 
    .format("com.databricks.spark.csv") 
    .option("header", "true") 
    .save("newcars.csv"); 
+0

Ich schaute hinein, und ich mag es nicht, dass es keine einfache Möglichkeit gibt, einen Dataframe zu einem JavaRDD von benutzerdefinierten Objekten zu konvertieren. Sie müssen jede Zeile manuell analysieren. – alexgbelov

+0

Ich bin neugierig, warum Sie eine RDD über einen Datenrahmen in der Instanz von einzigartigen Objekten Anforderung wünschen. @alexgbelov – mark

+0

Der Rest meiner Logik verwendet RDDs; Ich brauchte nur eine Möglichkeit, die Daten so sauber wie möglich einzulesen. – alexgbelov

0

Ich bin der Autor von uniVocity-parsers und kann man nicht viel mit Funken helfen, aber ich glaube, so etwas wie dies für Sie arbeiten:

parserSettings.setHeaderExtractionEnabled(true); //captures the header row 

parserSettings.setProcessor(new AbstractRowProcessor(){ 
     @Override 
     public void rowProcessed(String[] row, ParsingContext context) { 
      String[] headers = context.headers() //not sure if you need them 
      JavaPairRDD<String, MyObject> myObjectRDD = javaSparkContext 
        .mapToPair(row -> new Tuple2<>(row[0], myObjectFromArray(row))); 
      //process your stuff. 
     } 
    }); 

Wenn Sie die Verarbeitung jeder Zeile paralellize möchten,

parserSettings.setProcessor(new ConcurrentRowProcessor(new AbstractRowProcessor(){ 
     @Override 
     public void rowProcessed(String[] row, ParsingContext context) { 
      String[] headers = context.headers() //not sure if you need them 
      JavaPairRDD<String, MyObject> myObjectRDD = javaSparkContext 
        .mapToPair(row -> new Tuple2<>(row[0], myObjectFromArray(row))); 
      //process your stuff. 
     } 
    }, 1000)); //1000 rows loaded in memory. 

Dann rufen Sie einfach zu analysieren: Sie können eine ConcurrentRowProcessor wickeln

new TsvParser(parserSettings).parse(myFile); 

Hoffe das hilft!

1

Versuchen Sie den folgenden Code, um CSV-Datei zu lesen und JavaPairRDD zu erstellen.

public class SparkCSVReader { 

public static void main(String[] args) { 

    SparkConf conf = new SparkConf().setAppName("CSV Reader"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaRDD<String> allRows = sc.textFile("c:\\temp\\test.csv");//read csv file 
    String header = allRows.first();//take out header 
    JavaRDD<String> filteredRows = allRows.filter(row -> !row.equals(header));//filter header 
    JavaPairRDD<String, MyCSVFile> filteredRowsPairRDD = filteredRows.mapToPair(parseCSVFile);//create pair 
    filteredRowsPairRDD.foreach(data -> { 
     System.out.println(data._1() + " ### " + data._2().toString());// print row and object 
    }); 
    sc.stop(); 
    sc.close(); 
} 

private static PairFunction<String, String, MyCSVFile> parseCSVFile = (row) -> { 
    String[] fields = row.split(","); 
    return new Tuple2<String, MyCSVFile>(row, new MyCSVFile(fields[0], fields[1], fields[2])); 
}; 

} 

Sie können auch Databricks Funken csv (https://github.com/databricks/spark-csv) verwenden. Spark-CSV ist auch in Spark 2.0.0 enthalten.

+0

Vielen Dank. Ich habe in Databricks spark-csv nachgeschaut, aber das hat mir nicht gefallen, da man jede Zeile manuell parsen muss, was ich ohnehin schon mache. – alexgbelov

0

Apache Spark-2.x verfügen über integrierte in csv Leser so dass Sie nicht https://github.com/databricks/spark-csv

import org.apache.spark.sql.Dataset; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.SparkSession; 

/** 
* 
* @author cpu11453local 
*/ 
public class Main { 
    public static void main(String[] args) { 


     SparkSession spark = SparkSession.builder() 
       .master("local") 
       .appName("meowingful") 
       .getOrCreate(); 

     Dataset<Row> df = spark.read() 
        .option("header", "true") 
        .option("delimiter","\t") 
        .csv("hdfs://127.0.0.1:9000/data/meow_data.csv"); 

     df.show(); 
    } 
} 

Und Maven Datei pom.xml

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <groupId>com.meow.meowingful</groupId> 
    <artifactId>meowingful</artifactId> 
    <version>1.0-SNAPSHOT</version> 
    <packaging>jar</packaging> 
    <properties> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
     <maven.compiler.source>1.8</maven.compiler.source> 
     <maven.compiler.target>1.8</maven.compiler.target> 
    </properties> 

    <dependencies> 
     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 --> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.11</artifactId> 
      <version>2.2.0</version> 
     </dependency> 


     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-sql_2.11</artifactId> 
      <version>2.2.0</version> 
     </dependency> 
    </dependencies> 

</project>