2014-11-20 8 views
17

Ich muss eine vollständige Liste der Zeilennummern für eine Datentabelle mit vielen Spalten generieren.Wie bekomme ich eine SQL-Zeile_number Entsprechung für eine Spark RDD?

In SQL, dies würde wie folgt aussehen:

select 
    key_value, 
    col1, 
    col2, 
    col3, 
    row_number() over (partition by key_value order by col1, col2 desc, col3) 
from 
    temp 
; 

Jetzt wollen wir in Funken sagen, ich habe eine RDD der Form (K, V), wobei V = (col1, col2, col3), so meine Einträge sind wie

(key1, (1,2,3)) 
(key1, (1,4,7)) 
(key1, (2,2,3)) 
(key2, (5,5,5)) 
(key2, (5,5,9)) 
(key2, (7,5,5)) 
etc. 

ich diese mit Befehlen wie sortBy bestellen möchten(), sortWith(), sortByKey(), zipWithIndex usw. und haben eine neue RDD mit dem richtigen row_number

(key1, (1,2,3), 2) 
(key1, (1,4,7), 1) 
(key1, (2,2,3), 3) 
(key2, (5,5,5), 1) 
(key2, (5,5,9), 2) 
(key2, (7,5,5), 3) 
etc. 

(Ich interessiere mich nicht für die Klammern, so kann das Formular auch (K, (col1, col2, col3, rownum)) sein)

Wie mache ich das?

Hier ist mein erster Versuch:

val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3)) 

val temp1 = sc.parallelize(sample_data) 

temp1.collect().foreach(println) 

// ((3,4),5,5,5) 
// ((3,4),5,5,9) 
// ((3,4),7,5,5) 
// ((1,2),1,2,3) 
// ((1,2),1,4,7) 
// ((1,2),2,2,3) 

temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println) 

// ((((1,2),1,2,3),1),0) 
// ((((1,2),1,4,7),1),1) 
// ((((1,2),2,2,3),1),2) 
// ((((3,4),5,5,5),1),3) 
// ((((3,4),5,5,9),1),4) 
// ((((3,4),7,5,5),1),5) 

// note that this isn't ordering with a partition on key value K! 

val temp2 = temp1.??? 

Beachten Sie auch, dass die Funktion sortBy kann nicht direkt an eine RDD angewendet werden, aber man muss laufen sammeln() zuerst, und dann ist der Ausgang nicht ein RDD, entweder , aber ein Array

temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println) 

// ((1,2),1,4,7) 
// ((1,2),1,2,3) 
// ((1,2),2,2,3) 
// ((3,4),5,5,5) 
// ((3,4),5,5,9) 
// ((3,4),7,5,5) 

Hier ist ein wenig mehr Fortschritte, aber noch nicht partitioniert:

val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1)) 

temp2.collect().foreach(println) 

// ((1,2),1,4,7,1) 
// ((1,2),1,2,3,2) 
// ((1,2),2,2,3,3) 
// ((3,4),5,5,5,4) 
// ((3,4),5,5,9,5) 
// ((3,4),7,5,5,6) 
+0

Diese Frage ist eine Erweiterung von mehreren anderen Fragen teilweise beantwortet, nämlich http://stackoverflow.com/questions/23838614/how-to-sort-ein-rdd-in-scala-spark, http://qnalist.com/questions/5086896/spark-sql-how-to-select-first-row-in-each-group -von-Gruppe, http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3CD01B658B.2BF52%[email protected]%3E, http://Stackoverflow.com/ Fragen/270220 59/filter-rdd-based-on-row-nummer, http://stackoverflow.com/questions/24677180/how-do-select-a-range-of-elements-in-spark-rdd –

+0

I ' m auch, um diese Frage zu beantworten. [Hive fügte analytische Funktionen (einschließlich 'row_number()') in 0.11] (https://issues.apache.org/jira/browse/HIVE-896) hinzu, und Spark 1.1 unterstützt HiveQL/Hive 0.12. So scheint es, dass 'sqlContext.hql (" select row_number() "über" Partition by ... "sollte funktionieren, aber ich bekomme einen Fehler. – dnlbrky

Antwort

13

Die Funktion row_number() over (partition by ... order by ...) wurde zu Spark 1.4 hinzugefügt. Diese Antwort verwendet PySpark/DataFrames.

einen Testdatenrahmen erstellen:

from pyspark.sql import Row, functions as F 

testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)), 
    Row(k="key1", v=(1,4,7)), 
    Row(k="key1", v=(2,2,3)), 
    Row(k="key2", v=(5,5,5)), 
    Row(k="key2", v=(5,5,9)), 
    Row(k="key2", v=(7,5,5)) 
    ) 
).toDF() 

die partitionierten Zeilennummer hinzufügen:

from pyspark.sql.window import Window 

(testDF 
.select("k", "v", 
     F.rowNumber() 
     .over(Window 
       .partitionBy("k") 
       .orderBy("k") 
      ) 
     .alias("rowNum") 
     ) 
.show() 
) 

+----+-------+------+ 
| k|  v|rowNum| 
+----+-------+------+ 
|key1|[1,2,3]|  1| 
|key1|[1,4,7]|  2| 
|key1|[2,2,3]|  3| 
|key2|[5,5,5]|  1| 
|key2|[5,5,9]|  2| 
|key2|[7,5,5]|  3| 
+----+-------+------+ 
4

Dies ist ein interessantes Problem, das Sie ansprechen. Ich werde es in Python beantworten, aber ich bin mir sicher, dass Sie nahtlos in Scala übersetzen können.

Hier ist, wie ich es angehen würde:

1- Vereinfachen Sie Ihre Daten:

temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3]))) 

TEMP2 ist nun ein "echtes" Schlüssel-Wert-Paar. Es sieht so aus, dass:

[ 
((3, 4), (5, 5, 5)), 
((3, 4), (5, 5, 9)), 
((3, 4), (7, 5, 5)), 
((1, 2), (1, 2, 3)), 
((1, 2), (1, 4, 7)), 
((1, 2), (2, 2, 3)) 

]

2- Dann verwenden die Gruppe-Funktion durch die Wirkung der PARTITION BY zu reproduzieren:

temp3 = temp2.groupByKey() 

TEMP3 ist nun ein RDD mit 2 Zeilen:

[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>), 
((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)] 

3- Jetzt müssen Sie für jeden Wert der RDD eine Rangfunktion anwenden.In Python, würde ich die einfache sortierten Funktion (die enumerate Ihre row_number Spalte erstellen):

temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10) 

Beachten Sie, dass Ihre bestimmten Reihenfolge zu implementieren, müssen Sie das richtige „Schlüssel“ Argument (in Python ernähren, ich würde nur eine Lambda-Funktion, wie sie schaffen:

lambda tuple : (tuple[0],-tuple[1],tuple[2]) 

Am Ende (ohne Schlüssel Argument Funktion, es sieht so aus):

[ 
((1, 2), ((1, 2, 3), 0)), 
((1, 2), ((1, 4, 7), 1)), 
((1, 2), ((2, 2, 3), 2)), 
((3, 4), ((5, 5, 5), 0)), 
((3, 4), ((5, 5, 9), 1)), 
((3, 4), ((7, 5, 5), 2)) 

]

Hoffe, dass hilft!

Viel Glück.