2016-07-15 10 views
3

einen Datenrahmen Gegeben:Wie ordnen Sie Rängen zu Datensätzen in einem Spark-Datenrahmen basierend auf bestimmten Bedingungen zu?

+-------+-------+ 
| A | B | 
+-------+-------+ 
|  a|  1| 
+-------+-------+ 
|  b|  2| 
+-------+-------+ 
|  c|  5| 
+-------+-------+ 
|  d|  7| 
+-------+-------+ 
|  e|  11| 
+-------+-------+  

Ich möchte Reihen Aufzeichnungen auf Bedingungen basieren zuzuweisen:

  1. starten Rang mit 1
  2. zuordnen Rang = Rang des bisherigen Rekord wenn (B des aktuellen Datensatzes - B des vorherigen Datensatzes) ist < = 2
  3. Inkrement Rang, wenn (B des aktuellen Datensatzes - B des vorherigen Datensatzes)> 2
ist So

Ich möchte Ergebnis so sein:

+-------+-------+------+ 
| A | B | rank | 
+-------+-------+------+ 
|  a|  1|  1| 
+-------+-------+------+ 
|  b|  2|  1| 
+-------+-------+------+ 
|  c|  5|  2| 
+-------+-------+------+ 
|  d|  7|  2| 
+-------+-------+------+ 
|  e|  11|  3| 
+-------+-------+------+ 
  • Inbuilt Funktionen in Funken wie rowNumber, Rang, DENSE_RANK keine jede Funktionalität bieten, dies zu erreichen.
  • Ich habe es versucht, indem Sie einen globalen Variablenrang und holen vorherigen Rekordwerte mit Verzögerung Funktion, aber es gibt keine konsistente Ergebnisse aufgrund der verteilten Verarbeitung in Funken im Gegensatz zu SQL.
  • Eine weitere Methode, die ich versuchte, bestand darin, Lag-Werte von Datensätzen an eine UDF zu übergeben, während eine neue Spalte erzeugt und Bedingungen in UDF angewendet wurden. Aber das Problem, mit dem ich konfrontiert bin, ist, dass ich Lag-Werte für die Spalten A und B bekommen kann, aber nicht für den Spaltenrang. Dies gibt Fehler, da es nicht Spaltenname Rang auflösen kann.

    HiveContext.sql ("SELECT * df, LAG (df.rank, 1) OVER (ORDER BY B, 0) AS rank_lag, udfGetVisitNo (B, rank_lag) als Rang FROM df ")

  • Ich kann Lag-Wert einer Spalte, die ich gerade hinzufüge, nicht bekommen.

  • Auch ich möchte nicht Methoden, die Verwendung von df.collect() erfordern, da dieser Datenrahmen ziemlich groß ist und das Sammeln auf einem einzelnen Arbeitsknoten führt zu Speicherfehlern.

Jede andere Methode, mit der ich das gleiche erreichen kann? Ich würde gerne eine Lösung mit Zeitkomplexität O (n) wissen, wobei n das Nein der Datensätze ist.

+0

können Sie Unterabfragen wie in SQL? –

+0

Ja. Ich kann SQL-ähnliche Abfragen verwenden, wenn Sie darauf eine Lösung haben. –

Antwort

1

Eine SQL-Lösung würde

select a,b,1+sum(col) over(order by a) as rnk 
from 
(
select t.* 
,case when b - lag(b,1,b) over(order by a) <= 2 then 0 else 1 end as col 
from t 
) x 

Die Lösung nimmt die Bestellung auf der Säule a basiert.

SQL Server example

+0

Das funktioniert. Vielen Dank . Wie stark beeinflusst diese Unterabfrage die Ausführungszeit meines Codes? Ich habe rund 7000000 Datensätze. –

+0

nicht sicher, ich kann erraten, wie es die Ausführungszeit beeinflusst. Probieren Sie es aus und überprüfen Sie es. Die letzte Abfrage fasst nur die Werte der inneren Abfrage zusammen. –

+0

Okay, hab es! Danke noch einmal. –