2015-04-28 6 views
5

Diese Frage richtet sich an Personen, die mit py4j vertraut sind - und kann helfen, einen Beizfehler zu beheben. Ich versuche, eine Methode zum pyspark PythonMLLibAPI hinzuzufügen, das eine RDD eines namedtuple akzeptiert, etwas Arbeit tut und ein Ergebnis in Form einer RDD zurückbringt.Pyspark py4j PickleException: "erwartete Nullargumente für den Bau von ClassDict"

Dieses Verfahren nach der PYthonMLLibAPI.trainALSModel() Methode modelliert, deren analogen bestehende relevante Abschnitte sind:

def trainALSModel(
    ratingsJRDD: JavaRDD[Rating], 
    ..) 

Die bestehende Python Rating-Klasse ist den neuen Code Modell:

hier
class Rating(namedtuple("Rating", ["user", "product", "rating"])): 
    def __reduce__(self): 
     return Rating, (int(self.user), int(self.product), float(self.rating)) 

ist der Versuch, hier So sind die relevanten Klassen:

New Python-Klasse pyspark.mllib.clustering.MatrixEntry:

from collections import namedtuple 
class MatrixEntry(namedtuple("MatrixEntry", ["x","y","weight"])): 
    def __reduce__(self): 
     return MatrixEntry, (long(self.x), long(self.y), float(self.weight)) 

New Methode foobarRDD In PythonMLLibAPI:

def foobarRdd(
    data: JavaRDD[MatrixEntry]): RDD[FooBarResult] = { 
    val rdd = data.rdd.map { d => FooBarResult(d.i, d.j, d.value, d.i * 100 + d.j * 10 + d.value)} 
    rdd 
    } 

uns es jetzt lassen ausprobieren:

from pyspark.mllib.clustering import MatrixEntry 

def convert_to_MatrixEntry(tuple): 
    return MatrixEntry(*tuple) 

from pyspark.mllib.clustering import * 
pic = PowerIterationClusteringModel(2) 
tups = [(1,2,3),(4,5,6),(12,13,14),(15,7,8),(16,17,16.5)] 
trdd = sc.parallelize(map(convert_to_MatrixEntry,tups)) 

# print out the RDD on python side just for validation 
print "%s" %(repr(trdd.collect())) 

from pyspark.mllib.common import callMLlibFunc 
pic = callMLlibFunc("foobar", trdd) 

Relevante Teile von re Ergebnisse:

[(1,2)=3.0, (4,5)=6.0, (12,13)=14.0, (15,7)=8.0, (16,17)=16.5] 

was zeigt, dass der Eingang rdd 'ganz' ist. Doch das Beizen war unglücklich:

5/04/27 21:15:44 ERROR Executor: Exception in task 6.0 in stage 1.0 (TID 14) 
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict 
(for pyspark.mllib.clustering.MatrixEntry) 
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) 
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617) 
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170) 
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:84) 
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97) 
    at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1167) 
    at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1166) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819) 
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
    at org.apache.spark.scheduler.Task.run(Task.scala:64) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:212) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:724) 

Im Folgenden ist eine visuelle des Python-Aufruf-Stack-Trace:

enter image description here

+0

Hallo hast du es gelöst? –

Antwort

8

ich den gleichen Fehler hatte, wie ich MLlib wurde mit, und es stellte sich heraus, dass ich hatte einen falschen Datentyp in einer meiner Funktionen zurückgegeben. Es funktioniert jetzt nach einer einfachen Umwandlung des zurückgegebenen Werts. Dies ist vielleicht nicht die Antwort, die Sie suchen, aber es ist zumindest ein Hinweis auf die Richtung, der Sie folgen werden.

+0

Ich arbeite nicht mehr an diesem Projekt - und kann es nicht bestätigen. Es scheint jedoch eine vernünftige Überlegung zu sein, so dass sie aufgewertet wurden. – javadba

1

Ich habe diesen Fehler mit Spark Version> = 2.0 erhalten.

Spark wechselt die MLlib-Funktion in den neueren ML-Namespace. Als Ergebnis gibt es zwei Arten von sparsevector: ml.linalg.SparseVector und mllib.linalg.SparseVector

Einige MLlib Funktionen noch erwarten, dass die ältere mllib Art

from pyspark.ml.linalg import Vectors 
# convert ML vector to older MLlib vector 
old_vec = Vectors.fromML(new_vec) 

HTH

+0

Das war sehr hilfreich - danke! Das einzige Problem ist, dass in Version 2.1.1 'fromML' nicht mehr existiert, also musste ich das Objekt manuell erstellen, indem ich' pyspark.mllib.linalg.SparseVector (sv.size, sv.indices, sv.values) 'erstellte , wo 'sv' mein' pyspark.ml.linalg.SparseVector' Objekt war. – LateCoder

1

hatte das gleiche Problem, mehrere Male. numpy Typen haben keine impliziten Konvertierungen zu pyspark.sql.types.

Machen Sie eine einfache explizite Konvertierung in das systemeigene System. In meinem Fall war es:

float(vector_a.dot(vector_b)