Existe-t-il un moyen simple d'exécuter pandas.DataFrame.isin en parallèle?

25

J'ai un programme de modélisation et de notation qui fait un usage intensif de la DataFrame.isinfonction des pandas, en recherchant dans les listes de Facebook des enregistrements "similaires" d'utilisateurs individuels pour chacune des quelques milliers de pages spécifiques. C'est la partie la plus chronophage du programme, plus encore que les éléments de modélisation ou de notation, simplement parce qu'il ne s'exécute que sur un cœur tandis que le reste s'exécute sur quelques dizaines simultanément.

Bien que je sache que je pourrais diviser manuellement la trame de données en morceaux et exécuter l'opération en parallèle, existe-t-il un moyen simple de le faire automatiquement? En d'autres termes, existe-t-il un type de package qui reconnaîtra que j'exécute une opération facilement déléguée et la distribuera automatiquement? C'est peut-être trop demander, mais j'ai été assez surpris dans le passé par ce qui est déjà disponible en Python, donc je pense que cela vaut la peine de demander.

Toute autre suggestion sur la façon dont cela pourrait être accompli (même si ce n'est pas par un paquet de licorne magique!) Serait également appréciée. Essentiellement, j'essaie simplement de trouver un moyen de raser 15 à 20 minutes par exécution sans passer autant de temps à coder la solution.

Therriault
la source
Quelle est la taille de votre liste de valeurs? Avez-vous essayé de le passer en ensemble? Pour le parallélisme, vous pourriez être intéressé par Joblib. Il est facile à utiliser et peut accélérer les calculs. Utilisez-le avec de gros morceaux de données.
oao
Une autre option consiste à recadrer votre problème en tant que jointure. Les jointures sont beaucoup plus rapides dans Pandas stackoverflow.com/questions/23945493/…
Brian Spiering
Encore une autre option est d'utiliser np.in1d ​​qui est également plus rapide stackoverflow.com/questions/21738882/fast-pandas-filtering
Brian Spiering

Réponses:

8

Malheureusement, la parallélisation n'est pas encore mise en œuvre dans les pandas. Vous pouvez rejoindre ce problème github si vous souhaitez participer au développement de cette fonctionnalité.

Je ne connais aucun "package de licorne magique" à cet effet, donc la meilleure chose sera d'écrire votre propre solution. Mais si vous ne voulez toujours pas y consacrer du temps et que vous voulez apprendre quelque chose de nouveau - vous pouvez essayer les deux méthodes intégrées à MongoDB (réduction de carte et structure d'agg). Voir mongodb_agg_framework .

Stanpol
la source
0

Il existe une version plus commune de cette question concernant parallélisation sur Pandas appliquent la fonction - c'est donc une question rafraîchissante :)

Tout d'abord , je veux mentionner plus rapide puisque vous avez demandé une solution "packagée", et elle apparaît sur la plupart des questions SO concernant la parallélisation des pandas.

Mais.. je voudrais quand même partager mon code personnel, car après plusieurs années de travail avec DataFrame, je n'ai jamais trouvé de solution de parallélisation à 100% (principalement pour la fonction appliquer) et j'ai toujours dû revenir pour mon " manuel ".

Grâce à vous, je l'ai rendu plus générique pour prendre en charge toute méthode (théoriquement) DataFrame par son nom (vous n'aurez donc pas à conserver de versions pour isin, à appliquer, etc.).

Je l'ai testé sur les fonctions "isin", "apply" et "isna" en utilisant à la fois python 2.7 et 3.6. C'est moins de 20 lignes, et j'ai suivi la convention de nommage des pandas comme "sous-ensemble" et "njobs".

J'ai également ajouté une comparaison de temps avec le code équivalent de dask pour "isin" et il semble ~ X2 fois plus lent que cet essentiel.

Il comprend 2 fonctions:

df_multi_core - c'est celui que vous appelez. Il accepte:

  1. Votre objet df
  2. Le nom de la fonction que vous souhaitez appeler
  3. Le sous-ensemble de colonnes sur lequel la fonction peut être exécutée (aide à réduire le temps / la mémoire)
  4. Le nombre de travaux à exécuter en parallèle (-1 ou omis pour tous les cœurs)
  5. Tout autre kwargs que la fonction du df accepte (comme "axe")

_df_split - c'est une fonction d'assistance interne qui doit être positionnée globalement par rapport au module en cours d'exécution (Pool.map est "dépendant du placement"), sinon je la localiserais en interne ..

voici le code de mon essentiel (j'y ajouterai plus de tests de fonction pandas):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Ci - dessous est un code de test pour une isine parallélisée , comparant les performances natives, multi-core gist et dask. Sur une machine I7 avec 8 cœurs physiques, j'ai obtenu une accélération d'environ X4 fois. J'adorerais entendre ce que vous obtenez sur vos vraies données!

from time import time

if __name__ == '__main__': 
    sep = '-' * 50

    # isin test
    N = 10000000
    df = pd.DataFrame({'c1': np.random.randint(low=1, high=N, size=N), 'c2': np.arange(N)})
    lookfor = np.random.randint(low=1, high=N, size=1000000)

    print('{}\ntesting pandas isin on {}\n{}'.format(sep, df.shape, sep))
    t1 = time()
    print('result\n{}'.format(df.isin(lookfor).sum()))
    t2 = time()
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    res = df_multi_core(df=df, df_f_name='isin', subset=['c1'], njobs=-1, values=lookfor)
    print('result\n{}'.format(res.sum()))
    t4 = time()
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))


    t5 = time()
    ddata = dd.from_pandas(df, npartitions=njobs)
    res = ddata.map_partitions(lambda df: df.apply(apply_f, axis=1)).compute(scheduler='processes')
    t6 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for dask implementation {}\n{}'.format(round(t6 - t5, 2), sep))

--------------------------------------------------
testing pandas isin on (10000000, 2)
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for native implementation 3.87
--------------------------------------------------
result
c1    953213
dtype: int64
time for multi core implementation 1.16
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for dask implementation 2.88
mork
la source
@Therriault J'ai ajouté une comparaison de tâches avec isin- il semble que l'extrait de code soit le plus efficace avec 'isin' - ~ X1.75 fois plus rapide que dask (par rapport à la applyfonction qui n'a été que 5% plus rapide que dask)
mork