La documentation du multiprocessing
module montre comment transmettre une file d'attente à un processus démarré avec multiprocessing.Process
. Mais comment puis-je partager une file d'attente avec des processus de travail asynchrones démarrés avec apply_async
? Je n'ai pas besoin de jointure dynamique ou de quoi que ce soit d'autre, juste un moyen pour les travailleurs de rapporter (à plusieurs reprises) leurs résultats à la base.
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
q = multiprocessing.Queue()
workers = pool.apply_async(worker, (33, q))
Cela échoue avec:
RuntimeError: Queue objects should only be shared between processes through inheritance
. Je comprends ce que cela signifie, et je comprends les conseils d'hériter plutôt que d'exiger le décapage / décapage (et toutes les restrictions Windows spéciales). Mais comment puis - je passer la file d'attente d'une manière qui fonctionne? Je ne trouve pas d'exemple et j'ai essayé plusieurs alternatives qui ont échoué de différentes manières. Aidez-moi, s'il vous plaît?
queue.Queue()
ne convient pas à cela?queue.Queue
été conçu pour le threading, en utilisant des verrous en mémoire. Dans un environnement multiprocessus, chaque sous-processus obtiendrait sa propre copie d'unequeue.Queue()
instance dans son propre espace mémoire, car les sous-processus ne partagent pas la mémoire (principalement).multiprocessing.Pool
a déjà une file d'attente de résultats partagée, il n'est pas nécessaire d'impliquer en plus un fichierManager.Queue
.Manager.Queue
est unequeue.Queue
(file d'attente multithreading) sous le capot, située sur un processus serveur séparé et exposée via des proxys. Cela ajoute une surcharge supplémentaire par rapport à la file d'attente interne du pool. Contrairement au fait de s'appuyer sur la gestion native des résultats de Pool, les résultats dans leManager.Queue
ne sont pas non plus garantis d'être commandés.Les processus de travail ne sont pas démarrés avec
.apply_async()
, cela se produit déjà lorsque vous instanciezPool
. Ce qui est commencé lorsque vous appelezpool.apply_async()
est un nouveau « job ». Les processus de travail de Pool exécutent lamultiprocessing.pool.worker
fonction sous le capot. Cette fonction prend en charge le traitement des nouvelles «tâches» transférées via le pool internePool._inqueue
et le renvoi des résultats au parent via lePool._outqueue
. Votre spécifiéfunc
sera exécuté dansmultiprocessing.pool.worker
.func
n'a qu'àreturn
quelque chose et le résultat sera automatiquement renvoyé au parent..apply_async()
immédiatement (de manière asynchrone) renvoie unAsyncResult
objet (alias pourApplyResult
). Vous devez appeler.get()
(bloque) sur cet objet pour recevoir le résultat réel. Une autre option serait d'enregistrer une fonction de rappel , qui est déclenchée dès que le résultat est prêt.Exemple de sortie:
Remarque: la spécification du
timeout
paramètre-pour.get()
n'arrêtera pas le traitement réel de la tâche dans le worker, elle débloque uniquement le parent en attente en levant unmultiprocessing.TimeoutError
.la source
error_callback
paramètre-pourapply_async
, donc cela n'a pas beaucoup changé depuis.