2016-03-21 5 views
4

ich Parkett-Datei hdfs schreibe mit dem folgenden Befehl: df.write.mode(SaveMode.Append).partitionBy(id).parquet(path)Funken nicht hdfs mit Parkett Partitionierung nutzt

Danach wie dieses Ich lese und Filtern der Datei:

val file = sqlContext.read.parquet(folder) 
val data = file.map(r => Row(r.getInt(4).toString, r.getString(0), r.getInt(1), 
    r.getLong(2), r.getString(3))) 

val filteredData = data.filter(x => x.thingId.equals("1")) 
filteredData.collect() 

Ich würde erwarten, , dass Spark die Partitionierung der Datei nutzen und nur die Partition von "thingId = 1" lesen würde. Tatsächlich liest Spark alle Partitionen der Datei und nicht nur die gefilterte (Partition mit thingId = 1). Wenn ich in den Protokollen suchen, kann ich sehen, dass es nicht alles gelesen:

16/03/21 01:32:33 INFO ParquetRelation: Leseparkett Datei (en) aus hdfs: // Sandbox. hortonworks.com/path/id=1/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParkettRelation: Lesen Parkett Datei (s) von hdfs: //sandbox.hortonworks.com/path/id=42/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01.32.33 INFO ParquetRelation: Lesen von Parkett-Datei (en) von hdfs: //sandbox.hortonworks.com/path/id=17/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01 : 32: 33 Ich NFO ParkettRelation: Lesen der Parkettdatei (en) von hdfs: //sandbox.hortonworks.com/path/0833/id=33/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParkettRelation: Parkett-Datei (en) lesen von hdfs: //sandbox.hortonworks.com/path/id=26/part-r-00000-b4e27b02-9a21-4915-89a7- 189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParkettRelation: Lesen von Parkett-Datei (en) von hdfs: //sandbox.hortonworks.com/path/id=12/part-r-00000-b4e27b02 -9a21-4915-89a7-189c30ca3fe3.gz.parquet

gibt es etwas, ich bin fehlt? Wenn ich mir die Dokumente ansehe, sollte Spark basierend auf dem Filter wissen, dass es nur Partition mit thingID = 1 lesen soll. Hat jemand von euch eine Idee was das Problem ist?

Antwort

5

Einige Probleme können Funken verhindern erfolgreich "nach unten drücken" Prädikate (dh mit Filter am Eingang-Format-Ebene):

  1. Filter-Pushdown- ist OFF: abhängig von Spark Version Sie verwenden Möglicherweise ist die Prädikat-Pushdown-Option (spark.sql.parquet.filterPushdown) deaktiviert. Es ist standardmäßig als Spark 1.5.0 - so überprüfen Sie Ihre Version und Konfiguration

  2. Filter ist „opaque“: Das scheint hier der Fall zu sein: Sie Parkett Datei einlegen, Abbilden jeder Zeile eine andere Zeile (Spalten neu anordnen?) und dann die filter-Methode, die eine -Funktion akzeptiert. Spark kann den Funktionscode nicht "lesen" und erkennen, dass er einen Vergleich in der Partitionierungsspalte verwendet - zu Spark, dies ist nur eine Row => Boolean Funktion, die alle Arten von Prüfungen durchführen kann ...

    Damit Filter Pushdown funktioniert, müssen Sie es vor dem Zuordnen der Datensätze in etwas, das von der ursprünglichen Struktur "getrennt" ist, verwenden und eine der filter Überladungen verwenden, die einen Filter verwendet, der von Spark analysiert werden kann Beispiel:

    // assuming the relevant column name is "id" in the parquet structure 
    val filtered = file.filter("id = 1") 
    
    // or: 
    val filtered = file.filter(col("id") === 1) 
    
    // and only then: 
    val data = filtered.map(r => Row(...)) 
    
+0

vielen Dank Tzach Zohar :) ich bin mit Spark-1.5.0, so dass das Problem genau war wie Sie in Option 2 undurchsichtige Filter beschrieben. Wenn ich vor dem Mappen der Zeile einen Filter mache, funktioniert es! Danke vielmals! – AlexL