2016-05-10 7 views
4

Ich arbeite an einem Anwendungsfall, wo ich Daten von RDBMS zu HDFS übertragen muss. Wir haben das Benchmarking dieses Falles mit sqoop durchgeführt und herausgefunden, dass wir in der Lage sind, etwa 20 GB Daten in 6-7 Minuten zu übertragen.Apache Spark-SQL vs Sqoop Benchmarking beim Übertragen von Daten aus RDBMS zu HDFs

Wo, wenn ich das gleiche mit Spark SQL versuche, ist die Leistung sehr gering (1 GB von Datensätzen dauert 4 Minuten, um von Netezza auf hdfs zu übertragen). Ich versuche, etwas abzustimmen und seine Leistung zu erhöhen, aber es ist unwahrscheinlich, dass es auf das Niveau von sqoop abgestimmt wird (ungefähr 3 GB Daten in 1 Min).

Ich stimme der Tatsache zu, dass Spark in erster Linie eine Verarbeitungsmaschine ist, aber meine Hauptfrage ist, dass sowohl Funken und sqoop JDBC-Treiber intern verwenden, also warum gibt es so viel Unterschied in der Leistung (oder vielleicht bin ich etwas fehlt) . Ich poste meinen Code hier.

object helloWorld { 
    def main(args: Array[String]): Unit = { 
    val conf = new SparkConf().setAppName("Netezza_Connection").setMaster("local") 
    val sc= new SparkContext(conf) 
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 
    sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("numPartitions","14").option("lowerBound","0").option("upperBound","13").option("partitionColumn", "id").option("fetchSize","100000").load().registerTempTable("POC") 
    val df2 =sqlContext.sql("select * from POC") 
    val partitioner= new org.apache.spark.HashPartitioner(14) 
    val rdd=df2.rdd.map(x=>(String.valueOf(x.get(1)),x)).partitionBy(partitioner).values 
    rdd.saveAsTextFile("hdfs://Hostname/test") 
    } 
} 

Ich habe viele andere Post überprüfe, konnte aber keine klare Antwort auf die internen Arbeits und Tuning von Sqoop noch ich bekam Sqoop vs Funken SQL Benchmarking Kindly Hilfe zum Verständnis dieses Problems bekommen.

Antwort

2

Sie verwenden die falschen Werkzeuge für den Job.

Sqoop wird eine Reihe von Prozessen (auf den Datenseiten) starten, die jeweils eine Verbindung zu Ihrer Datenbank herstellen (siehe Num-Mapper) und sie werden jeweils einen Teil des Datensatzes extrahieren. Ich glaube nicht, dass Sie mit Spark eine Lese-Parallelität erreichen können.

Holen Sie den Datensatz mit Sqoop und verarbeiten Sie es dann mit Spark.

2

können Sie Folgendes versuchen: -

  1. Lesen von Daten von Netezza ohne Partitionen und mit erhöhter fetch_size zu einer Million.

    sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("fetchSize","1000000").load().registerTempTable("POC") 
    
  2. die Daten vor dem Schreiben in die endgültige Datei neu partitionieren.

    val df3 = df2.repartition(10) //to reduce the shuffle 
    
  3. ORC-Formate sind mehr optimiert als TEXT. Schreiben Sie die endgültige Ausgabe auf Parkett/ORC.

    df3.write.format("ORC").save("hdfs://Hostname/test") 
    
0

@amitabh Obwohl als eine Antwort markiert, stimme ich damit nicht überein.

Sobald Sie das Prädikat angeben, um die Daten beim Lesen aus dem jdbc zu partitionieren, führt Spark für jede Partition separate Tasks aus. In Ihrem Fall sollte keine der Aufgaben 14 sein (Sie können dies mit der Funke UI bestätigen).

Ich bemerke, dass Sie lokal als Master verwenden, die nur 1 Kern für Executoren bieten würde. Daher wird es keine Parallelität geben. Was ist in deinem Fall passiert?

Jetzt, um den gleichen Durchsatz wie sqoop zu erhalten, müssen Sie sicherstellen, dass diese Aufgaben parallel ausgeführt werden. Theoretisch kann dies entweder durch geschehen: 1. Mit 14 Zieher mit 1 Kern jedes 2. Mit 1 Exekutor mit 14 Kernen (anderen Ende des Spektrums)

Typischerweise würde ich mit 4-5 Kernen pro Exekutor gehen .Also teste ich die Leistung mit 15/5 = 3 Executoren (ich fügte 1 bis 14 hinzu, um 1 Kern für den Treiber zu betrachten, der im Clustor-Modus läuft). Verwenden Sie: executor.cores, executor.instances in sparkConf.set, um mit den configs zu spielen.

Wenn dies die Leistung nicht wesentlich erhöht, wäre die nächste Sache, den Executor-Speicher zu betrachten.

Schließlich würde ich die Anwendungslogik zwicken, um mapRDD Größen, Partitionsgrößen und Shuffle-Größen zu sehen.

+0

: - Vielen Dank für Ihre Kommentare .. Ich habe Master als „lokal“ in meinem Code gegeben becoz ich meine Firma Garn URL posten hier nicht in der Lage war zu .. In Wirklichkeit habe ich das auf Garnhaufen laufen lassen. Auch die Parallelität von 14 wird beim Schreiben der Daten auf hdfs und nicht während des Lesens erreicht. Beim Lesen gibt es nur einen Thread, der aus SQL db liest, was den Gesamtprozess sehr langsam macht. In diesem Fall denke ich, Marco Polo Antwort ist richtig. Das ist meine Meinung. Bitte zögern Sie nicht, mich zu korrigieren, falls mir etwas fehlt. Vielen Dank. –

+0

Wie viele Executoren werden dem Job zugewiesen? Können Sie die Verwendung von Spark UI überprüfen? – bigdatamann

0

Die folgende Lösung half mir


var df=spark.read.format("jdbc").option("url"," "url").option("user","user").option("password","password").option("dbTable","dbTable").option("fetchSize","10000").load() df.registerTempTable("tempTable") var dfRepart=spark.sql("select * from tempTable distribute by primary_key") //this will repartition the data evenly

dfRepart.write.format("parquet").save("hdfs_location")