2016-03-04 4 views
5

Angenommen, ich schaffe eine solche RDD (I Pyspark bin mit):Wie entscheidet Spark, wie eine RDD partitioniert wird?

list_rdd = sc.parallelize(xrange(0, 20, 2), 6) 

dann drucke ich die partitionierten Elemente mit dem glom() Verfahren und erhalten

[[0], [2, 4], [6, 8], [10], [12, 14], [16, 18]] 

Wie Funken zu partitionieren hat sich entschieden, wie meine Liste? Woher kommt diese spezifische Wahl der Elemente? Es könnte sie anders gekoppelt haben und einige andere Elemente als 0 und 10 übriglassen, um die 6 angeforderten Partitionen zu erzeugen. Bei einem zweiten Lauf sind die Partitionen gleich.

einen größeren Bereich verwenden, mit 29 Elementen, erhalte ich Partitionen in dem Muster von zwei Elementen von drei Elementen gefolgt:

list_rdd = sc.parallelize(xrange(0, 30, 2), 6) 
[[0, 2], [4, 6, 8], [10, 12], [14, 16, 18], [20, 22], [24, 26, 28]] 

einen kleineren Bereich von 9 Elementen I

list_rdd = sc.parallelize(xrange(0, 10, 2), 6) 
[[], [0], [2], [4], [6], [8]] 

erhalten Was ich daraus schließe, ist, dass Spark die Partitionen erzeugt, indem er die Liste in eine Konfiguration aufteilt, in der kleinstmögliche größere Sammlungen folgen und wiederholt werden.

Die Frage ist, ob es einen Grund für diese Entscheidung gibt, die sehr elegant ist, aber auch Leistungsvorteile bietet?

Antwort

2

Sofern Sie keinen bestimmten Partitionierer angeben, ist dieser "zufällig", da er von der spezifischen Implementierung dieser RDD abhängt. In diesem Fall können Sie sich an die ParallelCollectionsRDD wenden, um weiter zu graben.

getPartitions ist definiert als:

val slices = ParallelCollectionRDD.slice(data, numSlices).toArray 
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray 

wo slice wie kommentiert (umformatiert besser passen):

/** 
* Slice a collection into numSlices sub-collections. 
* One extra thing we do here is to treat Range collections specially, 
* encoding the slices as other Ranges to minimize memory cost. 
* This makes it efficient to run Spark over RDDs representing large sets of numbers. 
* And if the collection is an inclusive Range, 
* we use inclusive range for the last slice. 
*/ 

Beachten Sie, dass einige Überlegungen in Bezug auf Speicher gibt. Dies wird wiederum spezifisch für die Implementierung sein.