Indicateur de progression pendant les opérations pandas

159

J'effectue régulièrement des opérations de pandas sur des trames de données de plus de 15 millions de lignes et j'aimerais avoir accès à un indicateur de progression pour des opérations particulières.

Existe-t-il un indicateur de progression basé sur du texte pour les opérations de fractionnement-appliquer-combiner pandas?

Par exemple, dans quelque chose comme:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

feature_rollupest une fonction quelque peu impliquée qui prend de nombreuses colonnes DF et crée de nouvelles colonnes utilisateur par diverses méthodes. Ces opérations peuvent prendre un certain temps pour les grandes trames de données, j'aimerais donc savoir s'il est possible d'avoir une sortie texte dans un bloc-notes iPython qui me met à jour sur la progression.

Jusqu'à présent, j'ai essayé des indicateurs de progression de boucle canoniques pour Python, mais ils n'interagissent pas avec les pandas de manière significative.

J'espère qu'il y a quelque chose que j'ai négligé dans la bibliothèque / documentation de pandas qui permet de connaître la progression d'un split-apply-combine. Une implémentation simple pourrait peut-être examiner le nombre total de sous-ensembles de trames de données sur lesquels la applyfonction travaille et rapporter la progression comme la fraction terminée de ces sous-ensembles.

Est-ce peut-être quelque chose qui doit être ajouté à la bibliothèque?

cwharland
la source
avez-vous fait un% élagage (profil) sur le code? parfois, vous pouvez effectuer des opérations sur l'ensemble du cadre avant de postuler pour éliminer les goulots d'étranglement
Jeff
@Jeff: vous pariez, je l'ai fait plus tôt pour en tirer le maximum de performance. Le problème se résume vraiment à la limite de pseudo-réduction de carte sur laquelle je travaille, car les lignes se comptent par dizaines de millions, donc je ne m'attends pas à ce que les augmentations de super vitesse veuillent juste des commentaires sur les progrès.
cwharland
Envisagez de cythoniser: pandas.pydata.org/pandas-docs/dev/…
Andy Hayden
@AndyHayden - Comme je l'ai commenté sur votre réponse, votre implémentation est assez bonne et ajoute un peu de temps à l'ensemble du travail. J'ai également cythonisé trois opérations dans le cumul des fonctionnalités qui ont récupéré tout le temps qui est désormais dédié à la progression des rapports. Donc, à la fin, je parie que j'aurai des barres de progression avec une réduction du temps de traitement total si je continue avec cython sur toute la fonction.
cwharland le

Réponses:

279

En raison de la demande populaire, tqdma ajouté un support pour pandas. Contrairement aux autres réponses, cela ne ralentira pas sensiblement les pandas - voici un exemple pour DataFrameGroupBy.progress_apply:

import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))

# Create and register a new `tqdm` instance with `pandas`
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

Si vous souhaitez savoir comment cela fonctionne (et comment le modifier pour vos propres rappels), consultez les exemples sur github , la documentation complète sur pypi , ou importez le module et exécutez help(tqdm).

ÉDITER


Pour répondre directement à la question d'origine, remplacez:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

avec:

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

Remarque: tqdm <= v4.8 : Pour les versions de tqdm inférieures à 4.8, au lieu de cela, tqdm.pandas()vous deviez faire:

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())
casper.dcl
la source
5
tqdma été en fait créé pour des itérables simples à l'origine: from tqdm import tqdm; for i in tqdm( range(int(1e8)) ): passLe support pandas était un hack récent que j'ai fait :)
casper.dcl
6
Btw, si vous utilisez des notebooks Jupyter, vous pouvez également utiliser tqdm_notebooks pour obtenir une plus jolie barre. Avec les pandas, vous devez actuellement l'instancier comme from tqdm import tqdm_notebook; tqdm_notebook().pandas(*args, **kwargs) voir ici
grinsbaeckchen
2
À partir de la version 4.8.1 - utilisez plutôt tqdm.pandas (). github.com/tqdm/tqdm/commit/…
mork
1
Merci, @mork a raison. Nous travaillons (lentement) vers la tqdmv5 qui rend les choses plus modulaires.
casper.dcl
1
Pour des recommandations de syntaxe récentes, voir la documentation de tqdm Pandas ici: pypi.python.org/pypi/tqdm#pandas-integration
Manu CJ
18

Pour modifier la réponse de Jeff (et avoir ceci comme fonction réutilisable).

def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    import sys
    sys.stdout.write('apply progress:   0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res

Note: le pourcentage appliquent progrès mises à jour en ligne . Si votre fonction stdouts, cela ne fonctionnera pas.

In [11]: g = df_users.groupby(['userID', 'requestDate'])

In [12]: f = feature_rollup

In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]: 
...

Comme d'habitude, vous pouvez ajouter ceci à vos objets groupby en tant que méthode:

from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply

In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]: 
...

Comme mentionné dans les commentaires, ce n'est pas une fonctionnalité que les pandas de base seraient intéressés à implémenter. Mais python vous permet de les créer pour de nombreux objets / méthodes pandas (cela serait un peu de travail ... bien que vous devriez être en mesure de généraliser cette approche).

Andy Hayden
la source
Je dis "pas mal de travail", mais vous pourriez probablement réécrire toute cette fonction en décorateur (plus général).
Andy Hayden le
Merci d'avoir développé le message de Jeff. J'ai implémenté les deux et le ralentissement pour chacun est assez minime (ajouté un total de 1,1 minute à une opération qui a pris 27 minutes). De cette façon, je peux voir les progrès et étant donné la nature ad hoc de ces opérations, je pense que c'est un ralentissement acceptable.
cwharland le
Excellent, heureux que cela ait aidé. J'ai été en fait surpris par le ralentissement (quand j'ai essayé un exemple), je m'attendais à ce que ce soit bien pire.
Andy Hayden
1
Pour ajouter encore à l'efficacité des méthodes publiées, j'étais paresseux à propos de l'importation de données (les pandas sont tout simplement trop doués pour gérer les csv désordonnés !!) et quelques-unes de mes entrées (~ 1%) avaient complètement supprimé les insertions (pensez-y ensemble enregistrements insérés dans des champs uniques). Leur élimination entraîne une accélération massive du cumul des fonctionnalités car il n'y avait aucune ambiguïté sur ce qu'il fallait faire pendant les opérations de fractionnement-application-combinaison.
cwharland
1
Je suis descendu à 8 minutes ... mais j'ai ajouté quelque chose au cumul des fonctionnalités (plus de fonctionnalités -> meilleure AUC!). Ces 8 minutes sont par morceau (deux morceaux au total actuellement) avec chaque morceau aux alentours de 12 millions de lignes. Alors oui ... 16 minutes pour faire de lourdes opérations sur 24 millions de lignes en utilisant HDFStore (et il y a des trucs nltk dans le cumul des fonctionnalités). Plutôt bien. Espérons
qu'Internet
11

Au cas où vous auriez besoin d'aide pour savoir comment l'utiliser dans un notebook Jupyter / ipython, comme je l'ai fait, voici un guide utile et une source vers l' article pertinent :

from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)

Notez le trait de soulignement dans l'instruction d'importation pour _tqdm_notebook. Comme le mentionne l'article référencé, le développement est en phase bêta tardive.

Victor Vulovic
la source
8

Pour tous ceux qui cherchent à appliquer tqdm sur leur code d'application de pandas parallèle personnalisé.

(J'ai essayé certaines des bibliothèques de parallélisation au fil des ans, mais je n'ai jamais trouvé de solution de parallélisation à 100%, principalement pour la fonction apply, et je devais toujours revenir pour mon code "manuel".)

df_multi_core - c'est celui que vous appelez. Il accepte:

  1. Votre objet df
  2. Le nom de la fonction que vous souhaitez appeler
  3. Le sous-ensemble de colonnes sur lequel la fonction peut être exécutée (aide à réduire le temps / la mémoire)
  4. Le nombre de travaux à exécuter en parallèle (-1 ou omis pour tous les cœurs)
  5. Tout autre kwargs accepté par la fonction df (comme "axis")

_df_split - il s'agit d'une fonction d'assistance interne qui doit être positionnée globalement sur le module en cours d'exécution (Pool.map est "dépendant du placement"), sinon je la localiserais en interne ..

voici le code de mon résumé (j'ajouterai plus de tests de fonction pandas ici):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Ci-dessous est un code de test pour une application parallélisée avec tqdm "progress_apply".

from time import time
from tqdm import tqdm
tqdm.pandas()

if __name__ == '__main__': 
    sep = '-' * 50

    # tqdm progress_apply test      
    def apply_f(row):
        return row['c1'] + 0.1
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})

    print('testing pandas apply on {}\n{}'.format(df.shape, sep))
    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

Dans la sortie, vous pouvez voir 1 barre de progression pour l'exécution sans parallélisation et des barres de progression par cœur lors de l'exécution avec parallélisation. Il y a un léger accrochage et parfois le reste des cœurs apparaît à la fois, mais même dans ce cas, je pense que c'est utile car vous obtenez les statistiques de progression par cœur (it / sec et total des enregistrements, par exemple)

entrez la description de l'image ici

Merci @abcdaa pour cette superbe bibliothèque!

mork
la source
1
Merci @mork - n'hésitez pas à ajouter à github.com/tqdm/tqdm/wiki/How-to-make-a-great-Progress-Bar ou créez une nouvelle page sur github.com/tqdm/tqdm/wiki
casper. dcl
Merci, mais try: splits = np.array_split(df[subset], njobs) except ValueError: splits = np.array_split(df, njobs)j'ai dû changer ces parties: à cause de l'exception KeyError au lieu de ValueError, passez à Exception pour gérer tous les cas.
Marius
Merci @mork - cette réponse devrait être plus élevée.
Andy
5

Vous pouvez facilement le faire avec un décorateur

from functools import wraps 

def logging_decorator(func):

    @wraps
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print "The function I modify has been called {0} times(s).".format(
              wrapper.count)
        func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

modified_function = logging_decorator(feature_rollup)

puis utilisez simplement la fonction_modifiée (et changez quand vous voulez qu'elle s'imprime)

Jeff
la source
1
Un avertissement évident étant que cela ralentira votre fonction! Vous pouvez même le faire mettre à jour avec la progression stackoverflow.com/questions/5426546 /... par exemple count / len en pourcentage.
Andy Hayden
oui - vous aurez l'ordre (nombre de groupes), donc en fonction de votre goulot d'étranglement, cela pourrait faire une différence
Jeff
peut-être que la chose intuitive à faire est d'envelopper cela dans une logged_apply(g, func)fonction, où vous auriez accès à la commande et pourriez vous connecter depuis le début.
Andy Hayden le
J'ai fait ce qui précède dans ma réponse, également une mise à jour effrontée du pourcentage. En fait, je ne pouvais pas faire fonctionner le vôtre ... Je pense qu'avec le bit wraps. Si vous l'utilisez pour l'application, ce n'est pas si important de toute façon.
Andy Hayden le
1

J'ai changé la réponse de Jeff , pour inclure un total, afin que vous puissiez suivre la progression et une variable pour simplement imprimer toutes les itérations X (cela améliore en fait beaucoup les performances, si le "print_at" est raisonnablement élevé)

def count_wrapper(func,total, print_at):

    def wrapper(*args):
        wrapper.count += 1
        if wrapper.count % wrapper.print_at == 0:
            clear_output()
            sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
            sys.stdout.flush()
        return func(*args)
    wrapper.count = 0
    wrapper.total = total
    wrapper.print_at = print_at

    return wrapper

la fonction clear_output () est de

from IPython.core.display import clear_output

sinon sur IPython La réponse d'Andy Hayden le fait sans elle

Filipe Silva
la source