2016-06-08 8 views
1

Sehr kurz .. ist das ein Fehler oder fehlt mir etwas? tmp_j ist eine Tasche mit einem einzelnen Artikel und 6 Partitionen. Allerdings bekomme ich ähnliche Antworten mit größeren Taschen.Dask Bag.to_textfiles funktioniert mit einer einzelnen Partition aber nicht multiple

>>> tmp_j = jnode_b.filter(lambda r: (r['node']['attrib']['uid'] == '8909') & 
       (r['node']['attrib']['version'] == '1')).pluck('node').pluck('attire') 

und sieht aus wie:

>>> tmp_j.compute() 

[{'changeset': '39455176', 
    'id': '4197394169', 
    'lat': '53.4803608', 
    'lon': '-113.4955328', 
    'timestamp': '2016-05-20T16:43:02Z', 
    'uid': '8909', 
    'user': 'mvexel', 
    'version': '1'}] 
wieder ..

>>> tmp_j.repartition(1).map(json.dumps).to_textfiles('tmpA*.json') 

ordnungsgemäß funktioniert, (schreibt die Datei)

danke, aber

Diese besondere Tasche mit konstruiert

>>> tmp_j.map(json.dumps).to_textfiles('tmpA*.json') 

gibt

StopIteration        Traceback (most recent call last) 
<ipython-input-28-a77a33e2ff26> in <module>() 
----> 1 tmp_j.map(json.dumps).to_textfiles('tmp*.json') 

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py in to_textfiles(self, path, name_function, compression, encoding, compute) 
    469  def to_textfiles(self, path, name_function=str, compression='infer', 
    470      encoding=system_encoding, compute=True): 
--> 471   return to_textfiles(self, path, name_function, compression, encoding, compute) 
    472 
    473  def fold(self, binop, combine=None, initial=no_default, split_every=None): 

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py in to_textfiles(b, path, name_function, compression, encoding, compute) 
    167  result = Bag(merge(b.dask, dsk), name, b.npartitions) 
    168  if compute: 
--> 169   result.compute() 
    170  else: 
    171   return result 

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs) 
    35 
    36  def compute(self, **kwargs): 
---> 37   return compute(self, **kwargs)[0] 
    38 
    39  @classmethod 

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs) 
    108     for opt, val in groups.items()]) 
    109  keys = [var._keys() for var in variables] 
--> 110  results = get(dsk, keys, **kwargs) 
    111 
    112  results_iter = iter(results) 

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/multiprocessing.py in get(dsk, keys, optimizations, num_workers, func_loads, func_dumps, **kwargs) 
    76   # Run 
    77   result = get_async(apply_async, len(pool._pool), dsk3, keys, 
---> 78       queue=queue, get_id=_process_get_id, **kwargs) 
    79  finally: 
    80   if cleanup: 

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs) 
    486     _execute_task(task, data) # Re-execute locally 
    487    else: 
--> 488     raise(remote_exception(res, tb)) 
    489   state['cache'][key] = res 
    490   finish_task(dsk, key, state, results, keyorder.get) 

StopIteration: 

Traceback 
--------- 
    File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py", line 267, in execute_task 
    result = _execute_task(task, data) 
    File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py", line 249, in _execute_task 
    return func(*args2) 
    File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py", line 1024, in write 
    firstline = next(data) 

der Anmerkung: ist, dass

>>> tmp_b = db.from_sequence(tmp_j,partition_size=3) 
>>> tmp_b.map(json.dumps).to_textfiles('tmp*.json') 

funktioniert gut (aber auch hier die tmp_b.npartitions == 1).

noch einmal für einblicke - ich schaute auf die quelle, aber dann erkannte mein smart/lazy ratio zu niedrig.

Ich werde Dokumente einreichen, wenn ich überzeugt bin, dass ich dies im Griff habe. Diese

+0

Siehe https://github.com/dask/dask/pull/1256 gelöst – MRocklin

+0

Sie beobachtete mich nicht, während ich dies schreibe, bist du? (das war schnell). Danke. – JMann

Antwort

1

war ein echter Fehler und wurde nun in Master

In [1]: import dask.bag as db 

In [2]: db.range(5, npartitions=5).filter(lambda x: x == 1).map(str).to_textfiles('*.txt') 

In [3]: ls *.txt 
0.txt 1.txt 2.txt 3.txt 4.txt C:\nppdf32Log\debuglog.txt foo.txt 
+0

vielen dank. - Ich lerne gerade Python-Dokumentations-Compilation, damit ich meine oben erwähnte Ergänzung zu dask docs machen kann. – JMann

+0

Jede Hilfe wäre schön :) – MRocklin