1

Ich schrieb den folgenden Code für die logistische Regression, ich möchte die Pipeline-API von spark.ml bereitgestellt. Jedoch gab es mir einen Fehler, nachdem ich versuche, Koeffizienten und Abschnitte zu drucken. Auch habe ich Probleme, die Verwirrungsmatrix und andere Maße wie Präzision, Rückruf zu berechnen.Logistische Regression mit Funken ml (Datenrahmen)

#Logistic Regression: 
from pyspark.mllib.linalg import Vectors 
from pyspark.ml.classification import LogisticRegression 
from pyspark.sql import SQLContext 
from pyspark import SparkContext 
from pyspark.sql.types import * 
from pyspark.sql.functions import * 
from pyspark.ml.feature import StringIndexer,VectorAssembler 
from pyspark.ml import Pipeline 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 


sc = SparkContext("local", "predictive") 
sqlContext=SQLContext(sc) 

df = sqlContext.read.load('/user/bna_ads_final.csv', 
         format='com.databricks.spark.csv', 
         header='true', 
         inferSchema='true') 

df.show(5) 
df.count() 
df.dtypes 
df=df.withColumn("load_date",df.load_date.cast("timestamp")) 
df_withday= df.withColumn("day",dayofmonth(df.load_date)) 
df_new=df_withday.withColumn("Month",month(df.load_date)) 
df_new=df_new.withColumn("classname",df_new.classname.cast("string")) 
ignore = ["load_date","wo_flag","serialnumber", "classname"] 

def modify_values(r): 
if r == "A" or r =="B": 
    return "dispatch" 
else: 
    return "non-dispatch" 

def show_metrics(metrics): 
# Overall statistics 
precision = metrics.precision() 
recall = metrics.recall() 
f1Score = metrics.fMeasure() 
print("Summary Stats") 
print("Precision = %s" % precision) 
print("Recall = %s" % recall) 
print("F1 Score = %s" % f1Score) 
print (metrics.confusionMatrix()) 

ol_val = udf(modify_values, StringType()) 
df_final = df_new.withColumn("wo_flag",ol_val(df_new.wo_flag)) 
indexer= StringIndexer(inputCol="classname", outputCol="classnamecat") 
indexed = indexer.fit(df_final).transform(df_final) 
indexed=indexed.withColumn("classnamecat",indexed.classnamecat.cast("int")) 
indexed.show(5) 
(trainingData, testData) = indexed.randomSplit([0.7, 0.3]) 
assembler = VectorAssembler(inputCols=[x for x in indexed.columns if x not in ignore],outputCol='features') 
stringindexer=StringIndexer(inputCol="wo_flag", outputCol="labellr") 
Classifier= LogisticRegression(labelCol="labellr", featuresCol="features") 
pipeline=Pipeline(stages=[stringindexer,assembler,Classifier]) 
model = pipeline.fit(trainingData) 
predictions = model.transform(testData) 

selected = predictions.select("features", "labellr", "probability", "prediction") 
for row in selected.collect(): 
print row 


evaluator = MulticlassClassificationEvaluator(
labelCol="labellr", predictionCol="prediction", metricName="precision") 
accuracy = evaluator.evaluate(predictions) 
print("Test Error = %g" % (1.0 - accuracy)) 
print("Accuracy= %g" % (accuracy)) 

print("Coefficients: " + str(model.coefficients)) 
print("Intercept: " + str(model.intercept)) 

Der Fehler, die ich bekommen ist:

print("Coefficients: " + str(model.coefficients)) 
AttributeError: 'PipelineModel' object has no attribute 'coefficients' 

Ich habe 1.5 auf dem Hadoop-Cluster installiert Spark, ich werde nicht in der Lage sein, jederzeit schnell zu aktualisieren. Gibt es eine Arbeit, um dieses Problem zu lösen?

load_date   | r   | classname| mstatus34_timdiff| day|Month| classnamecat| serialnumber 
+-----------+------------------+----------+--------------------+------------+--- +-----------+---- 
2013-12-29 10:55:...|non-dispatch|  6634|    19| 1| 7|   0.0| 231234  
2014-10-05 23:43:...|non-dispatch|  6634|    4| 5| 10|   0.0| 342345 
2014-10-09 09:39:...| dispatch|  5886|    36| 9| 10|   1.0| 563472 
2014-09-16 09:47:...| dispatch|  6634|    53| 16| 9|   0.0| 134657 

Antwort

3

können Sie einzelne Stufen Zugriff mit stages Attribut des PipelineModel

from pyspark.ml import Pipeline 
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel 
from pyspark.ml.feature import VectorAssembler 

df = sc.parallelize([ 
    (0.0, 1.0, 2.0, 4.0), 
    (1.0, 3.0, 4.0, 5.0) 
]).toDF(["label", "x1", "x2", "x3"]) 

assembler = (VectorAssembler() 
    .setInputCols(df.columns[1:]) 
    .setOutputCol("features")) 

lr = LogisticRegression(maxIter=10, regParam=0.01) 

pipeline = Pipeline(stages=[assembler, lr]) 
model = pipeline.fit(data) 

[stage.coefficients for stage in model.stages if hasattr(stage, "coefficients")] 
## [DenseVector([2.1178, 1.6843, -1.8338])] 

## or 

[stage.coefficients for stage in model.stages 
    if isinstance(stage, LogisticRegressionModel)] 
## [DenseVector([2.1178, 1.6843, -1.8338])] 
+0

Dank für die Antwort danken jedoch nicht die Koeffizienten nicht gedruckt, es gibt mir nur a leere Klammern: [] –

+0

Sollte gut funktionieren, solange das richtige Modell in der Pipeline ist. Siehe den [MCVE]. – zero323

+0

Hallo @ zero323 Ich habe angehängt, wie die transformierten Daten aussehen, das Modell funktioniert gut, wenn eine RDD verwendet wird und die Lambda-Funktion mit beschrifteten Punkten verwendet wird, um Features und Labels zu erstellen. Aber es scheitert auf Datenrahmen, Meine Funktion der Berechnung von Metriken und auch Druckkoeffizienten funktioniert nicht –

3

Versuchen Sie, diese

pipeline=Pipeline(stages=[assembler, lr]) 
model = pipeline.fit(trainingData) 
lrm = model.stages[-1] 

lrm.coefficients