Arbeiten von Matthew Rocklin's post auf verteilte Datenrahmen mit Dask, ich versuche, einige statistische Statistiken Berechnungen über meinen Cluster zu verteilen. Das Einrichten des Clusters mit dcluster ...
funktioniert gut. In einem Notebook,Wie verwenden Sie das dask + für NFS-Dateien verteilt?
import dask.dataframe as dd
from distributed import Executor, progress
e = Executor('...:8786')
df = dd.read_csv(...)
Die Datei, die ich lese, befindet sich auf einem NFS-Mount, auf das alle Arbeitscomputer Zugriff haben. An dieser Stelle kann ich zum Beispiel df.head()
betrachten und alles sieht korrekt aus. Aus der Blog-Post, ich denke, ich soll diese Lage zu tun:
df_future = e.persist(df)
progress(df_future)
# ... wait for everything to load ...
df_future.head()
Aber das ist ein Fehler:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-26-8d59adace8bf> in <module>()
----> 1 fraudf.head()
/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/dataframe/core.py in head(self, n, compute)
358
359 if compute:
--> 360 result = result.compute()
361 return result
362
/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs)
35
36 def compute(self, **kwargs):
---> 37 return compute(self, **kwargs)[0]
38
39 @classmethod
/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
108 for opt, val in groups.items()])
109 keys = [var._keys() for var in variables]
--> 110 results = get(dsk, keys, **kwargs)
111
112 results_iter = iter(results)
/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs)
55 results = get_async(pool.apply_async, len(pool._pool), dsk, result,
56 cache=cache, queue=queue, get_id=_thread_get_id,
---> 57 **kwargs)
58
59 return results
/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
479 _execute_task(task, data) # Re-execute locally
480 else:
--> 481 raise(remote_exception(res, tb))
482 state['cache'][key] = res
483 finish_task(dsk, key, state, results, keyorder.get)
AttributeError: 'Future' object has no attribute 'head'
Traceback
---------
File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py", line 264, in execute_task
result = _execute_task(task, data)
File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py", line 246, in _execute_task
return func(*args2)
File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/dataframe/core.py", line 354, in <lambda>
dsk = {(name, 0): (lambda x, n: x.head(n=n), (self._name, 0), n)}
Was einen Datenrahmen zu verteilen der richtige Ansatz ist, wenn es von einer normalen Datei kommt System statt HDFS?