Ich muss eine vollständige Liste der Zeilennummern für eine Datentabelle mit vielen Spalten generieren.Wie bekomme ich eine SQL-Zeile_number Entsprechung für eine Spark RDD?
In SQL, dies würde wie folgt aussehen:
select
key_value,
col1,
col2,
col3,
row_number() over (partition by key_value order by col1, col2 desc, col3)
from
temp
;
Jetzt wollen wir in Funken sagen, ich habe eine RDD der Form (K, V), wobei V = (col1, col2, col3), so meine Einträge sind wie
(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.
ich diese mit Befehlen wie sortBy bestellen möchten(), sortWith(), sortByKey(), zipWithIndex usw. und haben eine neue RDD mit dem richtigen row_number
(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.
(Ich interessiere mich nicht für die Klammern, so kann das Formular auch (K, (col1, col2, col3, rownum)) sein)
Wie mache ich das?
Hier ist mein erster Versuch:
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))
val temp1 = sc.parallelize(sample_data)
temp1.collect().foreach(println)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)
temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)
// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)
// note that this isn't ordering with a partition on key value K!
val temp2 = temp1.???
Beachten Sie auch, dass die Funktion sortBy kann nicht direkt an eine RDD angewendet werden, aber man muss laufen sammeln() zuerst, und dann ist der Ausgang nicht ein RDD, entweder , aber ein Array
temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)
// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
Hier ist ein wenig mehr Fortschritte, aber noch nicht partitioniert:
val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))
temp2.collect().foreach(println)
// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)
Diese Frage ist eine Erweiterung von mehreren anderen Fragen teilweise beantwortet, nämlich http://stackoverflow.com/questions/23838614/how-to-sort-ein-rdd-in-scala-spark, http://qnalist.com/questions/5086896/spark-sql-how-to-select-first-row-in-each-group -von-Gruppe, http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3CD01B658B.2BF52%[email protected]%3E, http://Stackoverflow.com/ Fragen/270220 59/filter-rdd-based-on-row-nummer, http://stackoverflow.com/questions/24677180/how-do-select-a-range-of-elements-in-spark-rdd –
I ' m auch, um diese Frage zu beantworten. [Hive fügte analytische Funktionen (einschließlich 'row_number()') in 0.11] (https://issues.apache.org/jira/browse/HIVE-896) hinzu, und Spark 1.1 unterstützt HiveQL/Hive 0.12. So scheint es, dass 'sqlContext.hql (" select row_number() "über" Partition by ... "sollte funktionieren, aber ich bekomme einen Fehler. – dnlbrky