2016-08-01 25 views
-2

Ich habe einen Pyspark Datenrahmen enthält Reihen von Daten getrennt durch Komma. Ich möchte jede Zeile aufteilen und die LabeledPoints-Methode darauf anwenden. Dann covern es zum Datenrahmen.piplinedRDD kann nicht in Dataframe konvertieren mit toDF

Hier ist mein Code

import os.path 
from pyspark.mllib.regression import LabeledPoint 
import numpy as np 
file_name = os.path.join('databricks-datasets', 'cs190', 'data-001', 'millionsong.txt') 

raw_data_df = sqlContext.read.load(file_name, 'text') 
rdd = raw_data_df.rdd.map(lambda line: line.split(',')).map(lambda seq:LabeledPoints(seq[0],seq[1:])).toDF() 

Es gibt die folgende Fehlermeldung nach .DF() gelten.

--------------------------------------------------------------------------- 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 38.0 failed 1 times, most recent failure: Lost task 0.0 in stage 38.0 (TID 44, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-65-dc4d86a8ee45> in <module>() 
----> 1 rdd = raw_data_df.rdd.map(lambda line: line.split(',')).map(lambda  seq:LabeledPoints(seq[0],seq[1:])).toDF() 
    2 print(type(rdd)) 
    3 #print(rdd.take(5)) 

/databricks/spark/python/pyspark/sql/context.py in toDF(self, schema,  sampleRatio) 
62   [Row(name=u'Alice', age=1)] 
63   """ 
---> 64   return sqlContext.createDataFrame(self, schema, sampleRatio) 
65 
66  RDD.toDF = toDF 

/databricks/spark/python/pyspark/sql/context.py in createDataFrame(self, data, schema, samplingRatio) 
421 
422   if isinstance(data, RDD): 

-> 423 RDD, schema = self._createFromRDD (Daten, Schema, samplingRatio) 424 anderes: 425 RDD, schema = self._createFromLocal (Daten, Schema)

/databricks/spark/python/pyspark/sql/context.py in _createFromRDD(self, rdd, schema, samplingRatio) 
+1

Tippfehler: 'LabeledPoint'! =' LabeledPoints' – zero323

+0

Das Problem besteht immer noch, wenn ich den Tippfehler korrigiere. Auch nach dem Entfernen der zweiten Map tritt immer noch ein Fehler auf, wenn take() verwendet wird, um die Zeilen nach der Teilung anzuzeigen. –

Antwort

0

Antwort gefunden: rdd = raw_data_df.map (Lambda Zeile: Zeile ['Wert']. Split (',')). Map (Lambda seq: LabeledPoint (float (seq [0]), seq [1:])) .toDF()

Hier muss ich jede Zeile Text mit Zeile ['Wert'], gerade verweisen obwohl es nur ein Merkmal in der Reihe gibt.