Diese Frage richtet sich an Personen, die mit py4j vertraut sind - und kann helfen, einen Beizfehler zu beheben. Ich versuche, eine Methode zum pyspark PythonMLLibAPI hinzuzufügen, das eine RDD eines namedtuple akzeptiert, etwas Arbeit tut und ein Ergebnis in Form einer RDD zurückbringt.Pyspark py4j PickleException: "erwartete Nullargumente für den Bau von ClassDict"
Dieses Verfahren nach der PYthonMLLibAPI.trainALSModel() Methode modelliert, deren analogen bestehende relevante Abschnitte sind:
def trainALSModel(
ratingsJRDD: JavaRDD[Rating],
..)
Die bestehende Python Rating-Klasse ist den neuen Code Modell:
hierclass Rating(namedtuple("Rating", ["user", "product", "rating"])):
def __reduce__(self):
return Rating, (int(self.user), int(self.product), float(self.rating))
ist der Versuch, hier So sind die relevanten Klassen:
New Python-Klasse pyspark.mllib.clustering.MatrixEntry:
from collections import namedtuple
class MatrixEntry(namedtuple("MatrixEntry", ["x","y","weight"])):
def __reduce__(self):
return MatrixEntry, (long(self.x), long(self.y), float(self.weight))
New Methode foobarRDD In PythonMLLibAPI:
def foobarRdd(
data: JavaRDD[MatrixEntry]): RDD[FooBarResult] = {
val rdd = data.rdd.map { d => FooBarResult(d.i, d.j, d.value, d.i * 100 + d.j * 10 + d.value)}
rdd
}
uns es jetzt lassen ausprobieren:
from pyspark.mllib.clustering import MatrixEntry
def convert_to_MatrixEntry(tuple):
return MatrixEntry(*tuple)
from pyspark.mllib.clustering import *
pic = PowerIterationClusteringModel(2)
tups = [(1,2,3),(4,5,6),(12,13,14),(15,7,8),(16,17,16.5)]
trdd = sc.parallelize(map(convert_to_MatrixEntry,tups))
# print out the RDD on python side just for validation
print "%s" %(repr(trdd.collect()))
from pyspark.mllib.common import callMLlibFunc
pic = callMLlibFunc("foobar", trdd)
Relevante Teile von re Ergebnisse:
[(1,2)=3.0, (4,5)=6.0, (12,13)=14.0, (15,7)=8.0, (16,17)=16.5]
was zeigt, dass der Eingang rdd 'ganz' ist. Doch das Beizen war unglücklich:
5/04/27 21:15:44 ERROR Executor: Exception in task 6.0 in stage 1.0 (TID 14)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict
(for pyspark.mllib.clustering.MatrixEntry)
at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
at net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1167)
at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1166)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819)
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:212)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
Im Folgenden ist eine visuelle des Python-Aufruf-Stack-Trace:
Hallo hast du es gelöst? –