Ich habe 100 npz-Dateien mit numpy Arrays in Google-Speicher. Ich habe dataproc mit jupyter eingerichtet und ich versuche, alle numpy Arrays in Funke RDD zu lesen. Was ist der beste Weg, um die numpigen Arrays von einem Google-Speicher in pyspark zu laden? Gibt es einen einfachen Weg wie np.load("gs://path/to/array.npz")
das numpy Array zu laden und dann sc.parallelize
darauf tun?lesen numy Array von GCS in Spark
Antwort
Wenn Sie planen, schließlich zu skalieren, sollten Sie die verteilte Eingabemethode in SparkContext
lieber als das Laden lokaler Dateien aus dem Treiberprogramm verwenden, das auf sc.parallelize
basiert. Es klingt wie Sie allerdings intakt jede der Dateien lesen müssen, so in Ihrem Fall Sie wollen:
npz_rdd = sc.binaryFiles("gs://path/to/directory/containing/files/")
Oder Sie können auch einzelne Dateien angeben, wenn Sie wollen, aber dann haben Sie gerade einen RDD mit einem einzigen Element :
npz_rdd = sc.binaryFiles("gs://path/to/directory/containing/files/arr1.npz")
Dann ist jeder Datensatz ein Paar <filename>,<str of bytes>
. Auf Dataproc wird sc.binaryFiles
nur automatisch direkt mit GCS-Pfaden arbeiten, im Gegensatz zu np.load
, die lokale Dateisystempfade erfordert.
Dann in Ihrem Arbeiter Code, müssen Sie nur StringIO
verwenden, um die Byte-Zeichenfolgen als Dateiobjekt zu verwenden, setzen Sie in np.load
:
from StringIO import StringIO
# For example, to create an RDD of the 'arr_0' element of each of the picked objects:
npz_rdd.map(lambda l: numpy.load(StringIO(l[1]))['arr_0'])
Während der Entwicklung, wenn Sie wirklich nur die Dateien lesen wollen in Ihre Haupt-Treiber-Programm, können Sie immer Ihre RDD herunterfallen mit collect()
, um es lokal abzurufen:
npz_rdd = sc.binaryFiles("gs://path/to/directory/containing/files/arr1.npz")
local_bytes = npz_rdd.collect()[0][1]
local_np_obj = np.load(StringIO(local_bytes))