Ich verwende ein Broadcast-Variable etwa 100 MB groß gebeizt, die mit ich annähert:Tipps für die ordnungsgemäße Verwendung großer Broadcast-Variablen?
>>> data = list(range(int(10*1e6)))
>>> import cPickle as pickle
>>> len(pickle.dumps(data))
98888896
mit 3 c3.2xlarge Zieher auf einen Cluster ausgeführt werden, und einen m3.large Fahrer, mit der folgenden Befehl startet die interaktive Sitzung:
IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g
In einer RDD, wenn ich einen Verweis auf diese Sendung Variable bestehen bleiben, die Speichernutzung explodiert. Bei 100 Verweisen auf eine 100-MB-Variable, selbst wenn sie 100 Mal kopiert würde, würde ich erwarten, dass die Datennutzung insgesamt nicht mehr als 10 GB beträgt (geschweige denn 30 GB über 3 Knoten). Allerdings sehe ich Fehler aus dem Speicher, wenn ich folgenden Test durchgeführt:
data = list(range(int(10*1e6)))
metadata = sc.broadcast(data)
ids = sc.parallelize(zip(range(100), range(100)))
joined_rdd = ids.mapValues(lambda _: metadata.value)
joined_rdd.persist()
print('count: {}'.format(joined_rdd.count()))
Der Stack-Trace:
TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func
return f(iterator)
File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <lambda>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <genexpr>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
yield self._read_with_length(stream)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
MemoryError
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-1-7a262fdfa561> in <module>()
7 joined_rdd.persist()
8 print('persist called')
----> 9 print('count: {}'.format(joined_rdd.count()))
/usr/lib/spark/python/pyspark/rdd.py in count(self)
1004 3
1005 """
-> 1006 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1007
1008 def stats(self):
/usr/lib/spark/python/pyspark/rdd.py in sum(self)
995 6.0
996 """
--> 997 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
998
999 def count(self):
/usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op)
869 # zeroValue provided to each partition is unique from the one provided
870 # to the final reduce call
--> 871 vals = self.mapPartitions(func).collect()
872 return reduce(op, vals, zeroValue)
873
/usr/lib/spark/python/pyspark/rdd.py in collect(self)
771 """
772 with SCCallSiteSync(self.context) as css:
--> 773 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
774 return list(_load_from_socket(port, self._jrdd_deserializer))
775
/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Ich habe gesehen vorherige Threads über die Speichernutzung von Gurke Deserialisierung ein Problem zu sein. Ich würde jedoch erwarten, dass eine Broadcast-Variable nur einmal deserialisiert (und auf einem Executor in den Speicher geladen) wird, und nachfolgende Verweise auf .value
, um diese In-Memory-Adresse zu referenzieren. Das scheint jedoch nicht der Fall zu sein. Fehle ich etwas?
Die Beispiele, die ich mit Broadcast-Variablen gesehen habe, haben sie als Wörterbücher, einmal verwendet, um eine Reihe von Daten zu transformieren (d. H. Flughafenakronyme durch Flughafennamen ersetzen). Die Motivation dahinter besteht darin, Objekte mit der Kenntnis einer Broadcast-Variablen zu erstellen und mit ihr zu interagieren, diese Objekte persistent zu machen und mehrere Berechnungen mit ihnen durchzuführen (wobei Spark darauf achtet, sie im Speicher zu halten).
Was sind einige Tipps für die Verwendung von großen (100 MB +) Broadcast-Variablen? Bleibt eine Broadcast-Variable fehlgeleitet? Ist das ein Problem, das möglicherweise spezifisch für PySpark ist?
Vielen Dank! Deine Hilfe wird geschätzt.
Hinweis, habe ich auch geschrieben, diese Frage auf dem databricks forums
Bearbeiten - Followup Frage:
Es wurde vorgeschlagen, dass der Standard-Spark-Serializer eine Chargengröße von 65337. Objekten serialisiert in verschiedenen hat Chargen werden nicht gleich identifiziert und erhalten unterschiedliche Speicheradressen, die hier über die eingebaute id
Funktion untersucht werden. Aber selbst mit einer größeren Broadcast-Variable, die theoretisch 256 Chargen zum Serialisieren benötigt, sehe ich immer noch nur zwei verschiedene Kopien. Sollte ich nicht viel mehr sehen? Verstehe ich, dass die Batch-Serialisierung nicht korrekt funktioniert?
>>> sc.serializer.bestSize
65536
>>> import cPickle as pickle
>>> broadcast_data = {k: v for (k, v) in enumerate(range(int(1e6)))}
>>> len(pickle.dumps(broadcast_data))
16777786
>>> len(pickle.dumps({k: v for (k, v) in enumerate(range(int(1e6)))}))/sc.serializer.bestSize
256
>>> bd = sc.broadcast(broadcast_data)
>>> rdd = sc.parallelize(range(100), 1).map(lambda _: bd.value)
>>> rdd.map(id).distinct().count()
1
>>> rdd.cache().count()
100
>>> rdd.map(id).distinct().count()
2
Haben Sie etwas dagegen, die Kompromisse zwischen Serialisierungsstrategien zu kommentieren? Sollten wir bei einer größeren Batchgröße mehr Speicher für die Serialisierung erwarten? Wie wird es die Geschwindigkeit der Serialisierung beeinflussen? Warum sollte man einen Serializer wählen, der keine beliebigen Objekte serialisieren kann? – captaincapsaicin
Nun, die vollständige Partition passt möglicherweise nicht in den Speicher. Wenn der Stapel also unendlich ist, gibt es keine Garantie, dass er erfolgreich ist. Das ist für Anfänger. Eine höhere Speichernutzung kann zu verschiedenen GC-Problemen führen. In Bezug auf Ihre letzten Fragen kann so gut wie kein Serializer ein beliebiges Objekt verarbeiten, insbesondere wenn es sich um einen nativen Code handelt. Es gibt einige Sprachkonstrukte, die nicht standardmäßig per Design (wie Lambda-Ausdrücke) serialisiert werden können und spezielle Werkzeuge erfordern. Andererseits kann das Serialisieren von komplexen Verschlüssen langsam sein. – zero323
Ich habe auch in einer Folgefrage meine ursprüngliche Frage bearbeitet. Könnten Sie einen Blick darauf werfen, wie die Stapelgröße mit der Anzahl der verschiedenen Objekte im Spark-Speicher korreliert? – captaincapsaicin