2016-05-26 12 views
5

Ich verwende ein Broadcast-Variable etwa 100 MB groß gebeizt, die mit ich annähert:Tipps für die ordnungsgemäße Verwendung großer Broadcast-Variablen?

>>> data = list(range(int(10*1e6))) 
>>> import cPickle as pickle 
>>> len(pickle.dumps(data)) 
98888896 

mit 3 c3.2xlarge Zieher auf einen Cluster ausgeführt werden, und einen m3.large Fahrer, mit der folgenden Befehl startet die interaktive Sitzung:

IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g 

In einer RDD, wenn ich einen Verweis auf diese Sendung Variable bestehen bleiben, die Speichernutzung explodiert. Bei 100 Verweisen auf eine 100-MB-Variable, selbst wenn sie 100 Mal kopiert würde, würde ich erwarten, dass die Datennutzung insgesamt nicht mehr als 10 GB beträgt (geschweige denn 30 GB über 3 Knoten). Allerdings sehe ich Fehler aus dem Speicher, wenn ich folgenden Test durchgeführt:

data = list(range(int(10*1e6))) 
metadata = sc.broadcast(data) 
ids = sc.parallelize(zip(range(100), range(100))) 
joined_rdd = ids.mapValues(lambda _: metadata.value) 
joined_rdd.persist() 
print('count: {}'.format(joined_rdd.count())) 

Der Stack-Trace:

TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13): 

org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func 
    return func(split, prev_func(split, iterator)) 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func 
    return func(split, prev_func(split, iterator)) 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func 
    return f(iterator) 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <lambda> 
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <genexpr> 
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream 
    yield self._read_with_length(stream) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length 
    return self.loads(obj) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads 
    return pickle.loads(obj) 
MemoryError 


    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138) 
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job 
--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-1-7a262fdfa561> in <module>() 
     7 joined_rdd.persist() 
     8 print('persist called') 
----> 9 print('count: {}'.format(joined_rdd.count())) 

/usr/lib/spark/python/pyspark/rdd.py in count(self) 
    1004   3 
    1005   """ 
-> 1006   return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    1007 
    1008  def stats(self): 

/usr/lib/spark/python/pyspark/rdd.py in sum(self) 
    995   6.0 
    996   """ 
--> 997   return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) 
    998 
    999  def count(self): 

/usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op) 
    869   # zeroValue provided to each partition is unique from the one provided 
    870   # to the final reduce call 
--> 871   vals = self.mapPartitions(func).collect() 
    872   return reduce(op, vals, zeroValue) 
    873 

/usr/lib/spark/python/pyspark/rdd.py in collect(self) 
    771   """ 
    772   with SCCallSiteSync(self.context) as css: 
--> 773    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    774   return list(_load_from_socket(port, self._jrdd_deserializer)) 
    775 

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 

    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 

Ich habe gesehen vorherige Threads über die Speichernutzung von Gurke Deserialisierung ein Problem zu sein. Ich würde jedoch erwarten, dass eine Broadcast-Variable nur einmal deserialisiert (und auf einem Executor in den Speicher geladen) wird, und nachfolgende Verweise auf .value, um diese In-Memory-Adresse zu referenzieren. Das scheint jedoch nicht der Fall zu sein. Fehle ich etwas?

Die Beispiele, die ich mit Broadcast-Variablen gesehen habe, haben sie als Wörterbücher, einmal verwendet, um eine Reihe von Daten zu transformieren (d. H. Flughafenakronyme durch Flughafennamen ersetzen). Die Motivation dahinter besteht darin, Objekte mit der Kenntnis einer Broadcast-Variablen zu erstellen und mit ihr zu interagieren, diese Objekte persistent zu machen und mehrere Berechnungen mit ihnen durchzuführen (wobei Spark darauf achtet, sie im Speicher zu halten).

Was sind einige Tipps für die Verwendung von großen (100 MB +) Broadcast-Variablen? Bleibt eine Broadcast-Variable fehlgeleitet? Ist das ein Problem, das möglicherweise spezifisch für PySpark ist?

Vielen Dank! Deine Hilfe wird geschätzt.

Hinweis, habe ich auch geschrieben, diese Frage auf dem databricks forums

Bearbeiten - Followup Frage:

Es wurde vorgeschlagen, dass der Standard-Spark-Serializer eine Chargengröße von 65337. Objekten serialisiert in verschiedenen hat Chargen werden nicht gleich identifiziert und erhalten unterschiedliche Speicheradressen, die hier über die eingebaute id Funktion untersucht werden. Aber selbst mit einer größeren Broadcast-Variable, die theoretisch 256 Chargen zum Serialisieren benötigt, sehe ich immer noch nur zwei verschiedene Kopien. Sollte ich nicht viel mehr sehen? Verstehe ich, dass die Batch-Serialisierung nicht korrekt funktioniert?

>>> sc.serializer.bestSize 
65536 
>>> import cPickle as pickle 
>>> broadcast_data = {k: v for (k, v) in enumerate(range(int(1e6)))} 
>>> len(pickle.dumps(broadcast_data)) 
16777786 
>>> len(pickle.dumps({k: v for (k, v) in enumerate(range(int(1e6)))}))/sc.serializer.bestSize 
256 
>>> bd = sc.broadcast(broadcast_data) 
>>> rdd = sc.parallelize(range(100), 1).map(lambda _: bd.value) 
>>> rdd.map(id).distinct().count() 
1 
>>> rdd.cache().count() 
100 
>>> rdd.map(id).distinct().count() 
2 

Antwort

5

Nun, der Teufel steckt im Detail. Um den Grund dafür zu verstehen, müssen wir uns die PySpark-Serialisierer genauer ansehen.Zunächst lässt SparkContext mit Standardeinstellungen erstellen:

from pyspark import SparkContext 

sc = SparkContext("local", "foo") 

und überprüfen, was ein Standard-Serializer ist:

sc.serializer 
## AutoBatchedSerializer(PickleSerializer()) 

sc.serializer.bestSize 
## 65536 

Es sagt uns drei verschiedene Dinge:

  • dies AutoBatchedSerializer Serializer
  • es verwendet PickleSerializer, um tatsächlichen Job
  • durchzuführen
  • bestSize der serialisierten batched ist 65536 Bytes

Ein kurzer Blick at the source code werden Ihnen zeigen, dass dieses serialize Anzahl der Datensätze paßt zu der Zeit auf der Laufzeit serialisiert und versucht Losgröße zu halten weniger als 10 * bestSize. Der wichtige Punkt ist, dass nicht alle Datensätze in der einzelnen Partition zur gleichen Zeit serialisiert werden.

Wir können das experimentell überprüfen, wie folgt:

from operator import add 

bd = sc.broadcast({}) 

rdd = sc.parallelize(range(10), 1).map(lambda _: bd.value) 
rdd.map(id).distinct().count() 
## 1 

rdd.cache().count() 
## 10 

rdd.map(id).distinct().count() 
## 2 

Wie Sie auch in diesem einfachen Beispiel nach der Serialisierung-Deserialisierung sehen wir zwei verschiedene Objekte zu bekommen. Sie können ein ähnliches Verhalten beobachten direkt mit pickle arbeiten:

v = {} 
vs = [v, v, v, v] 

v1, *_, v4 = pickle.loads(pickle.dumps(vs)) 
v1 is v4 
## True 

(v1_, v2_), (v3_, v4_) = (
    pickle.loads(pickle.dumps(vs[:2])), 
    pickle.loads(pickle.dumps(vs[2:])) 
) 

v1_ is v4_ 
## False 

v3_ is v4_ 
## True 

Werte serialisiert in der gleichen Charge Referenz nach Unpickling, das gleiche Objekt. Werte aus verschiedenen Stapeln zeigen auf verschiedene Objekte.

In der Praxis Mehrere Serialisierungen und verschiedene Serialisierungsstrategien ausführen. Sie können zum Beispiel Chargen von unendlicher Größe verwenden:

from pyspark.serializers import BatchedSerializer, PickleSerializer 

rdd_ = (sc.parallelize(range(10), 1).map(lambda _: bd.value) 
    ._reserialize(BatchedSerializer(PickleSerializer()))) 
rdd_.cache().count() 

rdd_.map(id).distinct().count() 
## 1 

Sie können Serializer ändern serializer und/oder batchSize Parameter SparkContext Konstruktor:

sc = SparkContext(
    "local", "bar", 
    serializer=PickleSerializer(), # Default serializer 
    # Unlimited batch size -> BatchedSerializer instead of AutoBatchedSerializer 
    batchSize=-1 
) 

sc.serializer 
## BatchedSerializer(PickleSerializer(), -1) 

Auswählen verschiedene Serializer und Dosiersysteme Strategien Ergebnisse in verschiedenem Handel -offs (Geschwindigkeit, Fähigkeit, beliebige Objekte zu serialisieren, Speicheranforderungen usw.).

Sie sollten auch daran denken, dass Broadcast-Variablen in Spark nicht zwischen Executor-Threads gemeinsam genutzt werden, sodass auf demselben Worker mehrere deserialisierte Kopien gleichzeitig vorhanden sein können.

Darüber hinaus sehen Sie ein ähnliches Verhalten, wenn Sie eine Umwandlung ausführen, die Mischen erfordert.

+0

Haben Sie etwas dagegen, die Kompromisse zwischen Serialisierungsstrategien zu kommentieren? Sollten wir bei einer größeren Batchgröße mehr Speicher für die Serialisierung erwarten? Wie wird es die Geschwindigkeit der Serialisierung beeinflussen? Warum sollte man einen Serializer wählen, der keine beliebigen Objekte serialisieren kann? – captaincapsaicin

+0

Nun, die vollständige Partition passt möglicherweise nicht in den Speicher. Wenn der Stapel also unendlich ist, gibt es keine Garantie, dass er erfolgreich ist. Das ist für Anfänger. Eine höhere Speichernutzung kann zu verschiedenen GC-Problemen führen. In Bezug auf Ihre letzten Fragen kann so gut wie kein Serializer ein beliebiges Objekt verarbeiten, insbesondere wenn es sich um einen nativen Code handelt. Es gibt einige Sprachkonstrukte, die nicht standardmäßig per Design (wie Lambda-Ausdrücke) serialisiert werden können und spezielle Werkzeuge erfordern. Andererseits kann das Serialisieren von komplexen Verschlüssen langsam sein. – zero323

+0

Ich habe auch in einer Folgefrage meine ursprüngliche Frage bearbeitet. Könnten Sie einen Blick darauf werfen, wie die Stapelgröße mit der Anzahl der verschiedenen Objekte im Spark-Speicher korreliert? – captaincapsaicin