Quand je lance quelque chose comme:
from multiprocessing import Pool
p = Pool(5)
def f(x):
return x*x
p.map(f, [1,2,3])
ça fonctionne bien. Cependant, en mettant cela en fonction d'une classe:
class calculate(object):
def run(self):
def f(x):
return x*x
p = Pool()
return p.map(f, [1,2,3])
cl = calculate()
print cl.run()
Me donne l'erreur suivante:
Exception in thread Thread-1:
Traceback (most recent call last):
File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/sw/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
J'ai vu un article d'Alex Martelli traitant du même genre de problème, mais ce n'était pas assez explicite.
python
multiprocessing
pickle
Mermoz
la source
la source
IPython.Parallel
, mais là, vous pouvez contourner le problème en poussant les objets vers les nœuds. Il semble assez ennuyeux de contourner ce problème avec le multitraitement.calculate
c'est picklable, il semble donc que cela puisse être résolu en 1) créant un objet fonction avec un constructeur qui copie sur unecalculate
instance, puis 2) passant une instance de cet objet fonction àPool
lamap
méthode de. Non?multiprocessing
module sont dues à son objectif d'être une implémentation multiplateforme et à l'absence d'fork(2)
appel système de type- like dans Windows. Si vous ne vous souciez pas de la prise en charge de Win32, il existe peut-être une solution de contournement plus simple basée sur les processus. Ou si vous êtes prêt à utiliser des threads au lieu de processus, vous pouvez les remplacerfrom multiprocessing import Pool
parfrom multiprocessing.pool import ThreadPool as Pool
.Réponses:
J'ai également été ennuyé par les restrictions sur le type de fonctions que pool.map pouvait accepter. J'ai écrit ce qui suit pour contourner cela. Cela semble fonctionner, même pour une utilisation récursive de parmap.
la source
Je n'ai pas pu utiliser les codes affichés jusqu'à présent car les codes utilisant "multiprocessing.Pool" ne fonctionnent pas avec les expressions lambda et les codes n'utilisant pas "multiprocessing.Pool" engendrent autant de processus qu'il y a d'éléments de travail.
J'ai adapté le code pour qu'il génère un nombre prédéfini de travailleurs et n'itère dans la liste d'entrée que s'il existe un travailleur inactif. J'ai également activé le mode "démon" pour les travailleurs st ctrl-c fonctionne comme prévu.
la source
parmap
fonction?(None, None)
comme dernier élément indiquefun
qu'il a atteint la fin de la séquence d'éléments pour chaque processus.Le multitraitement et le décapage sont interrompus et limités à moins que vous ne sautiez hors de la bibliothèque standard.
Si vous utilisez un fork de
multiprocessing
calledpathos.multiprocesssing
, vous pouvez directement utiliser des classes et des méthodes de classe dans lesmap
fonctions de multiprocessing . C'est parce qu'ildill
est utilisé à la place depickle
oucPickle
, etdill
peut sérialiser presque tout en python.pathos.multiprocessing
fournit également une fonction de carte asynchrone ... et il peutmap
fonctionner avec plusieurs arguments (par exemplemap(math.pow, [1,2,3], [4,5,6])
)Voir les discussions: Que peuvent faire le multitraitement et l'aneth ensemble?
et: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
Il gère même le code que vous avez écrit initialement, sans modification, et à partir de l'interpréteur. Pourquoi faire autre chose de plus fragile et spécifique à un seul cas?
Obtenez le code ici: https://github.com/uqfoundation/pathos
Et, juste pour montrer un peu plus ce qu'il peut faire:
la source
amap
) qui permet l'utilisation d'une barre de progression et d'autres programmes asynchrones.Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
multiprocess
laquelle est compatible 2/3. Voir stackoverflow.com/questions/27873093/… et pypi.python.org/pypi/multiprocess .pathos
a eu une nouvelle version stable et est également compatible 2.x et 3.x.Il n'y a actuellement pas de solution à votre problème, à ma connaissance: la fonction que vous donnez
map()
doit être accessible via un import de votre module. C'est pourquoi le code de robert fonctionne: la fonctionf()
peut être obtenue en important le code suivant:J'ai en fait ajouté une section "principale", car elle suit les recommandations pour la plate-forme Windows ("Assurez-vous que le module principal peut être importé en toute sécurité par un nouvel interpréteur Python sans provoquer d'effets secondaires involontaires").
J'ai également ajouté une lettre majuscule devant
Calculate
, afin de suivre PEP 8 . :)la source
La solution de mrule est correcte mais présente un bug: si l'enfant renvoie une grande quantité de données, il peut remplir le tampon du tube, bloquant sur l'enfant
pipe.send()
, pendant que le parent attend que l'enfant quittepipe.join()
. La solution est de lire les données dejoin()
l'enfant avant de l'engager. De plus, l'enfant doit fermer l'extrémité parent du tuyau pour éviter un blocage. Le code ci-dessous corrige cela. Sachez également que celaparmap
crée un processus par élément dansX
. Une solution plus avancée consiste àmultiprocessing.cpu_count()
diviserX
en plusieurs blocs, puis à fusionner les résultats avant de revenir. Je laisse cela comme un exercice au lecteur pour ne pas gâcher la concision de la belle réponse par mrule. ;)la source
OSError: [Errno 24] Too many open files
. Je pense qu'il doit y avoir une sorte de limites sur le nombre de processus pour que cela fonctionne correctement ...J'ai également eu du mal avec ça. J'avais des fonctions en tant que membres de données d'une classe, à titre d'exemple simplifié:
J'avais besoin d'utiliser la fonction self.f dans un appel Pool.map () depuis la même classe et self.f ne prenait pas de tuple comme argument. Puisque cette fonction était intégrée dans une classe, je ne savais pas comment écrire le type de wrapper que d'autres réponses suggéraient.
J'ai résolu ce problème en utilisant un wrapper différent qui prend un tuple / liste, où le premier élément est la fonction, et les éléments restants sont les arguments de cette fonction, appelés eval_func_tuple (f_args). En utilisant cela, la ligne problématique peut être remplacée par return pool.map (eval_func_tuple, itertools.izip (itertools.repeat (self.f), list1, list2)). Voici le code complet:
Fichier: util.py
Fichier: main.py
Lancer main.py donnera [11, 22, 33]. N'hésitez pas à améliorer cela, par exemple eval_func_tuple pourrait également être modifié pour prendre des arguments de mots-clés.
Sur une autre note, dans une autre réponse, la fonction "parmap" peut être rendue plus efficace pour le cas de plus de Processus que de nombre de CPU disponibles. Je copie une version modifiée ci-dessous. C'est mon premier message et je n'étais pas sûr de devoir modifier directement la réponse originale. J'ai également renommé certaines variables.
la source
J'ai pris la réponse de klaus se et aganders3 et j'ai créé un module documenté qui est plus lisible et tient dans un seul fichier. Vous pouvez simplement l'ajouter à votre projet. Il a même une barre de progression en option!
EDIT : Ajout de la suggestion @ alexander-mcfarlane et d'une fonction de test
la source
join()
en même temps et vous obtiendrez juste un flash de100%
terminé à l'tqdm
écran. Le seul moment où cela sera utile, c'est si chaque processeur a une charge de travail biaiséetqdm()
pour boucler la ligne:result = [q_out.get() for _ in tqdm(sent)]
et cela fonctionne beaucoup mieux - un grand effort bien que j'apprécie vraiment cela, alors +1Je sais que cela a été demandé il y a plus de 6 ans maintenant, mais je voulais simplement ajouter ma solution, car certaines des suggestions ci-dessus semblent horriblement compliquées, mais ma solution était en fait très simple.
Tout ce que j'avais à faire était d'envelopper l'appel pool.map () à une fonction d'assistance. Passer l'objet de classe avec les arguments de la méthode sous forme de tuple, qui ressemblait un peu à ceci.
la source
Les fonctions définies dans les classes (même dans les fonctions au sein des classes) ne sont pas vraiment pickle. Cependant, cela fonctionne:
la source
Je sais que cette question a été posée il y a 8 ans et 10 mois mais je souhaite vous présenter ma solution:
Vous avez juste besoin de faire de votre fonction de classe une méthode statique. Mais c'est aussi possible avec une méthode de classe:
Testé en Python 3.7.3
la source
J'ai modifié la méthode de klaus se car alors qu'elle fonctionnait pour moi avec de petites listes, elle se bloquait lorsque le nombre d'éléments était d'environ 1000 ou plus. Au lieu de pousser les travaux un par un avec la
None
condition d'arrêt, je charge la file d'attente d'entrée en une seule fois et je laisse simplement les processus la grignoter jusqu'à ce qu'elle soit vide.Edit: malheureusement maintenant, je rencontre cette erreur sur mon système: la limite de taille maximale de la file d'attente de multitraitement est de 32767 , j'espère que les solutions de contournement aideront.
la source
Vous pouvez exécuter votre code sans aucun problème si vous ignorez manuellement l'
Pool
objet de la liste des objets de la classe car il n'est paspickle
possible comme le dit l'erreur. Vous pouvez le faire avec la__getstate__
fonction (regardez ici aussi) comme suit. L'Pool
objet va essayer de trouver les__getstate__
et les__setstate__
fonctions et les exécuter si elle le juge lors de l' exécutionmap
,map_async
etc:Alors fais:
vous donnera la sortie:
J'ai testé le code ci-dessus en Python 3.x et cela fonctionne.
la source
Je ne sais pas si cette approche a été adoptée, mais une solution que j'utilise est:
La sortie doit être:
la source
Il est possible que vous souhaitiez appliquer cette fonction pour chaque instance différente de la classe. Alors voici la solution pour cela aussi
la source
Voici ma solution, qui je pense est un peu moins hackish que la plupart des autres ici. C'est similaire à la réponse de Nightowl.
la source
De http://www.rueckstiess.net/research/snippets/show/ca1d7d90 et http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html
Nous pouvons créer une fonction externe et la semer avec l'objet self de classe:
OU sans joblib:
la source
Ce n'est peut-être pas une très bonne solution mais dans mon cas, je le résous comme ça.
J'ai dû passer
self
à ma fonction car je dois accéder aux attributs et aux fonctions de ma classe via cette fonction. Cela fonctionne pour moi. Les corrections et suggestions sont toujours les bienvenues.la source
Voici un passe-partout que j'ai écrit pour utiliser le pool multiprocesseur dans python3, en particulier python3.7.7 a été utilisé pour exécuter les tests. J'ai obtenu mes courses les plus rapides en utilisant
imap_unordered
. Branchez simplement votre scénario et essayez-le. Vous pouvez utilisertimeit
ou simplementtime.time()
pour déterminer ce qui vous convient le mieux.Dans le scénario ci-dessus, il
imap_unordered
semble que ce soit le pire pour moi. Essayez votre cas et comparez-le sur la machine sur laquelle vous prévoyez de l'exécuter. Lisez également sur les pools de processus . À votre santé!la source