3

Ich bereite Daten für die Eingabe für einen Klassifikator in Pyspark. Ich habe Aggregatfunktionen in SparkSQL verwendet, um Merkmale wie Durchschnitt und Varianz zu extrahieren. Diese sind nach Aktivität, Name und Fenster gruppiert. Fenster wurde berechnet, indem ein Unix-Zeitstempel durch 10000 geteilt wurde, um in Zeitfenster von 10 Sekunden einzubrechen.Pyspark benutzerdefinierte Aggregatberechnung auf Spalten

sample = sqlContext.sql("SELECT activity, name, window, avg(acc_x) as avgX , variance(acc_x) as varX FROM data GROUP BY activity,name,window ORDER BY activity,name,window") 

Das Ergebnis dieser aussehen würde

Activity Name   Window  AvgX  VarX 
Walk accelerometer 95875  2.0   1.0 

Was soll ich jetzt tun,

Um dies die durchschnittlichen Steigung von jedem Punkt in X zu berechnen, ist I-Zeitstempel müssen, Fenster, und X. Ich habe die Logik in Python implementiert, mit Arrays, so würde es aussehen - Berechnung der Steigung zwischen jedem Punkt, und dann die durchschnittliche Steigung zu bekommen. Am liebsten würde ich dies in einem UDAF tun, der in Pyspark noch nicht unterstützt wird. (Es würde so aussehen, sagen, wenn die Funktion unten Neigung gerufen wurde, dann in SQL Sie slope(timestamp, X) as avgSlopeX tun konnte

EDIT -. Eingabe geändert, damit es klarer ist Also, was ich tue genau das ist die Berechnung der Steigung zwischen. jeder Punkt, und die Rückkehr dann den Durchschnitt der Pisten in diesem Fenster. so, wie ich den Durchschnitt und die Varianz jeden Fensters bin immer, möchte ich auch die durchschnittliche Steigung erhalten.

#sample input 
timestamp = [1464703425544,1464703426534,1464703427551,1464703428587,1464703429512,1464703430493,1464703431505,1464703432543,1464703433513,1464703434529] 

values = [1021.31,1021.26,1021.19,1021.19,1021.1,1021.1,1021.1, 1021.05,1021.02] 

i = 0; 
slope = 0.0; 
totalSlope = 0.0; 

while (i < len(timestamp) - 1): 
    y2 = values[i+1]; 
    y1 = values[i]; 

    x2 = timestamp[i + 1]; 
    x1 = timestamp[i]; 
    slope = ((y2-y1)/(x2-x1)); 
    totalSlope = totalSlope + slope; 
    i=i+1 

avgSlope = (totalSlope/len(x_values)) 

wie ich das umsetzen kann ? Sollte ich versuchen, zu einem Pandas-Dataframe und dann zu einem numpy-Array zu konvertieren? Wenn ja, wie kann ich sicherstellen, dass die Daten immer noch korrekt gemappt werden, wenn man den GROUP BY-Act berücksichtigt itivity, Name Fenster in der SQL-Abfrage.

+0

Dies ist definitiv kein Job für UDAF. – zero323

+0

@ zero323 wie würdest du das angehen? – other15

+0

Berechnen Sie die Steigung für aufeinanderfolgende Punkte und nehmen Sie dann einen einfachen Durchschnitt. Aber Eingabe Beschreibung ist hier eher vage. Können Sie Beispieldaten mit der erwarteten Ausgabe bereitstellen? – zero323

Antwort

4

Im Allgemeinen ist dies kein Job für UDAF, da UDAFs keine Möglichkeit bieten, die Reihenfolge zu definieren. Es sieht so aus, als ob Sie hier eine Kombination von Fensterfunktionen und Standardaggregationen benötigen.

from pyspark.sql.functions import col, lag, avg 
from pyspark.sql.window import Window 

df = ... 
## DataFrame[activity: string, name: string, window: bigint, 
## timestamp: bigint, value: float] 

group = ["activity", "name", "window"] 

w = (Window() 
    .partitionBy(*group) 
    .orderBy("timestamp")) 

v_diff = col("value") - lag("value", 1).over(w) 
t_diff = col("timestamp") - lag("timestamp", 1).over(w) 

slope = v_diff/t_diff 

df.withColumn("slope", slope).groupBy(*group).agg(avg(col("slope"))) 
+0

das scheint ein guter Ansatz zu sein! Aber mitColumn scheint "slope" zu ignorieren und gibt stattdessen nur den Durchschnittswert zurück:/wenn slope als regulär registriert wurde Funktion könnte es verwendet werden? – other15

+0

Typo. Es sollte Steigung in Agg-Klausel sein. – zero323