Von den obigen Kommentaren scheint es, dass dies für pandas
einige Zeit geplant ist (es gibt auch eine interessant aussehende rosetta
project, die ich gerade bemerkt habe).
jedoch bis jeder parallele Funktionalität in pandas
eingebaut ist, bemerkte ich, dass es sehr einfach zu schreiben effizienten & Nicht-Speicher-Kopieren parallel Augmentationen zu pandas
direkt mit cython
+ OpenMP und C++.
Hier ist ein kurzes Beispiel eine parallele groupby-Summe zu schreiben, deren Verwendung so etwas wie dieses:
import pandas as pd
import para_group_demo
df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)
und Ausgang:
sum
key
0 6
1 11
2 4
Hinweis Ohne Zweifel, diese Die einfache Beispielfunktionalität wird schließlich Teil von pandas
sein. Einige Dinge werden jedoch in C++ für einige Zeit natürlicher sein und es ist wichtig zu wissen, wie einfach es ist, dies in pandas
zu kombinieren.
Um dies zu tun, schrieb ich eine einfache Single-Source-Datei-Erweiterung, deren Code folgt.
Es beginnt mit einiger Ein- und Typdefinitionen
from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map
cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange
import pandas as pd
ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t
Der C++ unordered_map
Typ zum Summieren von einem einzelnen Faden ist, und die vector
ist von allen Threads zum Summieren.
Nun zur Funktion sum
.Es beginnt mit typed memory views für schnellen Zugriff off:
def sum(crit, vals):
cdef int64_t[:] crit_view = crit.values
cdef int64_t[:] vals_view = vals.values
Die Funktion weiter durch Dividieren der halb gleichmäßig auf die Fäden (hier einprogrammiert bis 4), und wobei jeder Faden Summe der Einträge in seinem Bereich:
cdef uint64_t num_threads = 4
cdef uint64_t l = len(crit)
cdef uint64_t s = l/num_threads + 1
cdef uint64_t i, j, e
cdef counts_vec_t counts
counts = counts_vec_t(num_threads)
counts.resize(num_threads)
with cython.boundscheck(False):
for i in prange(num_threads, nogil=True):
j = i * s
e = j + s
if e > l:
e = l
while j < e:
counts[i][crit_view[j]] += vals_view[j]
inc(j)
Wenn die Fäden abgeschlossen haben, geht die Funktion alle Ergebnisse (aus den verschiedenen Bereichen) in einem einzigen unordered_map
:
cdef counts_t total
cdef counts_it_t it, e_it
for i in range(num_threads):
it = counts[i].begin()
e_it = counts[i].end()
while it != e_it:
total[deref(it).first] += deref(it).second
inc(it)
Alle th auf der linken Seite ist ein DataFrame
und geben die Ergebnisse zu erstellen:
key, sum_ = [], []
it = total.begin()
e_it = total.end()
while it != e_it:
key.append(deref(it).first)
sum_.append(deref(it).second)
inc(it)
df = pd.DataFrame({'key': key, 'sum': sum_})
df.set_index('key', inplace=True)
return df
Soweit ich weiß, gibt es keine Möglichkeit, beliebige Objekte zu teilen. Ich frage mich, ob das Beizen viel mehr Zeit in Anspruch nimmt als der Gewinn durch Multiprozessing. Vielleicht sollten Sie nach einer Möglichkeit suchen, größere Arbeitspakete für jeden Prozess zu erstellen, um die relative Beizzeit zu reduzieren. Eine andere Möglichkeit wäre die Verwendung von Multiprocessing beim Erstellen der Gruppen. –
Ich mache so etwas, aber mit UWSGI, Flask und Preforking: Ich lade den Pandas-Datenframe in einen Prozess, forktiere ihn x-mal (mach es zu einem Shared-Memory-Objekt) und rufe dann diese Prozesse von einem anderen Python-Prozess auf. atm Ich benutze JSON als Kommunikationsprozess, aber das kommt (noch immer sehr experimentell): http://pandas.pydata.org/pandas-docs/dev/io.html#msgpack-experimental – Carst
Übrigens, hast du? jemals HDF5 mit Chunking betrachten? (HDF5 ist nicht für gleichzeitiges Schreiben, aber Sie können auch in separaten Dateien speichern und am Ende verketten Zeug) – Carst