2016-07-30 18 views
2

Was ist der beste Weg, um die Daten durch ein Feld in eine vordefinierte Anzahl von Partitionen zu partitionieren?Was ist eine effiziente Möglichkeit, nach Spalten zu partitionieren, aber eine feste Anzahl von Partitionen beizubehalten?

Ich partitioniere derzeit die Daten durch Angabe der partionCount = 600. Es wurde herausgefunden, dass die Zählung 600 die beste Abfrageleistung für meine Datensatz-/Clustereinrichtung liefert.

val rawJson = sqlContext.read.json(filename).coalesce(600) 
rawJson.write.parquet(filenameParquet) 

Jetzt möchte ich diese Daten von der Spalte ‚eventname‘ partitionieren, aber immer noch die Zählung 600. Die Daten halten derzeit rund 2000 einzigartige eventNames sowie die Anzahl der Zeilen in jeder eventname ist nicht einheitlich. Etwa 10 eventNames haben mehr als 50% der Daten verursacht Datenschiefstand. Wenn ich also die Partitionierung wie folgt mache, ist es nicht sehr performant. Der Schreibvorgang dauert 5x mehr Zeit als ohne.

val rawJson = sqlContext.read.json(filename) 
rawJson.write.partitionBy("eventName").parquet(filenameParquet) 

Was ist eine gute Möglichkeit, die Daten für diese Szenarien zu partitionieren? Gibt es eine Möglichkeit, nach eventName zu partitionieren, aber in 600 Partitionen zu verteilen?

Mein Schema sieht wie folgt aus:

{ 
    "eventName": "name1", 
    "time": "2016-06-20T11:57:19.4941368-04:00", 
    "data": { 
    "type": "EventData", 
    "dataDetails": { 
     "name": "detailed1", 
     "id": "1234", 
... 
... 
    } 
    } 
} 

Dank!

Antwort

0

Haben Sie versucht, Liste Bucketing-Konzept anwenden. Sie haben nur wenige Partitionen für Ihre Skew-Spalten, wie diese 10 Event-Namen. Im übrigen können Sie nur eine Partition/Verzeichnis für alle anderen Schlüssel haben. Sie können here suchen. Es ist meist auf 80-20 Regel ausgerichtet.

0

Dies ist ein häufiges Problem mit verzerrten Daten und es gibt verschiedene Ansätze, die Sie verwenden können.

Listen-Bucketing funktioniert, wenn der Skew im Laufe der Zeit stabil bleibt, was auch der Fall sein kann, besonders wenn neue Werte der Partitionierungsvariablen eingeführt werden. Ich habe nicht untersucht, wie einfach es ist, List Bucketing im Laufe der Zeit anzupassen, und wie Ihr Kommentar sagt, können Sie das sowieso nicht verwenden, da es sich um eine Spark 2.0-Funktion handelt.

Wenn Sie auf 1.6.x sind, besteht die wichtigste Beobachtung darin, dass Sie eine eigene Funktion erstellen können, die jeden Ereignisnamen in einen von 600 eindeutigen Werten abbildet. Sie können dies als UDF oder als Case-Ausdruck ausführen. Dann erstellen Sie einfach eine Spalte mit dieser Funktion und partitionieren dann nach dieser Spalte unter Verwendung von repartition(600, 'myPartitionCol) im Gegensatz zu coalesce(600).

Da wir mit sehr schief Daten bei Swoop umgehen, habe ich die folgende Arbeitspferd-Datenstruktur als sehr nützlich für die Erstellung von Partitionierungs-Tools gefunden.

/** Given a key, returns a random number in the range [x, y) where 
    * x and y are the numbers in the tuple associated with a key. 
    */ 
class RandomRangeMap[A](private val m: Map[A, (Int, Int)]) extends Serializable { 
    private val r = new java.util.Random() // Scala Random is not serializable in 2.10 

    def apply(key: A): Int = { 
    val (start, end) = m(key) 
    start + r.nextInt(end - start) 
    } 

    override def toString = s"RandomRangeMap($r, $m)" 
} 

Zum Beispiel, hier ist, wie wir eine Partitionierungs für einen etwas anderen Fall bauen: ein, wo die Daten verzerrt sind und die Anzahl der Tasten ist klein, so müssen wir die Anzahl der Partitionen für die schiefen Tasten erhöhen, während mit 1 als die minimalen Anzahl von Partitionen pro Schlüssel kleben:

/** Partitions data such that each unique key ends in P(key) partitions. 
    * Must be instantiated with a sequence of unique keys and their Ps. 
    * Partition sizes can be highly-skewed by the data, which is where the 
    * multiples come in. 
    * 
    * @param keyMap maps key values to their partition multiples 
    */ 
class ByKeyPartitionerWithMultiples(val keyMap: Map[Any, Int]) extends Partitioner { 
    private val rrm = new RandomRangeMap(
    keyMap.keys 
     .zip(
     keyMap.values 
      .scanLeft(0)(_+_) 
      .zip(keyMap.values) 
      .map { 
      case (start, count) => (start, start + count) 
      } 
    ) 
     .toMap 
) 

    override val numPartitions = 
    keyMap.values.sum 

    override def getPartition(key: Any): Int = 
    rrm(key) 
} 

object ByKeyPartitionerWithMultiples { 

    /** Builds a UDF with a ByKeyPartitionerWithMultiples in a closure. 
    * 
    * @param keyMap maps key values to their partition multiples 
    */ 
    def udf(keyMap: Map[String, Int]) = { 
    val partitioner = new ByKeyPartitionerWithMultiples(keyMap.asInstanceOf[Map[Any, Int]]) 
    (key:String) => partitioner.getPartition(key) 
    } 

} 

In Ihrem Fall haben Sie mehrere Ereignisnamen in eine einzige Partition zu verschmelzen, die Änderungen erfordern würde, aber ich hoffe, dass der obige Code gibt Ihnen eine Vorstellung davon, wie das Problem angehen.

Eine letzte Beobachtung ist, dass wenn die Verteilung von Ereignisnamen im Laufe der Zeit sehr viele Werte in Ihren Daten hat, Sie einen Statistiksammeldurchlauf über einen Teil der Daten durchführen können, um eine Zuordnungstabelle zu berechnen. Sie müssen das nicht ständig tun, nur wenn es nötig ist.Um dies festzustellen, können Sie die Anzahl der Zeilen und/oder die Größe der Ausgabedateien in jeder Partition betrachten. Mit anderen Worten, der gesamte Prozess kann als Teil Ihrer Spark-Jobs automatisiert werden.

+0

Danke Sim für die Details. – vijay

+1

Wenn die Neupartition durch eine berechnete Spalte (map des eventName) erfolgt, würde die Abfrage, die nach eventName filtert (dh WHERE eventName == "foo"), immer noch nur die relevanten Partitionen lesen und nicht den vollständigen Tabellenscan durchführen. seit es ist jetzt nicht mehr eventName partitioniert? – vijay

+0

Das effizienteste Laden findet nur statt, wenn Sie genau nach der Partitionsspalte filtern. Wenn Ihr Skew im Zeitverlauf stabil ist, verwenden Sie ein statisches Mapping (was auch immer es sein mag; muss nicht List-Bucket sein) und wenden Sie die gleiche Funktion während der Abfrage an. Wenn Ihr Skew im Zeitverlauf nicht stabil ist, müssen Sie eine Datenstruktur der Ereignis-zu-Partition-Zuordnung im Zeitverlauf separat verwalten, die über die Zeitabschnitte, die Sie abfragen, zusammenführen und beide durch die Partitionsspalte filtern Partitionen) und nach Ereignisnamen (um innerhalb der Partitionen zu fokussieren). – Sim