2016-07-07 14 views
10

Ich habe eine Anforderung zum Laden von Daten aus einer Hive-Tabelle mit Spark-SQL HiveContext und laden in HDFS. Standardmäßig hat die DataFrame SQL-Ausgabe 2 Partitionen. Um mehr Parallelität zu erreichen, brauche ich mehr Partitionen aus dem SQL. Es gibt keine überladene Methode in HiveContext, um die Anzahl der Partitionen zu übernehmen.Wie Ändern der Partitionsgröße in Spark SQL

Die Neupartitionierung der RDD verursacht ein Mischen und führt zu mehr Verarbeitungszeit.

val result = sqlContext.sql("select * from bt_st_ent") 

Hat die Protokollausgabe von:

Starting task 0.0 in stage 131.0 (TID 297, aster1.com, partition 0,NODE_LOCAL, 2203 bytes) 
Starting task 1.0 in stage 131.0 (TID 298, aster1.com, partition 1,NODE_LOCAL, 2204 bytes) 

Ich mag wissen würde, ist es eine Möglichkeit, die Partitionen Größe der SQL-Ausgabe zu erhöhen.

Antwort

-1

Ein sehr häufiges und schmerzhaftes Problem. Sie sollten nach einem Schlüssel suchen, der die Daten in einheitlichen Partitionen verteilt. Sie können die Operatoren DISTRIBUTE BY und CLUSTER BY verwenden, um spark zu bestimmen, um Zeilen in einer Partition zu gruppieren. Dies verursacht einen gewissen Mehraufwand für die Abfrage selbst. Dies führt jedoch zu gleichmäßig großen Partitionen. Deepsense hat ein sehr gutes Tutorial zu diesem Thema.

-1

Wenn Ihr SQL einen Shuffle führt (zum Beispiel hat es eine Verknüpfung, oder irgendeine Art von Gruppe), Sie Eigenschaft, um die Anzahl der Partitionen, indem der ‚spark.sql.shuffle.partitions‘ einstellen

sqlContext.setConf("spark.sql.shuffle.partitions", 64) 

Im Anschluss an das, was Fokko vorschlägt, könnten Sie eine zufällige Variable verwenden, um nach zu gruppieren.

val result = sqlContext.sql(""" 
    select * from (
    select *,random(64) as rand_part from bt_st_ent 
    ) cluster by rand_part""") 
3

Funken < 2.0:

Sie Hadoop Konfigurationsoptionen verwenden:

  • mapred.min.split.size.
  • mapred.max.split.size

sowie HDFS Blockgröße für die Partitionsgröße zu steuern Dateisystem basiert Formate.

val minSplit: Int = ??? 
val maxSplit: Int = ??? 

sc.hadoopConfiguration.setInt("mapred.min.split.size", minSplit) 
sc.hadoopConfiguration.setInt("mapred.max.split.size", maxSplit) 

Funken 2.0+:

Sie spark.sql.files.maxPartitionBytes Konfiguration verwenden können:

spark.conf.set("spark.sql.files.maxPartitionBytes", maxSplit) 

In beiden Fällen sind diese Werte im Einsatz sein können, nicht durch eine bestimmte Datenquelle API so sollten Sie immer Überprüfen Sie die Dokumentation/Implementierungsdetails des von Ihnen verwendeten Formats.

+0

Dies funktionierte nicht in unserem Cluster für Spark 2.1.1 mit Datensatz Luckylukee