2016-07-31 25 views
1

Ich bin neu in PySpark und ich versuche, einige Python-Code zu konvertieren, der eine neue Variable "COUNT_IDX" ableitet. Die neue Variable hat einen Anfangswert von 1, wird jedoch um 1 erhöht, wenn eine Bedingung erfüllt ist. Andernfalls wird der neue Wert der Variablen derselbe Wert sein wie im letzten Datensatz.PySpark bedingte Inkrement

Der Zustand zu inkrementieren ist, wenn: TRIP_CD nicht gleich dem vorherigen Datensatz TRIP_CD oder SIGN nicht gleich dem vorherigen Datensatzes SIGN oder time_diff nicht gleich 1.

Python-Code (pandas Datenrahmen):

df['COUNT_IDX'] = 1 

for i in range(1, len(df)): 
    if ((df['TRIP_CD'].iloc[i] != df['TRIP_CD'].iloc[i - 1]) 
      or (df['SIGN'].iloc[i] != df['SIGN'].iloc[i-1]) 
      or df['time_diff'].iloc[i] != 1): 
     df['COUNT_IDX'].iloc[i] = df['COUNT_IDX'].iloc[i-1] + 1 
    else: 
     df['COUNT_IDX'].iloc[i] = df['COUNT_IDX'].iloc[i-1] 

Hier sind die erwarteten Ergebnisse:

TRIP_CD SIGN time_diff COUNT_IDX 
2711  -  1   1 
2711  -  1   1 
2711  +  2   2 
2711  -  1   3 
2711  -  1   3 
2854  -  1   4 
2854  +  1   5 

In PySpark, ich COUNT_IDX als 1. Dann mit der Fensterfunktion zu initialisieren, nahm ich die Lags von TRIP_CD und SIGN und die time_diff berechnet, dann versucht:

df = sqlContext.sql(''' 
    select TRIP, TRIP_CD, SIGN, TIME_STAMP, seconds_diff, 
    case when TRIP_CD != TRIP_lag or SIGN != SIGN_lag or seconds_diff != 1 
     then (lag(COUNT_INDEX) over(partition by TRIP order by TRIP, TIME_STAMP))+1 
     else (lag(COUNT_INDEX) over(partition by TRIP order by TRIP, TIME_STAMP)) 
     end as COUNT_INDEX from df''') 

Diese mir wie etwas geben:

TRIP_CD SIGN time_diff COUNT_IDX 
2711  -  1   1 
2711  -  1   1 
2711  +  2   2 
2711  -  1   2 
2711  -  1   1 
2854  -  1   2 
2854  +  1   2 

Wenn COUNT_IDX in einem vorherigen Datensatz aktualisiert wird, erkennt COUNT_IDX im aktuellen Datensatz diese zu berechnende Änderung nicht. Es ist, als ob die COUNTI_IDX nicht überschrieben wird oder nicht von Zeile zu Zeile ausgewertet wird. Irgendwelche Ideen wie ich das umgehen kann?

Antwort

1

Sie benötigen kumulative Summe hier:

-- cumulative sum 
SUM(CAST( 
    -- if at least one condition has been satisfied 
    -- we take 1 otherwise 0 
    TRIP_CD != TRIP_lag OR SIGN != SIGN_lag OR seconds_diff != 1 AS LONG 
)) OVER W 
... 
WINDOW W AS (PARTITION BY trip ORDER BY times_stamp) 
+0

Dies ist eine kreative Lösung, aber ich habe es nicht ganz bekommen noch zu arbeiten. Setzen Sie dies in eine withColumn-Anweisung, um eine neue Spalte mit kumulativer Summe zu erstellen, oder soll das in SQL sein? Vielen Dank! – Amber

+0

Dies soll Ihre SQL-Abfrage zwischen "case when" und "end" ersetzen. Die Fensterdefinition kann inline angegeben werden, wenn Sie dies bevorzugen. Da da einige fehlende Spalten in Daten vorkommen, ist es nur Pseudocode. – zero323