Ich bin neu in PySpark und ich versuche, einige Python-Code zu konvertieren, der eine neue Variable "COUNT_IDX" ableitet. Die neue Variable hat einen Anfangswert von 1, wird jedoch um 1 erhöht, wenn eine Bedingung erfüllt ist. Andernfalls wird der neue Wert der Variablen derselbe Wert sein wie im letzten Datensatz.PySpark bedingte Inkrement
Der Zustand zu inkrementieren ist, wenn: TRIP_CD nicht gleich dem vorherigen Datensatz TRIP_CD oder SIGN nicht gleich dem vorherigen Datensatzes SIGN oder time_diff nicht gleich 1.
Python-Code (pandas Datenrahmen):
df['COUNT_IDX'] = 1
for i in range(1, len(df)):
if ((df['TRIP_CD'].iloc[i] != df['TRIP_CD'].iloc[i - 1])
or (df['SIGN'].iloc[i] != df['SIGN'].iloc[i-1])
or df['time_diff'].iloc[i] != 1):
df['COUNT_IDX'].iloc[i] = df['COUNT_IDX'].iloc[i-1] + 1
else:
df['COUNT_IDX'].iloc[i] = df['COUNT_IDX'].iloc[i-1]
Hier sind die erwarteten Ergebnisse:
TRIP_CD SIGN time_diff COUNT_IDX
2711 - 1 1
2711 - 1 1
2711 + 2 2
2711 - 1 3
2711 - 1 3
2854 - 1 4
2854 + 1 5
In PySpark, ich COUNT_IDX als 1. Dann mit der Fensterfunktion zu initialisieren, nahm ich die Lags von TRIP_CD und SIGN und die time_diff berechnet, dann versucht:
df = sqlContext.sql('''
select TRIP, TRIP_CD, SIGN, TIME_STAMP, seconds_diff,
case when TRIP_CD != TRIP_lag or SIGN != SIGN_lag or seconds_diff != 1
then (lag(COUNT_INDEX) over(partition by TRIP order by TRIP, TIME_STAMP))+1
else (lag(COUNT_INDEX) over(partition by TRIP order by TRIP, TIME_STAMP))
end as COUNT_INDEX from df''')
Diese mir wie etwas geben:
TRIP_CD SIGN time_diff COUNT_IDX
2711 - 1 1
2711 - 1 1
2711 + 2 2
2711 - 1 2
2711 - 1 1
2854 - 1 2
2854 + 1 2
Wenn COUNT_IDX in einem vorherigen Datensatz aktualisiert wird, erkennt COUNT_IDX im aktuellen Datensatz diese zu berechnende Änderung nicht. Es ist, als ob die COUNTI_IDX nicht überschrieben wird oder nicht von Zeile zu Zeile ausgewertet wird. Irgendwelche Ideen wie ich das umgehen kann?
Dies ist eine kreative Lösung, aber ich habe es nicht ganz bekommen noch zu arbeiten. Setzen Sie dies in eine withColumn-Anweisung, um eine neue Spalte mit kumulativer Summe zu erstellen, oder soll das in SQL sein? Vielen Dank! – Amber
Dies soll Ihre SQL-Abfrage zwischen "case when" und "end" ersetzen. Die Fensterdefinition kann inline angegeben werden, wenn Sie dies bevorzugen. Da da einige fehlende Spalten in Daten vorkommen, ist es nur Pseudocode. – zero323