-1
Ich sehe eine Beiz Fehler:PySpark PicklingError
Could not pickle object as excessively deep recursion required.
Unten ist die Spur zurück:
Traceback (most recent call last):
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
r = self.func(t, *rdds)
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in
func = lambda t, rdd: old_func(rdd)
if rdd.count() > 0:
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1006, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 997, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 871, in fold
vals = self.mapPartitions(func).collect()
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 773, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2388, in _jrdd
pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2308, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps
return cloudpickle.dumps(obj, 2)
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 646, in dumps
cp.dump(obj)
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 111, in dump
raise pickle.PicklingError(msg)
PicklingError: Could not pickle object as excessively deep recursion required.
Hier ist ein Teil meines Codes inn auf hoher Ebene, die den Fehler verursacht hat:
sc = SparkContext(appName="my_app")
ssc = StreamingContext(sc, 1)
kafka_stream = KafkaUtils.createDirectStream(ssc, full_topic_list, kafka_params, fromOffsets=offset_dict)
messages = kafka_stream.map(lambda (k, v): json.loads(v))
messages.foreachRDD(lambda rdd: process(rdd, topic_list, sqlcontext))
In meiner Prozess-Funktion gibt es eine Anzahl von rdd: if topic_rdd.count() > 0
, die den Fehler auslöst.
Dank cftarnas. Was ist der beste Weg, um Funktion für eine RDD durchzuführen, wenn ich RDD übergeben. – ling
@ling: Sie können die RDDs wirklich nicht bestehen, sie sind nicht pickleable. Wie Sie Ihr Gesamtproblem lösen, hängt davon ab, was Sie versuchen, ein detaillierteres Code-Snippet könnte helfen. Es könnte so einfach wie die topic_rdd.count() zuerst berechnen und übergeben Sie dann nur die Zählungen selbst. – cftarnas