Comment obtenir la valeur de retour d'un thread en python?

342

La fonction fooci-dessous renvoie une chaîne 'foo'. Comment puis-je obtenir la valeur 'foo'renvoyée par la cible du thread?

from threading import Thread

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()

La "façon évidente de le faire", illustrée ci-dessus, ne fonctionne pas: thread.join()retournée None.

wim
la source

Réponses:

39

Dans Python 3.2+, le concurrent.futuresmodule stdlib fournit une API de niveau supérieur pour threading, y compris la transmission des valeurs de retour ou des exceptions d'un thread de travail au thread principal:

import concurrent.futures

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(foo, 'world!')
    return_value = future.result()
    print(return_value)
Ramarao Amara
la source
1
Pour ceux qui se demandent, cela peut être fait avec une liste de threads. futures = [executor.submit(foo, param) for param in param_list]La commande sera maintenue et la sortie du withpermettra la collecte des résultats. [f.result() for f in futures]
jayreed1
273

FWIW, le multiprocessingmodule a une belle interface pour cela en utilisant la Poolclasse. Et si vous voulez vous en tenir aux threads plutôt qu'aux processus, vous pouvez simplement utiliser la multiprocessing.pool.ThreadPoolclasse comme remplacement de remplacement.

def foo(bar, baz):
  print 'hello {0}'.format(bar)
  return 'foo' + baz

from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo

# do some other stuff in the main process

return_val = async_result.get()  # get the return value from your function.
Jake Biesinger
la source
50
@JakeBiesinger Mon point est que je cherchais une réponse, comment obtenir une réponse de Thread, suis venu ici et que la réponse acceptée ne répond pas à la question posée. Je différencie les threads et les processus. Je connais Global Interpreter Lock mais je travaille sur un problème lié aux E / S, donc les threads sont ok, je n'ai pas besoin de processus. D'autres réponses ici mieux répondre à la question indiquée.
omikron
7
@omikron Mais les threads en python ne renvoient pas de réponse à moins que vous n'utilisiez une sous-classe qui active cette fonctionnalité. Parmi les sous-classes possibles, ThreadPools est un excellent choix (choisissez le nombre de threads, utilisez map / apply w / sync / async). Malgré leur importation multiprocess, ils n'ont rien à voir avec les processus.
Jake Biesinger
4
@JakeBiesinger Oh, je suis aveugle. Désolé pour mes commentaires inutiles. Vous avez raison. Je viens de supposer que multiprocessing = processus.
omikron
12
N'oubliez pas d'en définir processes=1plusieurs si vous avez plus de threads!
iman
4
Le problème avec le multitraitement et le pool de threads est qu'il est beaucoup plus lent à configurer et à démarrer les threads par rapport à la bibliothèque de threads de base. C'est génial pour démarrer des threads longs mais défaire le but lorsque vous avez besoin de démarrer un grand nombre de threads courts. La solution d'utiliser "threading" et "Queue" documentée dans d'autres réponses ici est à mon avis une meilleure alternative pour ce dernier cas d'utilisation.
Yves Dorfsman
243

Une façon que j'ai vu est de passer un objet mutable, comme une liste ou un dictionnaire, au constructeur du thread, avec un index ou un autre identifiant quelconque. Le thread peut ensuite stocker ses résultats dans son emplacement dédié dans cet objet. Par exemple:

def foo(bar, result, index):
    print 'hello {0}'.format(bar)
    result[index] = "foo"

from threading import Thread

threads = [None] * 10
results = [None] * 10

for i in range(len(threads)):
    threads[i] = Thread(target=foo, args=('world!', results, i))
    threads[i].start()

# do some other stuff

for i in range(len(threads)):
    threads[i].join()

print " ".join(results)  # what sound does a metasyntactic locomotive make?

Si vous voulez vraiment join()renvoyer la valeur de retour de la fonction appelée, vous pouvez le faire avec une Threadsous - classe comme celle-ci:

from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print twrv.join()   # prints foo

Cela devient un peu poilu à cause de certains changements de nom, et il accède à des structures de données "privées" qui sont spécifiques à la Threadmise en œuvre ... mais cela fonctionne.

Pour python3

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs)
        self._return = None
    def run(self):
        print(type(self._target))
        if self._target is not None:
            self._return = self._target(*self._args,
                                                **self._kwargs)
    def join(self, *args):
        Thread.join(self, *args)
        return self._return
gentil
la source
37
cool, merci pour l'exemple! Je me demande pourquoi Thread n'a pas été implémenté avec la gestion d'une valeur de retour en premier lieu, cela semble être une chose assez évidente à supporter.
wim
16
Je pense que cela devrait être la réponse acceptée - l'OP a demandé threading, pas une bibliothèque différente à essayer, plus la limitation de la taille du pool introduit un problème potentiel supplémentaire, qui s'est produit dans mon cas.
domoarigato
10
Grande blague de train.
meawoppl
7
Sur python3, cela revient TypeError: __init__() takes from 1 to 6 positional arguments but 7 were given. Une façon de résoudre ce problème?
GuySoft
2
Avertissement à toute personne tentée de faire la seconde de ces (la _Thread__targetchose). Vous ferez en sorte que quiconque essaie de porter votre code sur python 3 vous déteste jusqu'à ce qu'il comprenne ce que vous avez fait (en raison de l'utilisation de fonctionnalités non documentées qui ont changé entre 2 et 3). Documentez bien votre code.
Ben Taylor
84

La réponse de Jake est bonne, mais si vous ne voulez pas utiliser un pool de threads (vous ne savez pas combien de threads vous aurez besoin, mais créez-les au besoin), alors un bon moyen de transmettre des informations entre les threads est la fonction intégrée Classe Queue.Queue , car elle offre la sécurité des threads.

J'ai créé le décorateur suivant pour le faire agir de manière similaire au threadpool:

def threaded(f, daemon=False):
    import Queue

    def wrapped_f(q, *args, **kwargs):
        '''this function calls the decorated function and puts the 
        result in a queue'''
        ret = f(*args, **kwargs)
        q.put(ret)

    def wrap(*args, **kwargs):
        '''this is the function returned from the decorator. It fires off
        wrapped_f in a new thread and returns the thread object with
        the result queue attached'''

        q = Queue.Queue()

        t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
        t.daemon = daemon
        t.start()
        t.result_queue = q        
        return t

    return wrap

Ensuite, vous l'utilisez simplement comme:

@threaded
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Thread object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result_queue.get()
print result

La fonction décorée crée un nouveau thread à chaque appel et renvoie un objet Thread qui contient la file d'attente qui recevra le résultat.

METTRE À JOUR

Cela fait un bon moment que j'ai posté cette réponse, mais elle obtient toujours des vues, alors j'ai pensé que je la mettrais à jour pour refléter la façon dont je fais cela dans les nouvelles versions de Python:

Python 3.2 ajouté dans le concurrent.futuresmodule qui fournit une interface de haut niveau pour les tâches parallèles. Il fournit ThreadPoolExecutoret ProcessPoolExecutorvous pouvez donc utiliser un thread ou un pool de processus avec la même API.

Un avantage de cette API est que la soumission d'une tâche à un Executorretourne un Futureobjet, qui se terminera avec la valeur de retour de l'appelable que vous soumettez.

Cela rend queueinutile d' attacher un objet, ce qui simplifie un peu le décorateur:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

    return wrap

Cela utilisera un exécuteur de pool de threads de module par défaut s'il n'est pas transmis.

L'utilisation est très similaire à celle d'avant:

@threadpool
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Future object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result()
print result

Si vous utilisez Python 3.4+, une fonctionnalité vraiment intéressante de l'utilisation de cette méthode (et des objets Future en général) est que le futur retourné peut être encapsulé pour le transformer en un asyncio.Futureavec asyncio.wrap_future. Cela le fait fonctionner facilement avec les coroutines:

result = await asyncio.wrap_future(long_task(10))

Si vous n'avez pas besoin d'accéder à l' concurrent.Futureobjet sous-jacent , vous pouvez inclure l'habillage dans le décorateur:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))

    return wrap

Ensuite, chaque fois que vous devez pousser le code intensif ou bloquant du processeur hors du thread de la boucle d'événements, vous pouvez le mettre dans une fonction décorée:

@threadpool
def some_long_calculation():
    ...

# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()
bj0
la source
Je n'arrive pas à faire fonctionner cela; Je reçois une erreur indiquant que AttributeError: 'module' object has no attribute 'Lock'cela semble émaner de la ligne y = long_task(10)... des pensées?
sadmicrowave
1
Le code n'utilise pas explicitement Lock, donc le problème pourrait être ailleurs dans votre code. Vous voudrez peut-être publier une nouvelle question SO à ce sujet
bj0
Pourquoi result_queue est-il un attribut d'instance? Serait-il préférable que ce soit un attribut de classe afin que les utilisateurs n'aient pas besoin d'appeler result_queue lors de l'utilisation de @threaded qui n'est pas explicite et ambigu?
nonbot
@ t88, vous ne savez pas ce que vous voulez dire, vous avez besoin d'un moyen d'accéder au résultat, ce qui signifie que vous devez savoir quoi appeler. Si vous voulez que ce soit autre chose, vous pouvez sous-classer Thread et faire ce que vous voulez (c'était une solution simple). La raison pour laquelle la file d'attente doit être attachée au thread est pour que plusieurs appels / fonctions aient leurs propres files d'attente
bj0
1
C'est génial! Merci beaucoup.
Ganesh Kathiresan,
53

Une autre solution qui ne nécessite pas de changer votre code existant:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result

Il peut également être facilement adapté à un environnement multi-thread:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()
threads_list = list()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)

# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...

# Join all the threads
for t in threads_list:
    t.join()

# Check thread's return value
while not que.empty():
    result = que.get()
    print result
Arik
la source
t = Thread (target = lambda q, arg1: q.put (foo (arg1)), args = (que, 'world!')) ce que q.put fait ici, que fait Queue.Queue ()
vijay shanker
6
Il devrait y avoir une statue de vous dans votre ville natale, merci!
Onilol
3
@Onilol - Merci beaucoup. Votre commentaire est exactement la raison pour laquelle je fais cela :)
Arik
4
Pour Python3, vous devez passer à from queue import Queue.
Gino Mempin
1
Cela semble être la méthode la moins perturbatrice (pas besoin de restructurer radicalement la base de code d'origine) pour permettre à la valeur de retour de revenir au thread principal.
Fanchen Bao
24

Parris / kindall's answer join / returnanswer ported to Python 3:

from threading import Thread

def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)

        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self):
        Thread.join(self)
        return self._return


twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print(twrv.join())   # prints foo

Remarque, la Threadclasse est implémentée différemment dans Python 3.

GuySoft
la source
1
join prend un paramètre de délai d'attente qui doit être transmis
cz
22

J'ai volé la réponse de Kindall et l'ai nettoyée un peu.

La partie clé est d'ajouter * args et ** kwargs à join () afin de gérer le délai d'attente

class threadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super(threadWithReturn, self).__init__(*args, **kwargs)

        self._return = None

    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)

    def join(self, *args, **kwargs):
        super(threadWithReturn, self).join(*args, **kwargs)

        return self._return

RÉPONSE MISE À JOUR CI-DESSOUS

C'est ma réponse la plus votée, j'ai donc décidé de mettre à jour avec du code qui fonctionnera à la fois sur py2 et py3.

De plus, je vois de nombreuses réponses à cette question qui montrent un manque de compréhension concernant Thread.join (). Certains échouent complètement à gérer l' timeoutargument. Mais il y a aussi un cas d'angle que vous devez connaître en ce qui concerne les cas où vous avez (1) une fonction cible qui peut retourner Noneet (2) vous passez également l' timeoutargument à join (). Veuillez voir "TEST 4" pour comprendre ce boîtier d'angle.

Classe ThreadWithReturn qui fonctionne avec py2 et py3:

import sys
from threading import Thread
from builtins import super    # https://stackoverflow.com/a/30159479

if sys.version_info >= (3, 0):
    _thread_target_key = '_target'
    _thread_args_key = '_args'
    _thread_kwargs_key = '_kwargs'
else:
    _thread_target_key = '_Thread__target'
    _thread_args_key = '_Thread__args'
    _thread_kwargs_key = '_Thread__kwargs'

class ThreadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return = None

    def run(self):
        target = getattr(self, _thread_target_key)
        if not target is None:
            self._return = target(
                *getattr(self, _thread_args_key),
                **getattr(self, _thread_kwargs_key)
            )

    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        return self._return

Quelques exemples de tests sont présentés ci-dessous:

import time, random

# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
    if not seconds is None:
        time.sleep(seconds)
    return arg

# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')

# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)

# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

Pouvez-vous identifier le cas d'angle que nous pourrions éventuellement rencontrer avec TEST 4?

Le problème est que nous nous attendons à ce que giveMe () retourne None (voir TEST 2), mais nous nous attendons également à ce que join () retourne None s'il expire.

returned is None signifie soit:

(1) c'est ce que giveMe () a renvoyé, ou

(2) rejoindre () expiré

Cet exemple est trivial car nous savons que giveMe () retournera toujours None. Mais dans le cas réel (où la cible peut légitimement renvoyer None ou autre chose), nous voudrions vérifier explicitement ce qui s'est passé.

Voici comment résoudre ce cas d'angle:

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

if my_thread.isAlive():
    # returned is None because join() timed out
    # this also means that giveMe() is still running in the background
    pass
    # handle this based on your app's logic
else:
    # join() is finished, and so is giveMe()
    # BUT we could also be in a race condition, so we need to update returned, just in case
    returned = my_thread.join()
user2426679
la source
Connaissez-vous l'équivalent _Thread_target pour Python3? Cet attribut n'existe pas en Python3.
GreySage
J'ai regardé dans le fichier threading.py, il se trouve que c'est _target (les autres attributs portent le même nom).
GreySage
Vous pouvez éviter l' accès aux variables privées de la classe de fil, si vous enregistrez les target, argset les kwargsarguments pour initialiser les variables membres de votre classe.
Tolli
@GreySage Voir ma réponse, j'ai porté ce bloc vers python3 ci
GuySoft
@GreySage answer prend désormais en charge py2 et py3
user2426679
15

Utilisation de la file d'attente:

import threading, queue

def calc_square(num, out_queue1):
  l = []
  for x in num:
    l.append(x*x)
  out_queue1.put(l)


arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())
user341143
la source
1
Vraiment comme cette sollution, courte et douce. Si votre fonction lit une file d' attente d'entrée, et que vous ajoutez à out_queue1vous devez boucle sur out_queue1.get()et attraper l'exception Queue.Empty: ret = [] ; try: ; while True; ret.append(out_queue1.get(block=False)) ; except Queue.Empty: ; pass. Des points-virgules pour simuler les sauts de ligne.
sastorsl
6

Ma solution au problème consiste à encapsuler la fonction et le thread dans une classe. Ne nécessite pas l'utilisation de pools, de files d'attente ou de transmission de variables de type c. Il est également non bloquant. Vous vérifiez plutôt le statut. Voir l'exemple de la façon de l'utiliser à la fin du code.

import threading

class ThreadWorker():
    '''
    The basic idea is given a function create an object.
    The object can then run the function in a thread.
    It provides a wrapper to start it,check its status,and get data out the function.
    '''
    def __init__(self,func):
        self.thread = None
        self.data = None
        self.func = self.save_data(func)

    def save_data(self,func):
        '''modify function to save its returned data'''
        def new_func(*args, **kwargs):
            self.data=func(*args, **kwargs)

        return new_func

    def start(self,params):
        self.data = None
        if self.thread is not None:
            if self.thread.isAlive():
                return 'running' #could raise exception here

        #unless thread exists and is alive start or restart it
        self.thread = threading.Thread(target=self.func,args=params)
        self.thread.start()
        return 'started'

    def status(self):
        if self.thread is None:
            return 'not_started'
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return 'finished'

    def get_results(self):
        if self.thread is None:
            return 'not_started' #could return exception
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return self.data

def add(x,y):
    return x +y

add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()
Peter Lonjers
la source
comment géreriez-vous une exception? disons que la fonction add a été donnée et int et une chaîne. tous les threads échoueraient-ils ou un seul échouerait-il?
user1745713
4

jointoujours retourner None, je pense que vous devriez sous-classe Threadpour gérer les codes de retour et ainsi.

Idée de génie
la source
4

Compte tenu du commentaire de @iman sur la réponse de @JakeBiesinger, je l'ai recomposé pour avoir un certain nombre de threads:

from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print 'hello {0}'.format(bar)
    return 'foo' + baz

numOfThreads = 3 
results = []

pool = ThreadPool(numOfThreads)

for i in range(0, numOfThreads):
    results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo)

# do some other stuff in the main process
# ...
# ...

results = [r.get() for r in results]
print results

pool.close()
pool.join()

À votre santé,

Gars.

Guy Avraham
la source
2

Vous pouvez définir un mutable au-dessus de la portée de la fonction filetée et ajouter le résultat à cela. (J'ai également modifié le code pour qu'il soit compatible python3)

returns = {}
def foo(bar):
    print('hello {0}'.format(bar))
    returns[bar] = 'foo'

from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)

Cela revient {'world!': 'foo'}

Si vous utilisez l'entrée de fonction comme clé de votre dict de résultats, chaque entrée unique est garantie de donner une entrée dans les résultats

Thijs D
la source
2

J'utilise ce wrapper, qui transforme confortablement n'importe quelle fonction pour fonctionner dans un Thread- en prenant soin de sa valeur de retour ou de son exception. Cela n'ajoute pas de Queuefrais généraux.

def threading_func(f):
    """Decorator for running a function in a thread and handling its return
    value or exception"""
    def start(*args, **kw):
        def run():
            try:
                th.ret = f(*args, **kw)
            except:
                th.exc = sys.exc_info()
        def get(timeout=None):
            th.join(timeout)
            if th.exc:
                raise th.exc[0], th.exc[1], th.exc[2] # py2
                ##raise th.exc[1] #py3                
            return th.ret
        th = threading.Thread(None, run)
        th.exc = None
        th.get = get
        th.start()
        return th
    return start

Exemples d'utilisation

def f(x):
    return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))

@threading_func
def th_mul(a, b):
    return a * b
th = th_mul("text", 2.5)

try:
    print(th.get())
except TypeError:
    print("exception thrown ok.")

Notes sur le threadingmodule

La valeur de retour confortable et la gestion des exceptions d'une fonction filetée est un besoin "Pythonique" fréquent et devrait en effet déjà être proposé par le threadingmodule - éventuellement directement dans la Threadclasse standard . ThreadPoola beaucoup trop de frais généraux pour les tâches simples - 3 threads de gestion, beaucoup de bureaucratie. Malheureusement, Threadla mise en page a été copiée à partir de Java à l'origine - que vous voyez par exemple à partir du 1er paramètre constructeur (!) Encore inutile group.

kxr
la source
le premier constructeur n'est pas inutile, il y est réservé pour une future implémentation .. du livre de cuisine de programmation parallèle python
vijay shanker
1

Définissez votre cible à
1) prenez un argument q
2) remplacez toutes les déclarations return fooparq.put(foo); return

donc une fonction

def func(a):
    ans = a * a
    return ans

deviendrait

def func(a, q):
    ans = a * a
    q.put(ans)
    return

puis vous procéderiez comme tel

from Queue import Queue
from threading import Thread

ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]

threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [q.get() for _ in xrange(len(threads))]

Et vous pouvez utiliser des décorateurs / wrappers de fonction pour le faire afin que vous puissiez utiliser vos fonctions existantes targetsans les modifier, mais suivez ce schéma de base.

tscizzle
la source
Cela devrait êtreresults = [ans_q.get() for _ in xrange(len(threads))]
Hemant H Kumar
1

Comme mentionné, le pool de multitraitement est beaucoup plus lent que le thread de base. L'utilisation des files d'attente comme proposé dans certaines réponses est une alternative très efficace. Je l'ai utilisé avec des dictionnaires afin de pouvoir exécuter beaucoup de petits threads et récupérer plusieurs réponses en les combinant avec des dictionnaires:

#!/usr/bin/env python3

import threading
# use Queue for python2
import queue
import random

LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]

NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

def randoms(k, q):
    result = dict()
    result['letter'] = random.choice(LETTERS)
    result['number'] = random.choice(NUMBERS)
    q.put({k: result})

threads = list()
q = queue.Queue()
results = dict()

for name in ('alpha', 'oscar', 'yankee',):
    threads.append( threading.Thread(target=randoms, args=(name, q)) )
    threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
    results.update(q.get())

print(results)
Yves Dorfsman
la source
1

L'idée de GuySoft est géniale, mais je pense que l'objet n'a pas nécessairement à hériter de Thread et start () pourrait être supprimé de l'interface:

from threading import Thread
import queue
class ThreadWithReturnValue(object):
    def __init__(self, target=None, args=(), **kwargs):
        self._que = queue.Queue()
        self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) ,
                args=(self._que, args, kwargs), )
        self._t.start()

    def join(self):
        self._t.join()
        return self._que.get()


def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

print(twrv.join())   # prints foo
pandy.song
la source
0

Une solution habituelle consiste à envelopper votre fonction fooavec un décorateur comme

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

Ensuite, tout le code peut ressembler à ça

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]

for t in threads:
    t.start()
    while(True):
        if(len(threading.enumerate()) < max_num):
            break
for t in threads:
    t.join()
return result

Remarque

Un problème important est que les valeurs de retour peuvent ne pas être ordonnées . (En fait, le return valuen'est pas nécessairement sauvegardé dans le queue, car vous pouvez choisir une structure de données thread-safe arbitraire )

Response777
la source
0

Pourquoi ne pas simplement utiliser une variable globale?

import threading


class myThread(threading.Thread):
    def __init__(self, ind, lock):
        threading.Thread.__init__(self)
        self.ind = ind
        self.lock = lock

    def run(self):
        global results
        with self.lock:
            results.append(self.ind)



results = []
lock = threading.Lock()
threads = [myThread(x, lock) for x in range(1, 4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)
Alexey
la source
0

La réponse de Kindall en Python3

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon)
        self._return = None 

    def run(self):
        try:
            if self._target:
                self._return = self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs 

    def join(self,timeout=None):
        Thread.join(self,timeout)
        return self._return
SmartManoj
la source
-2

Si seul True ou False doit être validé à partir de l'appel d'une fonction, une solution plus simple que je trouve consiste à mettre à jour une liste globale.

import threading

lists = {"A":"True", "B":"True"}

def myfunc(name: str, mylist):
    for i in mylist:
        if i == 31:
            lists[name] = "False"
            return False
        else:
            print("name {} : {}".format(name, i))

t1 = threading.Thread(target=myfunc, args=("A", [1, 2, 3, 4, 5, 6], ))
t2 = threading.Thread(target=myfunc, args=("B", [11, 21, 31, 41, 51, 61], ))
t1.start()
t2.start()
t1.join()
t2.join()

for value in lists.values():
    if value == False:
        # Something is suspicious 
        # Take necessary action 

Cela est plus utile lorsque vous souhaitez rechercher si l'un des threads a renvoyé un faux état pour prendre les mesures nécessaires.

Sravya
la source