3

Ich habe eine (ziemlich groß, denken 10e7 Zeilen) Datenrahmen, aus dem i-Elemente filtern basierend auf einer EigenschaftPartition Lage RDD/Dataframe

val res = data.filter(data(FieldNames.myValue) === 2).select(pk.name, FieldName.myValue) 

Meine Datenrahmen hat n Partitionen data.rdd.getNumPartitions

Jetzt will ich zu wissen, von welcher Partition meine Zeilen stammen. Ich bin mir bewusst, dass ich nur durch alle Partitionen mit so etwas wie dieses

val temp = res.first() //or foreach, this is just an example 
data.foreachPartition(f => { 
    f.exists(row => row.get(0)==temp.get(0)) 
    //my code here 
}) //compare PKs 

oder data.rdd.mapPartitionsWithIndex((idx, f) => ...)

jedoch laufen könnte, das scheint übertrieben und auch nicht sehr performant, wenn meine Ergebnisse und mein Datenrahmen wird groß.

Gibt es einen Spark-Weg, um dies zu tun, nachdem ich die filter() - Operation durchgeführt habe?

Oder gibt es alternativ eine Möglichkeit, um eine Alternative zu der filter() - Anweisung zu schreiben, so dass es den Ursprung der Zeile zurückgibt?

Ich könnte auch die Partitions Lage in meinem Datenrahmen speichern und auf repartitioning aktualisieren, aber ich würde es eher

(Die einzige ähnliche Frage, die ich war here, und weder die Frage gefunden in einen Funken Weise tun noch der Kommentar ist sehr hilfreich.Ich fand auch this die ähnlich sein könnte, aber nicht das gleiche)

Vielen Dank im Voraus für jede Hilfe/Zeiger und ich entschuldige mich, wenn ich eine Frage ähnlich wie meine, die bereits beantwortet wurde verpasst.

+0

mapPartitionsWithIndex ist eine einfache Kartenoperation. Es beinhaltet kein Shuffling, nur verteiltes Mapping. Es könnte einen anderen Weg geben, aber ich bin mir nicht sicher, ob es wirklich leistungsfähiger sein könnte. – Marie

Antwort

0

Die Partitionsnummern/-zahlen sind nicht stabil, da Spark eine automatische Erweiterung ausführt & Reduzierung der Partitionierung. Dies bedeutet, dass z. B. die Anzahl der Eingangspartitionen nicht der Anzahl der Eingangsdateien entspricht.

Das allgemeine Muster in diesen Situationen besteht darin, basierend auf den Daten in jeder Eingabedatei eine Art zusammengesetzten Schlüssel zu erstellen. Wenn der Schlüssel groß ist, können Sie ihn hashen, um die Größe zu verringern. Wenn Ihnen Kollisionen nicht wichtig sind, verwenden Sie Murmur3. Wenn Sie über Kollisionen besorgt sind, verwenden Sie MD5, die immer noch ziemlich schnell ist.

Wenn das einzige eindeutige Merkmal, das Sie haben, der Pfad der Eingabedatei ist, müssen Sie den Dateipfad als Unterscheidungsspalte hinzufügen. Hier ist ein Weg, dies zu tun:

val paths = Seq(...) 
val df = paths 
    .map { path => 
    sqlContext.read.parquet(path) 
     .withColumn("path", lit(path)) 
    } 
    .reduceLeft(_ unionAll _) 

Die Idee ist einfach: Lesen Sie die Eingabedateien einen nach dem anderen, fügen Sie eine eindeutige Spalte mit ihnen verbunden ist, und sie dann miteinander zu verknüpfen, UNION ALL verwenden.