Comment puis-je récupérer la valeur de retour d'une fonction passée au multiprocessing.Process?

190

Dans l'exemple de code ci-dessous, j'aimerais récupérer la valeur de retour de la fonction worker. Comment puis-je faire cela? Où cette valeur est-elle stockée?

Exemple de code:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs

Production:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

Je n'arrive pas à trouver l'attribut pertinent dans les objets stockés dans jobs.

blz
la source

Réponses:

190

Utilisez une variable partagée pour communiquer. Par exemple comme ceci:

import multiprocessing

def worker(procnum, return_dict):
    '''worker function'''
    print str(procnum) + ' represent!'
    return_dict[procnum] = procnum


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print return_dict.values()
vartec
la source
46
Je recommanderais d'utiliser un multiprocessing.Queue, plutôt qu'un Managerici. L'utilisation de a Managernécessite de créer un processus entièrement nouveau, ce qui est excessif quand un Queueferait l'affaire.
dano
1
@dano: Je me demande, si nous utilisons l'objet Queue (), nous ne pouvons pas être sûr de l'ordre lorsque chaque processus renvoie la valeur. Je veux dire si nous avons besoin de l'ordre dans le résultat, pour faire le travail suivant. Comment pourrions-nous savoir où exactement quelle sortie provient de quel processus
Catbuilts
4
@Catbuilts Vous pouvez renvoyer un tuple de chaque processus, où une valeur est la valeur de retour réelle qui vous tient à cœur et l'autre est un identificateur unique du processus. Mais je me demande aussi pourquoi vous devez savoir quel processus renvoie quelle valeur. Si c'est ce que vous devez vraiment savoir sur le processus, ou avez-vous besoin d'établir une corrélation entre votre liste d'entrées et la liste de sorties? Dans ce cas, je recommanderais d'utiliser multiprocessing.Pool.mappour traiter votre liste d'éléments de travail.
dano
5
mises en garde pour les fonctions avec un seul argument : devrait utiliser args=(my_function_argument, ). Notez la ,virgule ici! Ou bien Python se plaindra "d'arguments positionnels manquants". Il m'a fallu 10 minutes pour comprendre. Vérifiez également l' utilisation manuelle (dans la section "classe de processus").
yuqli
2
@vartec un inconvénient de l'utilisation d'un dictionnaire multipriocessing.Manager () est qu'il pickles (sérialise) l'objet qu'il retourne, donc il a un goulot d'étranglement donné par la bibliothèque pickle d'une taille maximale de 2GiB pour l'objet à renvoyer. Existe-t-il un autre moyen de faire cela en évitant la sérialisation de l'objet renvoyé?
hirschme
68

Je pense que l'approche suggérée par @sega_sai est la meilleure. Mais il a vraiment besoin d'un exemple de code, alors voici:

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

Ce qui imprimera les valeurs de retour:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

Si vous connaissez map(le Python 2 intégré), cela ne devrait pas être trop difficile. Sinon, jetez un œil au lien de sega_Sai .

Notez combien peu de code est nécessaire. (Notez également comment les processus sont réutilisés).

marque
la source
1
Des idées pourquoi mon getpid()retour a-t-il la même valeur? J'utilise Python3
zelusp
Je ne sais pas comment Pool répartit les tâches entre les travailleurs. Peut-être qu'ils peuvent tous finir chez le même travailleur s'ils sont vraiment rapides? Cela arrive-t-il systématiquement? Aussi si vous ajoutez un délai?
Mark
Je pensais aussi que c'était une question de vitesse, mais lorsque j'alimente pool.mapune gamme de 1 000 000 en utilisant plus de 10 processus, je vois au plus deux pids différents.
zelusp
1
Alors je ne suis pas sûr. Je pense qu'il serait intéressant d'ouvrir une question distincte à ce sujet.
Marquez
Si les choses que vous voulez envoyer une fonction différente à chaque processus, utilisez pool.apply_async: docs.python.org/3/library
Kyle
24

Cet exemple montre comment utiliser une liste d' instances multiprocessing.Pipe pour renvoyer des chaînes à partir d'un nombre arbitraire de processus:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()

Production:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

Cette solution utilise moins de ressources qu'une multiprocessing.Queue qui utilisations

  • un tuyau
  • au moins une serrure
  • un tampon
  • un fil

ou un multiprocessing.SimpleQueue qui utilise

  • un tuyau
  • au moins une serrure

Il est très instructif d'examiner la source de chacun de ces types.

David Cullen
la source
Quelle serait la meilleure façon de faire cela sans faire des tuyaux une variable globale?
Nickpick
J'ai mis toutes les données globales et le code dans une fonction principale et cela fonctionne de la même manière. Est-ce que ça répond à votre question?
David Cullen
le tube doit-il toujours être lu avant qu'une nouvelle valeur puisse y être ajoutée (envoyée)?
Nickpick
+1, bonne réponse. Mais la solution étant plus efficace, le compromis est que vous en faites une Pipepar processus contre une Queuepour tous les processus. Je ne sais pas si cela finit par être plus efficace dans tous les cas.
sudo
2
Cette réponse provoque un blocage si l'objet renvoyé est volumineux. Au lieu de faire d'abord proc.join (), j'essaierais d'abord de recv () la valeur de retour, puis de faire la jointure.
L. Pes le
22

Pour une raison quelconque, je n'ai pas trouvé d'exemple général sur la façon de faire cela Queuen'importe où (même les exemples de doc de Python ne génèrent pas plusieurs processus), alors voici ce que j'ai travaillé après 10 essais:

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queueest une file d'attente bloquante et sécurisée pour les threads que vous pouvez utiliser pour stocker les valeurs de retour des processus enfants. Vous devez donc transmettre la file d'attente à chaque processus. Quelque chose de moins évident ici est que vous devez sortir get()de la file d'attente avant de joinl' Processes ou bien la file d'attente se remplit et bloque tout.

Mise à jour pour ceux qui sont orientés objet (testé en Python 3.4):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)
sudo
la source
18

Pour toute autre personne qui cherche comment obtenir une valeur d'une Processutilisation Queue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    print queue.get()  # Prints {"foo": True}
    p.join()
Matthew Moisen
la source
1
lorsque je mets quelque chose dans une file d'attente dans mon processus de travail, ma jointure n'est jamais atteinte. Une idée de comment cela pourrait arriver?
Laurens Koppenol
@LaurensKoppenol voulez-vous dire que votre code principal se bloque à p.join () de façon permanente et ne continue jamais? Votre processus a-t-il une boucle infinie?
Matthew Moisen
4
Oui, il y accroche indéfiniment. Mes ouvriers finissent tous (la boucle dans la fonction ouvrière se termine, la déclaration d'impression est ensuite imprimée, pour tous les ouvriers). La jointure ne fait rien. Si je supprime le Queuede ma fonction, il me laisse passer lejoin()
Laurens Koppenol
@LaurensKoppenol N'appelez-vous peut-être pas queue.put(ret)avant d'appeler p.start()? Dans ce cas, le thread de travail sera suspendu pour queue.get()toujours. Vous pouvez reproduire cela en copiant mon extrait ci-dessus tout en commentant queue.put(ret).
Matthew Moisen
J'ai édité cette réponse, le queue.get()doit arriver avant le p.join(). Cela fonctionne maintenant pour moi.
jfunk
10

Vous pouvez utiliser la fonction exitintégrée pour définir le code de sortie d'un processus. Il peut être obtenu à partir de l' exitcodeattribut du processus:

import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result

Production:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
David Cullen
la source
4
Soyez averti que cette approche pourrait devenir déroutante. Les processus doivent généralement se terminer avec le code de sortie 0 s'ils se terminent sans erreur. Si vous avez quelque chose qui surveille vos codes de sortie de processus système, vous pouvez les voir signalés comme des erreurs.
ferrouswheel
1
Parfait si vous souhaitez simplement déclencher une exception dans le processus parent en cas d'erreur.
crizCraig
6

Le paquet Pebble a une belle exploitation d'abstraction multiprocessing.Pipequi rend cela assez simple:

from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)

print(future.result())

Exemple tiré de: https://pythonhosted.org/Pebble/#concurrent-decorators

Erikreed
la source
3

Je pensais que je simplifierais les exemples les plus simples copiés ci-dessus, en travaillant pour moi sur Py3.6. Le plus simple est multiprocessing.Pool:

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))

Vous pouvez définir le nombre de processus dans la piscine avec, par exemple, Pool(processes=5). Cependant, la valeur par défaut est le nombre de processeurs, alors laissez ce champ vide pour les tâches liées au processeur. (Les tâches liées aux E / S conviennent souvent aux threads de toute façon, car les threads attendent la plupart du temps et peuvent partager un cœur de processeur.) AppliquePool également l' optimisation de la segmentation .

(Notez que la méthode de travail ne peut pas être imbriquée dans une méthode. J'ai initialement défini ma méthode de travail à l'intérieur de la méthode qui fait l'appel pool.map, pour qu'elle reste entièrement autonome, mais les processus n'ont pas pu l'importer et j'ai jeté "AttributeError : Impossible de sélectionner l'objet local external_method..inner_method ". Plus d'informations ici . Cela peut être à l'intérieur d'une classe.)

(J'apprécie l'impression de la question d'origine 'represent!'plutôt que time.sleep(), mais sans elle, je pensais que du code s'exécutait simultanément alors qu'il ne l'était pas.)


Py3 ProcessPoolExecutorest également composé de deux lignes ( .maprenvoie un générateur donc vous avez besoin du list()):

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    print(list(executor.map(worker, range(10))))

Avec plaine Process:

import multiprocessing
import time

def worker(x, queue):
    time.sleep(1)
    queue.put(x)

queue = multiprocessing.SimpleQueue()
tasks = range(10)

for task in tasks:
    multiprocessing.Process(target=worker, args=(task, queue,)).start()

for _ in tasks:
    print(queue.get())

À utiliser SimpleQueuesi tout ce dont vous avez besoin est putet get. La première boucle démarre tous les processus, avant que la seconde effectue les queue.getappels de blocage . Je ne pense pas qu'il y ait de raison d'appeler p.join()aussi.

Chris
la source
2

Une solution simple:

import multiprocessing

output=[]
data = range(0,10)

def f(x):
    return x**2

def handler():
    p = multiprocessing.Pool(64)
    r=p.map(f, data)
    return r

if __name__ == '__main__':
    output.append(handler())

print(output[0])

Production:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Rubens_Zimbres
la source
2

Si vous utilisez Python 3, vous pouvez utiliser concurrent.futures.ProcessPoolExecutorcomme abstraction pratique:

from concurrent.futures import ProcessPoolExecutor

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(worker, range(5))))

Production:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
Aleph Aleph
la source
0

J'ai modifié un peu la réponse de vartec car j'avais besoin d'obtenir les codes d'erreur de la fonction. (Merci vertec !!! c'est un truc génial)

Cela peut également être fait avec un manager.listmais je pense qu'il est préférable de l'avoir dans un dict et d'y stocker une liste. De cette façon, nous conservons la fonction et les résultats car nous ne pouvons pas être sûrs de l'ordre dans lequel la liste sera remplie.

from multiprocessing import Process
import time
import datetime
import multiprocessing


def func1(fn, m_list):
    print 'func1: starting'
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print 'func1: finishing'
    # return "func1"  # no need for return since Multiprocess doesnt return it =(

def func2(fn, m_list):
    print 'func2: starting'
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print 'func2: finishing'
    # return "func2"

def func3(fn, m_list):
    print 'func3: starting'
    time.sleep(9)
    # if fail wont join the rest because it never populate the dict
    # or do a try/except to get something in return.
    raise ValueError("failed here")
    # if we want to get the error in the manager dict we can catch the error
    try:
        raise ValueError("failed here")
        m_list[fn] = "this is third"
    except:
        m_list[fn] = "this is third and it fail horrible"
        # print 'func3: finishing'
        # return "func3"


def runInParallel(*fns):  # * is to accept any input in list
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        # print fn
        # print dir(fn)
        p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()  # 5 is the time out

    print datetime.datetime.now() - start_time
    return m_list, proc

if __name__ == '__main__':
    manager, proc = runInParallel(func1, func2, func3)
    # print dir(proc[0])
    # print proc[0]._name
    # print proc[0].name
    # print proc[0].exitcode

    # here you can check what did fail
    for i in proc:
        print i.name, i.exitcode  # name was set up in the Process line 53

    # here will only show the function that worked and where able to populate the 
    # manager dict
    for i, j in manager.items():
        print dir(i)  # things you can do to the function
        print i, j
pelos
la source