Faire appliquer Pandas DataFrame () utiliser tous les cœurs?

98

Depuis août 2017, Pandas DataFame.apply () est malheureusement encore limité à travailler avec un seul cœur, ce qui signifie qu'une machine multicœur perdra la majorité de son temps de calcul lorsque vous exécutez df.apply(myfunc, axis=1).

Comment pouvez-vous utiliser tous vos cœurs pour exécuter Apply sur un dataframe en parallèle?

Roko Mijic
la source

Réponses:

73

Vous pouvez utiliser le swifterpackage:

pip install swifter

Il fonctionne comme un plugin pour les pandas, vous permettant de réutiliser la applyfonction:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

Il trouvera automatiquement le moyen le plus efficace de paralléliser la fonction, qu'elle soit vectorisée (comme dans l'exemple ci-dessus) ou non.

Plus d'exemples et une comparaison des performances sont disponibles sur GitHub. Notez que le package est en cours de développement actif, donc l'API peut changer.

Notez également que cela ne fonctionnera pas automatiquement pour les colonnes de chaîne. Lors de l'utilisation de chaînes, Swifter se repliera sur un Pandas «simple» apply, qui ne sera pas parallèle. Dans ce cas, même le forcer à l'utiliser daskne créera pas d'améliorations des performances, et vous feriez mieux de simplement diviser votre ensemble de données manuellement et paralléliser à l'aide demultiprocessing .

slhck
la source
1
Notre pure curiosité, y a-t-il un moyen de limiter le nombre de cœurs qu'il utilise lors de l'application parallèle? J'ai un serveur partagé, donc si je prends les 32 cœurs, personne ne sera heureux.
Maksim Khaitovich
1
@MaximHaytovich je ne sais pas. Swifter utilise dask en arrière-plan, alors peut-être qu'il respecte ces paramètres: stackoverflow.com/a/40633117/435093 - sinon, je recommanderais d'ouvrir un problème sur GitHub. L'auteur est très réactif.
slhck
@slhck merci! Va creuser un peu plus. De toute façon, cela ne semble pas fonctionner sur le serveur Windows - il se bloque juste de ne rien faire sur la tâche de jouet
Maksim Khaitovich
pouvez-vous s'il vous plaît m'aider à répondre à ceci: - stackoverflow.com/questions/53561794/…
ak3191
2
Pour les chaînes, ajoutez simplement allow_dask_on_strings(enable=True)comme ceci: df.swifter.allow_dask_on_strings(enable=True).apply(some_function) Source: github.com/jmcarpenter2/swifter/issues/45
Sumit Sidana
99

Le moyen le plus simple est d'utiliser les map_partitions de Dask . Vous avez besoin de ces importations (vous en aurez besoin pip install dask):

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get

et la syntaxe est

data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def myfunc(x,y,z, ...): return <whatever>

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)  

(Je crois que 30 est un nombre approprié de partitions si vous avez 16 cœurs). Juste pour être complet, j'ai chronométré la différence sur ma machine (16 cœurs):

data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)

ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)  
def vectorized(): return myfunc(data['col1'], data['col2']  )

t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))

28,16970546543598

t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))

2,708152851089835

t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))

0,010668013244867325

Donner un facteur 10 d'accélération depuis les pandas s'applique à dask s'applique aux partitions. Bien sûr, si vous avez une fonction que vous pouvez vectoriser, vous devriez - dans ce cas, la fonction ( y*(x**2+1)) est trivialement vectorisée, mais il y a beaucoup de choses qui sont impossibles à vectoriser.

Roko Mijic
la source
2
Bon à savoir, merci pour la publication. Pouvez-vous expliquer pourquoi vous avez choisi 30 partitions? Les performances changent-elles lors de la modification de cette valeur?
Andrew L
2
@AndrewL Je suppose que chaque partition est desservie par un processus séparé, et avec 16 cœurs, je suppose que 16 ou 32 processus peuvent s'exécuter simultanément. Je l'ai essayé et les performances semblent s'améliorer jusqu'à 32 partitions, mais d'autres augmentations n'ont aucun effet bénéfique. Je suppose qu'avec une machine quad-core, vous voudriez 8 partitions, etc. Notez que j'ai remarqué une amélioration entre 16 et 32, donc je pense que vous voulez vraiment 2x $ NUM_PROCESSORS
Roko Mijic
8
La seule chose estThe get= keyword has been deprecated. Please use the scheduler= keyword instead with the name of the desired scheduler like 'threads' or 'processes'
wordsforthewise
4
Pour dask v0.20.0 et les versions ultérieures, utilisez ddata.map_partitions (lambda df: df.apply ((lambda row: myfunc (* row)), axis = 1)). Compute (scheduler = 'processus'), ou l'un des autres options du planificateur. Le code actuel lance "TypeError: Le mot-clé get = a été supprimé. Veuillez utiliser le mot-clé scheduler = à la place avec le nom du planificateur souhaité comme" threads "ou" processus ""
mork
1
Assurez-vous qu'avant cela, la trame de données ne possède aucun index en double lors de son lancement ValueError: cannot reindex from a duplicate axis. Pour contourner cela, vous devez soit supprimer les index dupliqués df = df[~df.index.duplicated()]soit réinitialiser vos index par df.reset_index(inplace=True).
Habib Karbasian
23

vous pouvez essayer à la pandarallelplace: Un outil simple et efficace pour paralléliser vos opérations pandas sur tous vos processeurs (sous Linux et macOS)

  • La parallélisation a un coût (instanciation de nouveaux processus, envoi de données via mémoire partagée, etc ...), donc la parallélisation n'est efficace que si la quantité de calcul à paralléliser est suffisamment élevée. Pour très peu de données, l'utilisation de la parallez ne vaut pas toujours la peine.
  • Les fonctions appliquées ne doivent PAS être des fonctions lambda.
from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)

# ALLOWED
def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)

voir https://github.com/nalepae/pandarallel

G_KOBELIEF
la source
bonjour, je ne peux pas résoudre un problème, en utilisant pandarallel il y a une erreur: AttributeError: Impossible de sélectionner l'objet local 'prepare_worker. <locals> .closure. <locals> .wrapper'. Pouvez-vous m'aider avec ça?
Alex Cam le
@Alex Sry Je ne suis pas le développeur de ce module. À quoi ressemblent vos codes? Vous pouvez essayer de déclarer vos "fonctions internes" comme globales? (juste deviner)
G_KOBELIEF
@AlexCam Votre fonction doit être définie en dehors des autres fonctions afin que python puisse la sélectionner pour le multiprocesseur
Kenan
@G_KOBELIEF Avec Python> 3.6, nous pouvons utiliser la fonction lambda avec pandaparallel
user110244
16

Si vous souhaitez rester en python natif:

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])

appliquera la fonction fde manière parallèle à la colonne coldu dataframedf

Olivier Cruchant
la source
Suite à une approche comme celle-ci, j'ai obtenu un ValueError: Length of values does not match length of indexde __setitem__dedans pandas/core/frame.py. Je ne sais pas si j'ai fait quelque chose de mal ou si l'attribution à df['newcol']n'est pas sûre pour les threads.
Rattle
2
Vous pouvez écrire le pool.map dans une liste intermédiaire temp_result pour permettre de vérifier si la longueur correspond au df, puis faire un df ['newcol'] = temp_result?
Olivier Cruchant
vous voulez dire créer la nouvelle colonne? qu'utiliseriez-vous?
Olivier Cruchant le
oui, en affectant le résultat de la carte à la nouvelle colonne du dataframe. La carte ne renvoie-t-elle pas une liste du résultat de chaque morceau envoyé à la fonction f? Alors, que se passe-t-il lorsque vous attribuez cela à la colonne 'newcol? Utilisation de Pandas et Python 3
Mina
Cela fonctionne vraiment très bien! Est-ce que tu l'as essayé? Il crée une liste de la même longueur du df, du même ordre que ce qui a été envoyé. Il fait littéralement c2 = f (c1) de manière parallèle. Il n'y a pas de moyen plus simple de multi-traiter en python. Sur le plan des performances, il semble que Ray puisse aussi faire de bonnes choses ( versdatascience.com / ... ) mais ce n'est pas aussi mature et l'installation ne se passe pas toujours bien selon mon expérience
Olivier Cruchant
1

Voici un exemple de transformateur de base sklearn, dans lequel les pandas s'appliquent est parallélisé

import multiprocessing as mp
from sklearn.base import TransformerMixin, BaseEstimator

class ParllelTransformer(BaseEstimator, TransformerMixin):
    def __init__(self,
                 n_jobs=1):
        """
        n_jobs - parallel jobs to run
        """
        self.variety = variety
        self.user_abbrevs = user_abbrevs
        self.n_jobs = n_jobs
    def fit(self, X, y=None):
        return self
    def transform(self, X, *_):
        X_copy = X.copy()
        cores = mp.cpu_count()
        partitions = 1

        if self.n_jobs <= -1:
            partitions = cores
        elif self.n_jobs <= 0:
            partitions = 1
        else:
            partitions = min(self.n_jobs, cores)

        if partitions == 1:
            # transform sequentially
            return X_copy.apply(self._transform_one)

        # splitting data into batches
        data_split = np.array_split(X_copy, partitions)

        pool = mp.Pool(cores)

        # Here reduce function - concationation of transformed batches
        data = pd.concat(
            pool.map(self._preprocess_part, data_split)
        )

        pool.close()
        pool.join()
        return data
    def _transform_part(self, df_part):
        return df_part.apply(self._transform_one)
    def _transform_one(self, line):
        # some kind of transformations here
        return line

pour plus d'informations, voir https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8

Maxim Balatsko
la source