Sehr kurz .. ist das ein Fehler oder fehlt mir etwas? tmp_j
ist eine Tasche mit einem einzelnen Artikel und 6 Partitionen. Allerdings bekomme ich ähnliche Antworten mit größeren Taschen.Dask Bag.to_textfiles funktioniert mit einer einzelnen Partition aber nicht multiple
>>> tmp_j = jnode_b.filter(lambda r: (r['node']['attrib']['uid'] == '8909') &
(r['node']['attrib']['version'] == '1')).pluck('node').pluck('attire')
und sieht aus wie:
>>> tmp_j.compute()
[{'changeset': '39455176',
'id': '4197394169',
'lat': '53.4803608',
'lon': '-113.4955328',
'timestamp': '2016-05-20T16:43:02Z',
'uid': '8909',
'user': 'mvexel',
'version': '1'}]
wieder ..
>>> tmp_j.repartition(1).map(json.dumps).to_textfiles('tmpA*.json')
ordnungsgemäß funktioniert, (schreibt die Datei)
danke, aber
Diese besondere Tasche mit konstruiert
>>> tmp_j.map(json.dumps).to_textfiles('tmpA*.json')
gibt
StopIteration Traceback (most recent call last)
<ipython-input-28-a77a33e2ff26> in <module>()
----> 1 tmp_j.map(json.dumps).to_textfiles('tmp*.json')
/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py in to_textfiles(self, path, name_function, compression, encoding, compute)
469 def to_textfiles(self, path, name_function=str, compression='infer',
470 encoding=system_encoding, compute=True):
--> 471 return to_textfiles(self, path, name_function, compression, encoding, compute)
472
473 def fold(self, binop, combine=None, initial=no_default, split_every=None):
/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py in to_textfiles(b, path, name_function, compression, encoding, compute)
167 result = Bag(merge(b.dask, dsk), name, b.npartitions)
168 if compute:
--> 169 result.compute()
170 else:
171 return result
/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs)
35
36 def compute(self, **kwargs):
---> 37 return compute(self, **kwargs)[0]
38
39 @classmethod
/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
108 for opt, val in groups.items()])
109 keys = [var._keys() for var in variables]
--> 110 results = get(dsk, keys, **kwargs)
111
112 results_iter = iter(results)
/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/multiprocessing.py in get(dsk, keys, optimizations, num_workers, func_loads, func_dumps, **kwargs)
76 # Run
77 result = get_async(apply_async, len(pool._pool), dsk3, keys,
---> 78 queue=queue, get_id=_process_get_id, **kwargs)
79 finally:
80 if cleanup:
/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
486 _execute_task(task, data) # Re-execute locally
487 else:
--> 488 raise(remote_exception(res, tb))
489 state['cache'][key] = res
490 finish_task(dsk, key, state, results, keyorder.get)
StopIteration:
Traceback
---------
File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py", line 267, in execute_task
result = _execute_task(task, data)
File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py", line 249, in _execute_task
return func(*args2)
File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py", line 1024, in write
firstline = next(data)
der Anmerkung: ist, dass
>>> tmp_b = db.from_sequence(tmp_j,partition_size=3)
>>> tmp_b.map(json.dumps).to_textfiles('tmp*.json')
funktioniert gut (aber auch hier die tmp_b.npartitions == 1
).
noch einmal für einblicke - ich schaute auf die quelle, aber dann erkannte mein smart/lazy ratio zu niedrig.
Ich werde Dokumente einreichen, wenn ich überzeugt bin, dass ich dies im Griff habe. Diese
Siehe https://github.com/dask/dask/pull/1256 gelöst – MRocklin
Sie beobachtete mich nicht, während ich dies schreibe, bist du? (das war schnell). Danke. – JMann