Ich verwende Spark Streaming, um Chargen von JSON-Messwerten von Kafka zu erhalten. Der resultierende Stapel wird von einer RDD in einen Datenrahmen konvertiert.PySpark Mllib Vorhersage aller Zeilen im Datenrahmen
Mein Ziel ist es, eine Klassifizierung in jeder Zeile dieses Datenrahmen zu tun, damit ein VectorAssembler Ich verwende die Funktionen zu erstellen, die auf das Modell übergeben werden:
sqlContext = SQLContext(rdd.context)
rawReading = sqlContext.jsonRDD(rdd)
sensorReadings = rawReading.selectExpr("actual.y AS yaw","actual.p AS pitch", "actual.r AS roll")
assembler = VectorAssembler(
inputCols=["yaw", "pitch", "roll"],
outputCol="features")
sensorReadingsFinal = assembler.transform(sensorReadings)
sensorReadingsFinal.show()
+---+-----+----+-----------------+
|yaw|pitch|roll| features|
+---+-----+----+-----------------+
| 18| 17.5| 120|[18.0,17.5,120.0]|
| 18| 17.5| 120|[18.0,17.5,120.0]|
| 18| 17.5| 120|[18.0,17.5,120.0]|
| 18| 17.5| 120|[18.0,17.5,120.0]|
| 18| 17.5| 120|[18.0,17.5,120.0]|
+---+-----+----+-----------------+
Ich habe einen Zufallswald Modell, das ich habe vorher trainiert.
loadedModel = RandomForestModel.load(sc, "MyRandomForest.model")
Meine Frage ist: Wie kann ich eine Vorhersage auf jeder Zeile in dem Datenrahmen, bevor das Ganze in eine Datenbank einfügen?
ich zunächst so etwas wie dies zu tun, dachte ...
prediction = loadedModel.predict(sensorReadings.features)
aber ich erkennen, dass da der Datenrahmen mehrere Zeilen hat, muss ich irgendwie eine Spalte hinzufügen und die Vorhersage Reihe zu tun, indem Reihe. Vielleicht mache ich das alles falsch?
Die letzte Datenrahmen, die Ich mag würde, ist so etwas wie dieses:
+---+-----+----+-----------------+
|yaw|pitch|roll| Prediction|
+---+-----+----+-----------------+
| 18| 17.5| 120| 1 |
| 18| 17.5| 120| 1 |
| 18| 17.5| 120| 1 |
| 18| 17.5| 120| 1 |
| 18| 17.5| 120| 1 |
+---+-----+----+-----------------+
an welcher Stelle ich es in einer Datenbank speichern wird:
sensorReadingsFinal.write.jdbc("jdbc:mysql://localhost/testdb", "SensorReadings", properties=connectionProperties)