2016-04-14 9 views
1

Arbeiten von Matthew Rocklin's post auf verteilte Datenrahmen mit Dask, ich versuche, einige statistische Statistiken Berechnungen über meinen Cluster zu verteilen. Das Einrichten des Clusters mit dcluster ... funktioniert gut. In einem Notebook,Wie verwenden Sie das dask + für NFS-Dateien verteilt?

import dask.dataframe as dd 
from distributed import Executor, progress 
e = Executor('...:8786') 

df = dd.read_csv(...) 

Die Datei, die ich lese, befindet sich auf einem NFS-Mount, auf das alle Arbeitscomputer Zugriff haben. An dieser Stelle kann ich zum Beispiel df.head() betrachten und alles sieht korrekt aus. Aus der Blog-Post, ich denke, ich soll diese Lage zu tun:

df_future = e.persist(df) 
progress(df_future) 
# ... wait for everything to load ... 
df_future.head() 

Aber das ist ein Fehler:

--------------------------------------------------------------------------- 
AttributeError       Traceback (most recent call last) 
<ipython-input-26-8d59adace8bf> in <module>() 
----> 1 fraudf.head() 

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/dataframe/core.py in head(self, n, compute) 
    358 
    359   if compute: 
--> 360    result = result.compute() 
    361   return result 
    362 

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs) 
    35 
    36  def compute(self, **kwargs): 
---> 37   return compute(self, **kwargs)[0] 
    38 
    39  @classmethod 

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs) 
    108     for opt, val in groups.items()]) 
    109  keys = [var._keys() for var in variables] 
--> 110  results = get(dsk, keys, **kwargs) 
    111 
    112  results_iter = iter(results) 

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs) 
    55  results = get_async(pool.apply_async, len(pool._pool), dsk, result, 
    56       cache=cache, queue=queue, get_id=_thread_get_id, 
---> 57       **kwargs) 
    58 
    59  return results 

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs) 
    479     _execute_task(task, data) # Re-execute locally 
    480    else: 
--> 481     raise(remote_exception(res, tb)) 
    482   state['cache'][key] = res 
    483   finish_task(dsk, key, state, results, keyorder.get) 

AttributeError: 'Future' object has no attribute 'head' 

Traceback 
--------- 
    File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py", line 264, in execute_task 
    result = _execute_task(task, data) 
    File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py", line 246, in _execute_task 
    return func(*args2) 
    File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/dataframe/core.py", line 354, in <lambda> 
    dsk = {(name, 0): (lambda x, n: x.head(n=n), (self._name, 0), n)} 

Was einen Datenrahmen zu verteilen der richtige Ansatz ist, wenn es von einer normalen Datei kommt System statt HDFS?

Antwort

1

Dask versucht, den Einzelmaschinen-Scheduler zu verwenden. Dies ist der Standard, wenn Sie einen Datenrahmen mit der normalen DASK-Bibliothek erstellen. Ändern Sie den Standardwert so, dass Ihr Cluster mit den folgenden Zeilen verwendet wird:

import dask 
dask.set_options(get=e.get)