Multiprocessing Python PicklingError: Can't pickle <type 'function'>

244

Je suis désolé de ne pas pouvoir reproduire l'erreur avec un exemple plus simple et mon code est trop compliqué à publier. Si j'exécute le programme dans le shell IPython au lieu du Python normal, les choses fonctionnent bien.

J'ai recherché quelques notes précédentes sur ce problème. Ils ont tous été causés par l'utilisation de pool pour appeler la fonction définie dans une fonction de classe. Mais ce n'est pas le cas pour moi.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

J'apprécierais toute aide.

Mise à jour : La fonction I pickle est définie au niveau supérieur du module. Bien qu'il appelle une fonction qui contient une fonction imbriquée. c'est-à-dire, f()appelle des g()appels h()qui ont une fonction imbriquée i(), et j'appelle pool.apply_async(f). f(), g(), h()Sont tous définis au niveau supérieur. J'ai essayé un exemple plus simple avec ce modèle et cela fonctionne cependant.

Vengeance
la source
3
La réponse de niveau supérieur / acceptée est bonne, mais cela pourrait signifier que vous devez restructurer votre code, ce qui pourrait être douloureux. Je recommanderais à tous ceux qui ont ce problème de lire également les réponses supplémentaires en utilisant dillet pathos. Cependant, je n'ai aucune chance avec l'une des solutions lorsque je travaille avec vtkobjects :( N'importe qui a réussi à exécuter du code python en traitement parallèle vtkPolyData?
Chris

Réponses:

306

Voici une liste de ce qui peut être mariné . En particulier, les fonctions ne sont picklables que si elles sont définies au niveau supérieur d'un module.

Ce morceau de code:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

renvoie une erreur presque identique à celle que vous avez publiée:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Le problème est que pooltoutes les méthodes utilisent a mp.SimpleQueuepour transmettre des tâches aux processus de travail. Tout ce qui passe par le mp.SimpleQueuedoit être pickable, et foo.workn'est pas picklable car il n'est pas défini au niveau supérieur du module.

Il peut être corrigé en définissant une fonction au niveau supérieur, qui appelle foo.work():

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

Notez que fooc'est pickable, car Fooest défini au niveau supérieur et foo.__dict__est picklable.

unutbu
la source
2
Merci pour votre réponse. J'ai mis à jour ma question. Je ne pense pas que ce soit la cause, cependant
Vendetta
7
Pour obtenir une PicklingError, quelque chose doit être placé dans la file d'attente qui n'est pas picklable. Ce pourrait être la fonction ou ses arguments. Pour en savoir plus sur le problème, je vous suggère de faire une copie de votre programme et de commencer à le réduire, le rendant plus simple et plus simple, à chaque fois que vous réexécutez le programme pour voir si le problème persiste. Quand cela devient vraiment simple, vous aurez soit découvert le problème vous-même, soit vous aurez quelque chose que vous pouvez poster ici.
unutbu
3
Aussi: si vous définissez une fonction au niveau supérieur d'un module, mais qu'elle est décorée, alors la référence sera à la sortie du décorateur, et vous obtiendrez quand même cette erreur.
bobpoekert
5
Seulement en retard de 5 ans, mais je viens de rencontrer cela. Il s'avère que le "niveau supérieur" doit être pris plus littéralement que d'habitude: il me semble que la définition de la fonction doit précéder l' initialisation du pool (c'est-à-dire la pool = Pool()ligne ici ). Je ne m'attendais pas à cela, et cela pourrait être la raison pour laquelle le problème d'OP a persisté.
Andras Deak
4
En particulier, les fonctions ne sont picklables que si elles sont définies au niveau supérieur d'un module. Il semble que le résultat de l'application functool.partialà une fonction de niveau supérieur soit également picklable, même s'il est défini dans une autre fonction.
user1071847
96

J'utiliserais pathos.multiprocesssing, au lieu de multiprocessing. pathos.multiprocessingest une fourchette de multiprocessingces utilisations dill. dillpeut sérialiser presque n'importe quoi en python, vous pouvez donc en envoyer beaucoup plus en parallèle. Le pathosfork a également la possibilité de travailler directement avec plusieurs fonctions d'argument, comme vous en avez besoin pour les méthodes de classe.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

Obtenez pathos(et si vous le souhaitez dill) ici: https://github.com/uqfoundation

Mike McKerns
la source
5
travaillé un régal. Pour quelqu'un d'autre, j'ai installé les deux bibliothèques par: sudo pip install git+https://github.com/uqfoundation/dill.git@masteretsudo pip install git+https://github.com/uqfoundation/pathos.git@master
Alexander McFarlane
5
@AlexanderMcFarlane Je n'installerais pas de paquets python avec sudo(à partir de sources externes telles que github en particulier). Au lieu de cela, je recommanderais d'exécuter:pip install --user git+...
Chris
L'utilisation pip install pathosne fonctionne tout simplement pas et donne ce message:Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
xApple
11
pip install pathosfonctionne maintenant et pathosest compatible python 3.
Mike McKerns
3
@DanielGoldfarb: multiprocessest une fourchette d' multiprocessingdilla remplacé pickleà plusieurs endroits dans le code ... mais essentiellement, c'est tout. pathosfournit des couches API supplémentaires multiprocesset a également des backends supplémentaires. Mais, c'est l'essentiel.
Mike McKerns du
29

Comme d'autres l'ont dit, multiprocessingil ne peut transférer que des objets Python vers des processus de travail qui peuvent être décapés. Si vous ne pouvez pas réorganiser votre code comme décrit par unutbu, vous pouvez utiliserdill les capacités étendues de pickling / unpickling pour transférer des données (en particulier des données de code) comme je le montre ci-dessous.

Cette solution nécessite uniquement l'installation de dillet aucune autre bibliothèque comme pathos:

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()
rockportrocker
la source
6
Je suis l' auteur dillet pathos… et pendant que vous avez raison, n'est-il pas tellement plus agréable et plus propre et plus flexible à utiliser aussi pathosque dans ma réponse? Ou peut-être que je suis un peu biaisé…
Mike McKerns
4
Je n'étais pas au courant de l'état d'avancement pathosau moment de la rédaction de cet article et je voulais présenter une solution très proche de la réponse. Maintenant que j'ai vu votre solution, je conviens que c'est la voie à suivre.
rocksportrocker
J'ai lu votre solution et je me suis Doh… I didn't even think of doing it like that. dit: c'était plutôt cool.
Mike McKerns
4
Merci d'avoir posté, j'ai utilisé cette approche pour les arguments de dilling / undilling qui ne pouvaient pas être décapés: stackoverflow.com/questions/27883574/…
jazzblue
@rocksportrocker. Je lis cet exemple et je ne comprends pas pourquoi il y a une forboucle explicite . Je verrais normalement une routine parallèle prendre une liste et retourner une liste sans boucle.
user1700890
20

J'ai trouvé que je peux également générer exactement cette sortie d'erreur sur un morceau de code fonctionnant parfaitement en essayant d'utiliser le profileur dessus.

Notez que c'était sur Windows (où la fourche est un peu moins élégante).

Je courais:

python -m profile -o output.pstats <script> 

Et constaté que la suppression du profilage supprimait l'erreur et que le placement du profilage la restaurait. Me rendait fou aussi parce que je savais que le code fonctionnait. Je vérifiais si quelque chose avait mis à jour pool.py ... puis avait un sentiment de naufrage et éliminé le profilage et c'était tout.

Publier ici pour les archives au cas où quelqu'un d'autre les rencontrerait.

Ezekiel Kruglick
la source
3
WOW, merci d'avoir mentionné! Cela m'a rendu fou pendant la dernière heure environ; J'ai tout essayé jusqu'à un exemple très simple - rien ne semblait fonctionner. Mais j'ai également eu le profileur en cours d'exécution dans mon fichier batch :(
tim
1
Oh, je ne peux pas te remercier assez. Cela semble si stupide, car c'est tellement inattendu. Je pense que cela devrait être mentionné dans les documents. Tout ce que j'avais était une instruction d'importation pdb, et une simple fonction de haut niveau avec juste un passn'était pas «picklable».
0xc0de
10

Lorsque ce problème survient, multiprocessingune solution simple consiste à passer de Poolà ThreadPool. Cela peut être fait sans changement de code autre que l’importation

from multiprocessing.pool import ThreadPool as Pool

Cela fonctionne parce que ThreadPool partage la mémoire avec le thread principal, plutôt que de créer un nouveau processus - cela signifie que le décapage n'est pas requis.

L'inconvénient de cette méthode est que python n'est pas le plus grand langage de gestion des threads - il utilise quelque chose appelé Global Interpreter Lock pour rester sûr, ce qui peut ralentir certains cas d'utilisation ici. Cependant, si vous interagissez principalement avec d'autres systèmes (exécution de commandes HTTP, conversation avec une base de données, écriture sur des systèmes de fichiers), votre code n'est probablement pas lié par le processeur et ne prendra pas beaucoup de succès. En fait, j'ai trouvé lors de l'écriture de benchmarks HTTP / HTTPS que le modèle fileté utilisé ici a moins de temps et de délais, car le temps de création de nouveaux processus est beaucoup plus élevé que le temps de création de nouveaux threads.

Donc, si vous traitez une tonne de choses dans l'espace utilisateur python, ce n'est peut-être pas la meilleure méthode.

tedivm
la source
2
Mais alors vous n'utilisez qu'un seul processeur (au moins avec les versions Python régulières qui utilisent le GIL ), ce qui va à l'encontre de l'objectif.
Endre Both
Cela dépend vraiment de la finalité. Le Global Interpreter Lock signifie qu'une seule instance à la fois peut exécuter du code python, mais pour les actions qui bloquent fortement (accès au système de fichiers, téléchargement de fichiers volumineux ou multiples, exécution de code externe), le GIL n'est finalement pas un problème. Dans certains cas, la surcharge engendrée par l'ouverture de nouveaux processus (plutôt que les threads) l'emporte sur la surcharge GIL.
tedivm
C'est vrai, merci. Vous voudrez peut-être toujours inclure une mise en garde dans la réponse. De nos jours, lorsque l'augmentation de la puissance de traitement se présente principalement sous la forme de cœurs de processeur plus plutôt que plus puissants, le passage d'une exécution multicœur à une exécution à cœur unique est un effet secondaire assez important.
Endre Both
Bon point - j'ai mis à jour la réponse avec plus de détails. Je tiens à souligner que le passage au multitraitement threadé ne fait pas fonctionner python uniquement sur un seul cœur.
tedivm
4

Cette solution nécessite uniquement l'installation d'aneth et pas d'autres bibliothèques comme pathos

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

Il fonctionne également pour les tableaux numpy.

Ilia w495 Nikitin
la source
2
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Cette erreur se produira également si vous avez une fonction intégrée dans l'objet modèle qui a été transmise au travail asynchrone.

Assurez-vous donc de vérifier que les objets de modèle transmis n'ont pas de fonctions intégrées. (Dans notre cas, nous utilisions laFieldTracker() fonction de django-model-utils à l'intérieur du modèle pour suivre un certain champ). Voici le lien vers le problème GitHub pertinent.

Penkey Suresh
la source
0

S'appuyant sur la solution @rocksportrocker, il serait judicieux de l'analyser lors de l'envoi et de la récupération des résultats.

import dill
import itertools
def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
                  [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)
devrait voir
la source