Python Process Pool non démoniaque?

96

Serait-il possible de créer un pool python qui ne soit pas démoniaque? Je veux qu'un pool puisse appeler une fonction qui a un autre pool à l'intérieur.

Je veux cela parce que les processus de démon ne peuvent pas créer de processus. Plus précisément, cela provoquera l'erreur:

AssertionError: daemonic processes are not allowed to have children

Par exemple, considérons le scénario où function_aa un pool qui s'exécute function_bqui a un pool qui s'exécute function_c. Cette chaîne de fonctions échouera, car elle function_best exécutée dans un processus démon et les processus démons ne peuvent pas créer de processus.

Max
la source
AFAIK, non ce n'est pas possible que tous les ouvriers du pool soient démonisés et il n'est pas possible d' injecter la dépendance , BTW je ne comprends pas la deuxième partie de votre question I want a pool to be able to call a function that has another pool insideet comment cela interfère avec le fait que les ouvriers sont démonisés.
mouad
4
Parce que si la fonction a a un pool qui exécute la fonction b qui a un pool qui exécute la fonction c, il y a un problème dans b qui est en cours d'exécution dans un processus démon, et les processus démons ne peuvent pas créer de processus. AssertionError: daemonic processes are not allowed to have children
Max

Réponses:

118

La multiprocessing.pool.Poolclasse crée les processus de travail dans sa __init__méthode, les rend démoniaques et les démarre, et il n'est pas possible de redéfinir leur daemonattribut Falseavant qu'ils ne soient démarrés (et par la suite, ce n'est plus autorisé). Mais vous pouvez créer votre propre sous-classe de multiprocesing.pool.Pool( multiprocessing.Poolc'est juste une fonction wrapper) et substituer votre propre multiprocessing.Processsous-classe, qui est toujours non démoniaque, à utiliser pour les processus de travail.

Voici un exemple complet de la façon de procéder. Les parties importantes sont les deux classes NoDaemonProcesset MyPoolen haut et pour appeler pool.close()et pool.join()sur votre MyPoolinstance à la fin.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()
Chris Arndt
la source
1
Je viens de tester à nouveau mon code avec Python 2.7 / 3.2 (après avoir corrigé les lignes "print") sur Linux et Python 2.6 / 2.7 / 3.2 OS X. Linux et Python 2.7 / 3.2 sur OS X fonctionnent bien mais le code se bloque en effet avec Python 2.6 sur OS X (Lion). Cela semble être un bogue dans le module multitraitement, qui a été corrigé, mais je n'ai pas vérifié le suivi des bogues.
Chris Arndt
1
Merci! Sur Windows, vous devez également appelermultiprocessing.freeze_support()
frmdstryr
2
Bon travail. Si quelqu'un a une fuite de mémoire avec cela, essayez d'utiliser "with closing (MyPool (processes = num_cpu))) comme pool:" pour disposer du pool correctement
Chris Lucian
31
Quels sont les inconvénients de l'utilisation MyPoolau lieu de la valeur par défaut Pool? En d'autres termes, en échange de la flexibilité du démarrage des processus enfants, quels coûts dois-je payer? (S'il n'y avait pas de coûts, la norme Poolaurait probablement utilisé des processus non démoniaques).
max
4
@machen Oui, malheureusement c'est vrai. Dans Python 3.6, la Poolclasse a été largement refactorisée, ce Processn'est donc plus un simple attribut, mais une méthode, qui renvoie l'instance de processus qu'elle obtient à partir d'un contexte . J'ai essayé d'écraser cette méthode pour renvoyer une NoDaemonPoolinstance, mais cela entraîne une exception AssertionError: daemonic processes are not allowed to have childrenlorsque le pool est utilisé.
Chris Arndt
26

J'ai eu la nécessité d'employer un pool non démoniaque dans Python 3.7 et j'ai fini par adapter le code affiché dans la réponse acceptée. Ci-dessous, il y a l'extrait qui crée le pool non démoniaque:

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(MyPool, self).__init__(*args, **kwargs)

Comme la mise en œuvre actuelle de multiprocessing a été largement remaniée pour être basée sur les contextes, nous devons fournir unNoDaemonContext classe qui a notre NoDaemonProcessattribut as. MyPoolutilisera alors ce contexte au lieu de celui par défaut.

Cela dit, je dois avertir qu'il y a au moins 2 mises en garde à cette approche:

  1. Cela dépend toujours des détails de mise en œuvre du multiprocessing package et peut donc être interrompu à tout moment.
  2. Il y a des raisons valables qui ont multiprocessingrendu si difficile l'utilisation de processus non démoniaques, dont beaucoup sont expliqués ici . Le plus convaincant à mon avis est:

    Quant à permettre aux threads enfants de générer leurs propres enfants en utilisant un sous-processus, il y a le risque de créer une petite armée de «petits-enfants» zombies si les threads parents ou enfants se terminent avant que le sous-processus ne se termine et ne retourne.

Massimiliano
la source
En ce qui concerne la mise en garde: mon cas d'utilisation est la mise en parallèle des tâches, mais les petits-enfants renvoient des informations à leurs parents qui à leur tour retournent des informations à leurs parents après avoir effectué un traitement local requis. Par conséquent, chaque niveau / branche a une attente explicite pour toutes ses feuilles. La mise en garde s'applique-t-elle toujours si vous devez explicitement attendre la fin des processus générés?
A_A
Obtenir l'erreur AttributeError: module 'multiprocessing' has no attribute 'pool'dans Python 3.8.0
Nyxynyx le
@Nyxynyx N'oubliez pasimport multiprocessing.pool
Chris Arndt
22

Le module multitraitement a une interface agréable pour utiliser des pools avec des processus ou des threads. En fonction de votre cas d'utilisation actuel, vous pouvez envisager d'utiliser multiprocessing.pool.ThreadPoolpour votre pool externe, ce qui entraînera des threads (qui permettent de générer des processus de l'intérieur) par opposition à des processus.

Cela pourrait être limité par le GIL, mais dans mon cas particulier (j'ai testé les deux) , le temps de démarrage des processus externes Pooltels que créés ici l' emportait de loin sur la solution ThreadPool.


C'est vraiment facile d'échanger Processescontre Threads. En savoir plus sur l'utilisation d'une ThreadPoolsolution ici ou ici .

timmwagener
la source
Merci - cela m'a beaucoup aidé - grande utilisation du threading ici (pour générer des processus qui fonctionnent vraiment bien)
trance_dude
1
Pour les personnes à la recherche d'une solution pratique qui s'applique probablement à leur situation, c'est celle-ci.
abanana
6

Sur certaines versions Python remplaçant la norme piscine à la coutume peut augmenter l' erreur: AssertionError: group argument must be None for now.

Ici, j'ai trouvé une solution qui peut aider:

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, val):
        pass


class NoDaemonProcessPool(multiprocessing.pool.Pool):

    def Process(self, *args, **kwds):
        proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
        proc.__class__ = NoDaemonProcess

        return proc
Atterratio
la source
4

concurrent.futures.ProcessPoolExecutorn'a pas cette limitation. Il peut avoir un pool de processus imbriqué sans aucun problème:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()

Le code de démonstration ci-dessus a été testé avec Python 3.8.

Crédit: réponse de jfs

Acumenus
la source
1
C'est désormais clairement la meilleure solution, car elle nécessite des changements minimes.
DreamFlasher
1
fonctionne parfaitement! ... comme note d'accompagnement en utilisant un enfant - à l' multiprocessing.Poolintérieur d'un ProcessPoolExecutor.Poolest également possible!
raphael le
3

Le problème que j'ai rencontré était en essayant d'importer des globaux entre les modules, provoquant l'évaluation de la ligne ProcessPool () plusieurs fois.

globals.py

from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls,*args,**kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """     
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children

    The root cause is that importing this file from different modules causes this file to be reevalutated each time, 
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug    
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

Puis importez en toute sécurité depuis ailleurs dans votre code

from globals import Globals
Globals().shared_manager      
Globals().shared_process_pool
Globals().shared_thread_pool  
Globals().shared_lock         
James McGuigan
la source
2

J'ai vu des gens résoudre ce problème en utilisant celeryle fork de multiprocessingappelé billiard (extensions de pool multiprocesseur), qui permet aux processus démoniaques de générer des enfants. La solution consiste simplement à remplacer le multiprocessingmodule par:

import billiard as multiprocessing
Tomasz Bartkowiak
la source