Um die Fensterfunktion verwenden zu können, müssen Sie zuerst ein Fenster erstellen. Die Definition ist so ähnlich wie bei normalem SQL, dh Sie können entweder Reihenfolge, Partition oder beides definieren. Zuerst lässt einige Dummy-Daten erstellen:
import numpy as np
np.random.seed(1)
keys = ["foo"] * 10 + ["bar"] * 10
values = np.hstack([np.random.normal(0, 1, 10), np.random.normal(10, 1, 100)])
df = sqlContext.createDataFrame([
{"k": k, "v": round(float(v), 3)} for k, v in zip(keys, values)])
Stellen Sie sicher, HiveContext
verwenden (Funke < 2.0 only):
from pyspark.sql import HiveContext
assert isinstance(sqlContext, HiveContext)
erstellen Fenster:
from pyspark.sql.window import Window
w = Window.partitionBy(df.k).orderBy(df.v)
, die äquivalent ist
(PARTITION BY k ORDER BY v)
in SQL.
Als Faustregel sollten Fensterdefinitionen immer die PARTITION BY
Klausel enthalten, ansonsten verschiebt Spark alle Daten auf eine einzelne Partition. ORDER BY
ist für einige Funktionen erforderlich, während in anderen Fällen (typischerweise Aggregate) optional sein kann.
Es gibt auch zwei optionale Optionen, mit denen die Fensterspanne definiert werden kann: ROWS BETWEEN
und RANGE BETWEEN
. Diese sind für uns in diesem speziellen Szenario nicht nützlich.
Schließlich können wir es für eine Abfrage verwenden:
from pyspark.sql.functions import percentRank, ntile
df.select(
"k", "v",
percentRank().over(w).alias("percent_rank"),
ntile(3).over(w).alias("ntile3")
)
Beachten Sie, dass ntile
nicht in irgendeiner Weise auf die Quantile zusammenhängt.