0

Ich verwende Spark Streaming, um Chargen von JSON-Messwerten von Kafka zu erhalten. Der resultierende Stapel wird von einer RDD in einen Datenrahmen konvertiert.PySpark Mllib Vorhersage aller Zeilen im Datenrahmen

Mein Ziel ist es, eine Klassifizierung in jeder Zeile dieses Datenrahmen zu tun, damit ein VectorAssembler Ich verwende die Funktionen zu erstellen, die auf das Modell übergeben werden:

sqlContext = SQLContext(rdd.context) 
rawReading = sqlContext.jsonRDD(rdd) 
sensorReadings = rawReading.selectExpr("actual.y AS yaw","actual.p AS pitch", "actual.r AS roll") 
assembler = VectorAssembler(
     inputCols=["yaw", "pitch", "roll"], 
     outputCol="features") 
sensorReadingsFinal = assembler.transform(sensorReadings) 
sensorReadingsFinal.show() 
+---+-----+----+-----------------+ 
|yaw|pitch|roll|   features| 
+---+-----+----+-----------------+ 
| 18| 17.5| 120|[18.0,17.5,120.0]| 
| 18| 17.5| 120|[18.0,17.5,120.0]| 
| 18| 17.5| 120|[18.0,17.5,120.0]| 
| 18| 17.5| 120|[18.0,17.5,120.0]| 
| 18| 17.5| 120|[18.0,17.5,120.0]| 
+---+-----+----+-----------------+ 

Ich habe einen Zufallswald Modell, das ich habe vorher trainiert.

loadedModel = RandomForestModel.load(sc, "MyRandomForest.model") 

Meine Frage ist: Wie kann ich eine Vorhersage auf jeder Zeile in dem Datenrahmen, bevor das Ganze in eine Datenbank einfügen?

ich zunächst so etwas wie dies zu tun, dachte ...

prediction = loadedModel.predict(sensorReadings.features) 

aber ich erkennen, dass da der Datenrahmen mehrere Zeilen hat, muss ich irgendwie eine Spalte hinzufügen und die Vorhersage Reihe zu tun, indem Reihe. Vielleicht mache ich das alles falsch?

Die letzte Datenrahmen, die Ich mag würde, ist so etwas wie dieses:

+---+-----+----+-----------------+ 
|yaw|pitch|roll|  Prediction| 
+---+-----+----+-----------------+ 
| 18| 17.5| 120|    1 | 
| 18| 17.5| 120|    1 | 
| 18| 17.5| 120|    1 | 
| 18| 17.5| 120|    1 | 
| 18| 17.5| 120|    1 | 
+---+-----+----+-----------------+ 

an welcher Stelle ich es in einer Datenbank speichern wird:

sensorReadingsFinal.write.jdbc("jdbc:mysql://localhost/testdb", "SensorReadings", properties=connectionProperties) 

Antwort

0

Hier ist, was ich zu tun endete beheben dieses Problems:

# Convert DStream RDD's to DataFrame and run SQL query 
    sqlContext = SQLContext(rdd.context) 
    if rdd.isEmpty() == False: 
     rawReading = sqlContext.jsonRDD(rdd) 
     sensorReadings = rawReading.selectExpr("actual.y AS yaw","actual.p AS pitch", "actual.r AS roll") 
     assembler = VectorAssembler(
        inputCols=["yaw","pitch","roll"], # Must be in same order as what was used to train the model. Testing using only pitch since model has limited dataset. 
        outputCol="features") 
     sensorReadings = assembler.transform(sensorReadings) 

     # Create a new dataFrame of predictions and readingID 
     predictions = loadedModel.predict(sensorReadings.map(lambda x: x.features)) 
     predictionsDF = sensorReadings.map(lambda x: x.readingID).zip(predictions).toDF(["readingID","positionID"]) 

     # Join the prediction dataFrame back to the sensorReadings dataFrame. Drop the duplicate readingID column. 
     combinedDF = sensorReadings.join(predictionsDF, sensorReadings.readingID == predictionsDF.readingID).drop(predictionsDF.readingID) 

     # Drop the feature vector column 
     combinedDF = combinedDF.drop("features") 

     combinedDF.show() 

ich im Grunde eine neue Datenrahmen von nur den Merkmalsvektoren und readingID erstellt und schloss sie dann wieder an den ursprünglichen dataFrame an.

Dies ist möglicherweise nicht die eleganteste Lösung, also wenn jemand etwas besser vorschlagen kann, bitte tun.