Lets mit einer einfachen Funktion starten, die immer eine zufällige ganze Zahl zurückgibt:Zufallszahlen Generation in PySpark
import numpy as np
def f(x):
return np.random.randint(1000)
und RDD mit Nullen gefüllt und f
kartiert werden:
rdd = sc.parallelize([0] * 10).map(f)
da oberhalb RDD ist nicht beibehalten Ich erwarte, dass ich jedes Mal, wenn ich sammle, eine andere Ausgabe erhalte:
> rdd.collect()
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255]
Wenn wir die Tatsache ignorieren, dass die Verteilung der Werte nicht wirklich zufällig aussieht, passiert mehr oder weniger was passiert. Problem beginnt, wir uns, wenn nimmt nur ein erstes Element:
assert len(set(rdd.first() for _ in xrange(100))) == 1
oder
assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1
Es scheint, die gleiche Anzahl jedes Mal zurück. Ich konnte dieses Verhalten auf zwei verschiedenen Maschinen mit Spark 1.2, 1.3 und 1.4 reproduzieren. Hier verwende ich np.random.randint
, aber es verhält sich auf die gleiche Weise mit random.randint
.
Dieses Problem, das gleiche wie nicht-genau-zufällige Ergebnisse mit collect
scheint Python spezifisch zu sein, und ich konnte es nicht Scala reproduzieren mit:
def f(x: Int) = scala.util.Random.nextInt(1000)
val rdd = sc.parallelize(List.fill(10)(0)).map(f)
(1 to 100).map(x => rdd.first).toSet.size
rdd.collect()
Habe ich etwas offensichtlich hier vermissen?
bearbeiten:
Schaltet die Quelle des Problems heraus ist Python RNG-Implementierung. Um zu zitieren official documentation:
Die von diesem Modul bereitgestellten Funktionen sind tatsächlich gebundenen Methoden einer versteckten Instanz der Random.Random-Klasse. Sie können eigene Instanzen von Random instanziieren, um Generatoren zu erhalten, die keinen gemeinsamen Status haben.
Ich gehe davon aus NumPy die gleiche Weise funktioniert und f
mit RandomState
Instanz Umschreiben als
import os
import binascii
def f(x, seed=None):
seed = (
seed if seed is not None
else int(binascii.hexlify(os.urandom(4)), 16))
rs = np.random.RandomState(seed)
return rs.randint(1000)
macht folgt es langsamer, aber löst das Problem.
Während oben erklärt nicht zufällige Ergebnisse von sammeln ich immer noch nicht verstehen, wie es zwischen mehreren Aktionen first
/take(1)
wirkt.
Nur um zu verdeutlichen: Wenn ich die zufällige Funktion von numpy in Spark verwende, wählt es immer die gleichen Ergebnisse in jeder Partition? Wie kann ich np.random.choice verwenden, damit es zufällig ist? – member555
_Es werden immer die gleichen Ergebnisse in jeder Partition_ gewählt - nicht genau, aber Werte, die für einen einzelnen Worker berechnet werden, sind nicht unabhängig. _Wie kann ich np.random.choice verwenden, damit es zufällig ist? _ - Ich habe die Lösung bereits in einer Bearbeitung beschrieben. Sie sollten einen separaten Status verwenden. Da es ziemlich teuer ist, möchten Sie es vielleicht einmal pro Partition initialisieren. – zero323
Können Sie mir näher erklären, was das Problem ist? Warum ist Pythons gemeinsamer Status ein Problem? – member555