2015-08-09 18 views
7

Lets mit einer einfachen Funktion starten, die immer eine zufällige ganze Zahl zurückgibt:Zufallszahlen Generation in PySpark

import numpy as np 

def f(x): 
    return np.random.randint(1000) 

und RDD mit Nullen gefüllt und f kartiert werden:

rdd = sc.parallelize([0] * 10).map(f) 

da oberhalb RDD ist nicht beibehalten Ich erwarte, dass ich jedes Mal, wenn ich sammle, eine andere Ausgabe erhalte:

> rdd.collect() 
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255] 

Wenn wir die Tatsache ignorieren, dass die Verteilung der Werte nicht wirklich zufällig aussieht, passiert mehr oder weniger was passiert. Problem beginnt, wir uns, wenn nimmt nur ein erstes Element:

assert len(set(rdd.first() for _ in xrange(100))) == 1 

oder

assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1 

Es scheint, die gleiche Anzahl jedes Mal zurück. Ich konnte dieses Verhalten auf zwei verschiedenen Maschinen mit Spark 1.2, 1.3 und 1.4 reproduzieren. Hier verwende ich np.random.randint, aber es verhält sich auf die gleiche Weise mit random.randint.

Dieses Problem, das gleiche wie nicht-genau-zufällige Ergebnisse mit collect scheint Python spezifisch zu sein, und ich konnte es nicht Scala reproduzieren mit:

def f(x: Int) = scala.util.Random.nextInt(1000) 

val rdd = sc.parallelize(List.fill(10)(0)).map(f) 
(1 to 100).map(x => rdd.first).toSet.size 

rdd.collect() 

Habe ich etwas offensichtlich hier vermissen?

bearbeiten:

Schaltet die Quelle des Problems heraus ist Python RNG-Implementierung. Um zu zitieren official documentation:

Die von diesem Modul bereitgestellten Funktionen sind tatsächlich gebundenen Methoden einer versteckten Instanz der Random.Random-Klasse. Sie können eigene Instanzen von Random instanziieren, um Generatoren zu erhalten, die keinen gemeinsamen Status haben.

Ich gehe davon aus NumPy die gleiche Weise funktioniert und f mit RandomState Instanz Umschreiben als

import os 
import binascii 

def f(x, seed=None): 
    seed = (
     seed if seed is not None 
     else int(binascii.hexlify(os.urandom(4)), 16)) 
    rs = np.random.RandomState(seed) 
    return rs.randint(1000) 

macht folgt es langsamer, aber löst das Problem.

Während oben erklärt nicht zufällige Ergebnisse von sammeln ich immer noch nicht verstehen, wie es zwischen mehreren Aktionen first/take(1) wirkt.

+0

Nur um zu verdeutlichen: Wenn ich die zufällige Funktion von numpy in Spark verwende, wählt es immer die gleichen Ergebnisse in jeder Partition? Wie kann ich np.random.choice verwenden, damit es zufällig ist? – member555

+0

_Es werden immer die gleichen Ergebnisse in jeder Partition_ gewählt - nicht genau, aber Werte, die für einen einzelnen Worker berechnet werden, sind nicht unabhängig. _Wie kann ich np.random.choice verwenden, damit es zufällig ist? _ - Ich habe die Lösung bereits in einer Bearbeitung beschrieben. Sie sollten einen separaten Status verwenden. Da es ziemlich teuer ist, möchten Sie es vielleicht einmal pro Partition initialisieren. – zero323

+0

Können Sie mir näher erklären, was das Problem ist? Warum ist Pythons gemeinsamer Status ein Problem? – member555

Antwort

2

Also das eigentliche Problem ist hier relativ einfach.Jeder Teilprozess in Python erbt seinen Zustand von seinen Eltern:

len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect())) 
# 1 

Da Eltern Zustand keinen Grund hat, in diesem speziellen Szenario zu ändern und die Arbeitnehmer haben eine begrenzte Lebensdauer, Zustand jedes Kind wird genau das gleiche auf jedem Lauf.

1

Dies scheint ein Bug (oder Feature) von randint zu sein. Ich sehe das gleiche Verhalten, aber sobald ich die f ändere, ändern sich die Werte tatsächlich. Also bin ich mir der tatsächlichen Zufälligkeit dieser Methode nicht sicher .... Ich kann keine Dokumentation finden, aber es scheint einen deterministischen mathematischen Algorithmus zu verwenden, anstatt mehr variable Merkmale der laufenden Maschine zu verwenden. Auch wenn ich hin und her gehe, scheinen die Zahlen bei der Rückkehr zum ursprünglichen Wert gleich zu sein ...

+0

Es ist Pseudozufallsgenerator Mersenne Twister implementieren, aber es sollte kein Problem sein. Das Problem hängt definitiv mit der freigegebenen 'Random'-Klasse zusammen (ich habe die Frage bearbeitet, um das zu reflektieren), aber wie es die' erste' Ausgabe beeinflusst, verwirrt mich immer noch. – zero323