Ich bereite Daten für die Eingabe für einen Klassifikator in Pyspark. Ich habe Aggregatfunktionen in SparkSQL verwendet, um Merkmale wie Durchschnitt und Varianz zu extrahieren. Diese sind nach Aktivität, Name und Fenster gruppiert. Fenster wurde berechnet, indem ein Unix-Zeitstempel durch 10000 geteilt wurde, um in Zeitfenster von 10 Sekunden einzubrechen.Pyspark benutzerdefinierte Aggregatberechnung auf Spalten
sample = sqlContext.sql("SELECT activity, name, window, avg(acc_x) as avgX , variance(acc_x) as varX FROM data GROUP BY activity,name,window ORDER BY activity,name,window")
Das Ergebnis dieser aussehen würde
Activity Name Window AvgX VarX
Walk accelerometer 95875 2.0 1.0
Was soll ich jetzt tun,
Um dies die durchschnittlichen Steigung von jedem Punkt in X zu berechnen, ist I-Zeitstempel müssen, Fenster, und X. Ich habe die Logik in Python implementiert, mit Arrays, so würde es aussehen - Berechnung der Steigung zwischen jedem Punkt, und dann die durchschnittliche Steigung zu bekommen. Am liebsten würde ich dies in einem UDAF tun, der in Pyspark noch nicht unterstützt wird. (Es würde so aussehen, sagen, wenn die Funktion unten Neigung gerufen wurde, dann in SQL Sie slope(timestamp, X) as avgSlopeX
tun konnte
EDIT -. Eingabe geändert, damit es klarer ist Also, was ich tue genau das ist die Berechnung der Steigung zwischen. jeder Punkt, und die Rückkehr dann den Durchschnitt der Pisten in diesem Fenster. so, wie ich den Durchschnitt und die Varianz jeden Fensters bin immer, möchte ich auch die durchschnittliche Steigung erhalten.
#sample input
timestamp = [1464703425544,1464703426534,1464703427551,1464703428587,1464703429512,1464703430493,1464703431505,1464703432543,1464703433513,1464703434529]
values = [1021.31,1021.26,1021.19,1021.19,1021.1,1021.1,1021.1, 1021.05,1021.02]
i = 0;
slope = 0.0;
totalSlope = 0.0;
while (i < len(timestamp) - 1):
y2 = values[i+1];
y1 = values[i];
x2 = timestamp[i + 1];
x1 = timestamp[i];
slope = ((y2-y1)/(x2-x1));
totalSlope = totalSlope + slope;
i=i+1
avgSlope = (totalSlope/len(x_values))
wie ich das umsetzen kann ? Sollte ich versuchen, zu einem Pandas-Dataframe und dann zu einem numpy-Array zu konvertieren? Wenn ja, wie kann ich sicherstellen, dass die Daten immer noch korrekt gemappt werden, wenn man den GROUP BY-Act berücksichtigt itivity, Name Fenster in der SQL-Abfrage.
Dies ist definitiv kein Job für UDAF. – zero323
@ zero323 wie würdest du das angehen? – other15
Berechnen Sie die Steigung für aufeinanderfolgende Punkte und nehmen Sie dann einen einfachen Durchschnitt. Aber Eingabe Beschreibung ist hier eher vage. Können Sie Beispieldaten mit der erwarteten Ausgabe bereitstellen? – zero323