Application efficace d'une fonction à un DataFrame pandas groupé en parallèle

89

J'ai souvent besoin d'appliquer une fonction aux groupes d'un très grand DataFrame(de types de données mixtes) et je voudrais profiter de plusieurs cœurs.

Je peux créer un itérateur à partir des groupes et utiliser le module multitraitement, mais ce n'est pas efficace car chaque groupe et les résultats de la fonction doivent être picklés pour la messagerie entre les processus.

Existe-t-il un moyen d'éviter le décapage ou même d'éviter la copie du DataFramecomplètement? Il semble que les fonctions de mémoire partagée des modules multiprocesseurs soient limitées aux numpytableaux. Il y a-t-il des alternatives?

user2303
la source
Autant que je sache, il n'y a aucun moyen de partager des objets arbitraires. Je me demande, si le décapage prend tellement plus de temps, que le gain grâce au multi-traitement. Vous devriez peut-être chercher la possibilité de créer des lots de travaux plus volumineux pour chaque processus afin de réduire le temps de décapage relatif. Une autre possibilité serait d'utiliser le multitraitement lorsque vous créez les groupes.
Sebastian Werk
3
Je fais quelque chose comme ça, mais en utilisant UWSGI, Flask et preforking: je charge la dataframe pandas dans un processus, je la fourche x fois (en en faisant un objet de mémoire partagée), puis j'appelle ces processus à partir d'un autre processus python où je concat les résultats. atm J'utilise JSON comme processus de communication, mais cela arrive (encore très expérimental): pandas.pydata.org/pandas-docs/dev/io.html#msgpack-experimental
Carst
Au fait, avez-vous déjà regardé HDF5 avec segmentation? (HDF5 n'est pas enregistré pour l'écriture simultanée, mais vous pouvez également enregistrer dans des fichiers séparés et à la fin concaténer des éléments)
Carst
7
cela sera ciblé pour 0.14, voir ce numéro: github.com/pydata/pandas/issues/5751
Jeff
4
@Jeff a été poussé à 0,15 = (
pyCthon

Réponses:

12

D'après les commentaires ci-dessus, il semble que cela soit prévu depuis un pandascertain temps (il y a aussi un rosettaprojet intéressant que je viens de remarquer).

Cependant, jusqu'à ce que toutes les fonctionnalités parallèles soient incorporées pandas, j'ai remarqué qu'il était très facile d'écrire des augmentations parallèles efficaces et sans copie de mémoire pandasen utilisant directement cython+ OpenMP et C ++.

Voici un court exemple d'écriture d'un groupby-sum parallèle, dont l'utilisation est quelque chose comme ceci:

import pandas as pd
import para_group_demo

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)

et la sortie est:

     sum
key     
0      6
1      11
2      4

Remarque Sans aucun doute, la fonctionnalité de cet exemple simple fera éventuellement partie de pandas. Cependant, certaines choses seront plus naturelles à paralléliser en C ++ pendant un certain temps, et il est important de savoir à quel point il est facile de les combiner pandas.


Pour ce faire, j'ai écrit une simple extension de fichier source unique dont le code suit.

Cela commence par des importations et des définitions de type

from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map

cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t

Le unordered_maptype C ++ est destiné à la sommation par un seul thread et le type vectorest à la somme par tous les threads.

Passons maintenant à la fonction sum. Il commence par des vues de mémoire typées pour un accès rapide:

def sum(crit, vals):
    cdef int64_t[:] crit_view = crit.values
    cdef int64_t[:] vals_view = vals.values

La fonction continue en divisant le semi-également aux threads (ici codé en dur à 4), et en faisant additionner chaque thread les entrées de sa plage:

    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    cdef uint64_t s = l / num_threads + 1
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)
    counts.resize(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True): 
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)

Lorsque les threads sont terminés, la fonction fusionne tous les résultats (des différentes plages) en un seul unordered_map:

    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)        

Il ne reste plus qu'à créer DataFrameet renvoyer les résultats:

    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    return df
Ami Tavory
la source