multithreading python attendez que tous les threads soient terminés

119

Cela a peut-être été demandé dans un contexte similaire, mais je n'ai pas pu trouver de réponse après environ 20 minutes de recherche, je vais donc demander.

J'ai écrit un script Python (disons: scriptA.py) et un script (disons scriptB.py)

Dans scriptB, je veux appeler scriptA plusieurs fois avec des arguments différents, chaque fois prend environ une heure à exécuter, (c'est un énorme script, fait beaucoup de choses ... ne vous inquiétez pas) et je veux pouvoir exécuter le scriptA avec tous les différents arguments simultanément, mais je dois attendre que TOUS soient terminés avant de continuer; mon code:

import subprocess

#setup
do_setup()

#run scriptA
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)

#finish
do_finish()

Je veux tout exécuter subprocess.call()en même temps, puis attendre qu'ils soient tous terminés, comment dois-je faire cela?

J'ai essayé d'utiliser le filetage comme l'exemple ici :

from threading import Thread
import subprocess

def call_script(args)
    subprocess.call(args)

#run scriptA   
t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))
t1.start()
t2.start()
t3.start()

Mais je ne pense pas que ce soit juste.

Comment puis-je savoir qu'ils ont tous fini de courir avant d'accéder à mon do_finish()?

Rose Inbar
la source

Réponses:

150

Vous devez utiliser la méthode de jointure de l' Threadobjet à la fin du script.

t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

Ainsi , le fil conducteur attendra jusqu'à ce que t1, t2et t3terminer l' exécution.

Maksim Skurydzin
la source
5
hmmm - vous avez du mal à comprendre quelque chose, cette première exécution de t1 ne va-t-elle pas attendre la fin, puis passe à t2..etc, etc.? comment faire tout cela en même temps? je ne vois pas comment cela les exécuterait en même temps?
Inbar Rose
25
L'appel aux joinblocs jusqu'à ce que le thread termine l'exécution. Vous devrez de toute façon attendre tous les threads. Si se t1termine en premier, vous commencerez à attendre t2(ce qui est peut-être déjà terminé et vous passerez immédiatement à attendre t3). Si cela a t1pris le plus de temps à exécuter, lorsque vous en revenez à la fois t1et t2que vous reviendrez immédiatement sans blocage.
Maksim Skurydzin
1
vous ne comprenez pas ma question - si je copie le code ci-dessus dans mon code - cela fonctionnera-t-il? ou est-ce que je manque quelque chose?
Inbar Rose
2
d'accord, je vois. maintenant je comprends, joinj'étais un peu confus à ce sujet mais je pense que je comprends, en quelque sorte attache le processus actuel au thread et attend que ce soit fait, et si t2 se termine avant t1, alors lorsque t1 est terminé, il vérifiera que t2 est terminé, voir que c'est le cas, puis vérifiez t3..etc..etc .. et alors seulement quand tout est fait, il continuera. impressionnant.
Inbar Rose
3
disons que t1 prend le plus de temps, mais t2 a une exception. que se passe-t-il alors? pouvez-vous attraper cette exception ou vérifier si t2 s'est terminé correctement ou non?
Ciprian Tomoiagă
174

Mettez les threads dans une liste, puis utilisez la méthode Join

 threads = []

 t = Thread(...)
 threads.append(t)

 ...repeat as often as necessary...

 # Start all threads
 for x in threads:
     x.start()

 # Wait for all of them to finish
 for x in threads:
     x.join()
Aaron Digulla
la source
1
Oui, cela fonctionnerait mais c'est plus difficile à comprendre. Vous devriez toujours essayer de trouver un équilibre entre le code compact et la «lisibilité». Rappelez-vous: le code est écrit une fois mais lu plusieurs fois. Il est donc plus important que ce soit facile à comprendre.
Aaron Digulla
2
Le "modèle d'usine" n'est pas quelque chose que je peux expliquer en une phrase. Google pour cela et recherchez stackoverflow.com. Il existe de nombreux exemples et explications. En un mot: vous écrivez du code qui construit quelque chose de complexe pour vous. Comme une vraie usine: vous donnez une commande et récupérez un produit fini.
Aaron Digulla
18
Je n'aime pas l'idée d'utiliser la compréhension de liste pour ses effets secondaires et de ne rien faire d'utile avec la liste résultante. Une simple boucle for serait plus propre même si elle étend deux rangs ...
Ioan Alexandru Cucu
1
@Aaron DIgull Je comprends cela, ce que je veux dire, c'est que je ferais simplement une for x in threads: x.join()compilation de liste plutôt que d'utiliser la compilation de listes
Ioan Alexandru Cucu
1
@IoanAlexandruCucu: Je me demande toujours s'il existe une solution plus lisible et plus efficace: stackoverflow.com/questions/21428602/…
Aaron Digulla
29

En python3, depuis Python 3.2 , il est une nouvelle approche pour atteindre le même résultat, que je préfère personnellement à la création de fil traditionnelle / start / join, package concurrent.futures: https://docs.python.org/3/library/concurrent.futures .html

Utiliser un ThreadPoolExecutorcode serait:

from concurrent.futures.thread import ThreadPoolExecutor
import time

def call_script(ordinal, arg):
    print('Thread', ordinal, 'argument:', arg)
    time.sleep(2)
    print('Thread', ordinal, 'Finished')

args = ['argumentsA', 'argumentsB', 'argumentsC']

with ThreadPoolExecutor(max_workers=2) as executor:
    ordinal = 1
    for arg in args:
        executor.submit(call_script, ordinal, arg)
        ordinal += 1
print('All tasks has been finished')

La sortie du code précédent est quelque chose comme:

Thread 1 argument: argumentsA
Thread 2 argument: argumentsB
Thread 1 Finished
Thread 2 Finished
Thread 3 argument: argumentsC
Thread 3 Finished
All tasks has been finished

L'un des avantages est que vous pouvez contrôler le débit en définissant le nombre maximal de nœuds de calcul simultanés.

Roberto
la source
mais comment savoir quand tous les threads du threadpool sont terminés?
Prime By Design
1
Comme vous pouvez le voir dans l'exemple, le code après l' withinstruction est exécuté lorsque toutes les tâches sont terminées.
Roberto
cela ne fonctionne pas. Essayez de faire quelque chose de très long en threads. Votre instruction d'impression s'exécutera avant la fin du fil
Pranalee
@Pranalee, ce code fonctionne, j'ai mis à jour le code pour ajouter les lignes de sortie. Vous ne pouvez pas voir la tâche "Toutes les tâches ..." avant que tous les threads ne soient terminés, c'est ainsi que l' withinstruction fonctionne par conception dans ce cas. Quoi qu'il en soit, vous pouvez toujours ouvrir une nouvelle question dans SO et publier votre code afin que nous puissions vous aider à découvrir ce qui se passe dans votre cas.
Roberto
@PrimeByDesign vous pouvez utiliser la concurrent.futures.waitfonction, vous pouvez voir un exemple réel ici Documents
Alexander Fortin
28

Je préfère utiliser la compréhension de liste basée sur une liste d'entrée:

inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
threads = [Thread(target=call_script, args=(i)) for i in inputs]
[t.start() for t in threads]
[t.join() for t in threads]
Adam Matan
la source
La réponse vérifiée explique bien mais celle-ci est plus courte et ne nécessite pas de répétitions laides. Juste une bonne réponse. :)
tleb
La compréhension de liste juste pour les effets secondaires est généralement dépréciée *. Mais dans ce cas d'utilisation, cela semble être une bonne idée. * stackoverflow.com/questions/5753597/…
Vinayak Kaniyarakkal
3
@VinayakKaniyarakkal for t in threads:t.start()n'est-ce pas mieux?
SmartManoj
5

Vous pouvez avoir une classe quelque chose comme ci-dessous à partir de laquelle vous pouvez ajouter un nombre 'n' de fonctions ou de console_scripts que vous souhaitez exécuter en parallèle avec passion et démarrer l'exécution et attendre que tous les travaux soient terminés.

from multiprocessing import Process

class ProcessParallel(object):
    """
    To Process the  functions parallely

    """    
    def __init__(self, *jobs):
        """
        """
        self.jobs = jobs
        self.processes = []

    def fork_processes(self):
        """
        Creates the process objects for given function deligates
        """
        for job in self.jobs:
            proc  = Process(target=job)
            self.processes.append(proc)

    def start_all(self):
        """
        Starts the functions process all together.
        """
        for proc in self.processes:
            proc.start()

    def join_all(self):
        """
        Waits untill all the functions executed.
        """
        for proc in self.processes:
            proc.join()


def two_sum(a=2, b=2):
    return a + b

def multiply(a=2, b=2):
    return a * b


#How to run:
if __name__ == '__main__':
    #note: two_sum, multiply can be replace with any python console scripts which
    #you wanted to run parallel..
    procs =  ProcessParallel(two_sum, multiply)
    #Add all the process in list
    procs.fork_processes()
    #starts  process execution 
    procs.start_all()
    #wait until all the process got executed
    procs.join_all()
PBD
la source
C'est du multitraitement. La question portait sur docs.python.org/3/library/threading.html
Rustam A.
3

À partir de la documentation du threading module

Il y a un objet «thread principal»; cela correspond au thread de contrôle initial dans le programme Python. Ce n'est pas un thread démon.

Il est possible que des «objets thread factices» soient créés. Ce sont des objets threads correspondant à des «threads étrangers», qui sont des threads de contrôle démarrés en dehors du module de threading, comme directement à partir du code C. Les objets thread factices ont des fonctionnalités limitées; ils sont toujours considérés comme vivants et démoniaques et ne peuvent pas être join()édités. Ils ne sont jamais supprimés, car il est impossible de détecter la fin des threads étrangers.

Donc, pour attraper ces deux cas où vous n'êtes pas intéressé à conserver une liste des threads que vous créez:

import threading as thrd


def alter_data(data, index):
    data[index] *= 2


data = [0, 2, 6, 20]

for i, value in enumerate(data):
    thrd.Thread(target=alter_data, args=[data, i]).start()

for thread in thrd.enumerate():
    if thread.daemon:
        continue
    try:
        thread.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err.args[0]:
            # catchs main thread
            continue
        else:
            raise

Après quoi:

>>> print(data)
[0, 4, 12, 40]
berna1111
la source
2

Peut-être, quelque chose comme

for t in threading.enumerate():
    if t.daemon:
        t.join()
jno
la source
J'ai essayé ce code mais je ne suis pas sûr de son fonctionnement car la dernière instruction de mon code a été imprimée après cette boucle for et le processus n'était toujours pas terminé.
Omkar
1

Je viens de rencontrer le même problème où je devais attendre tous les threads créés à l'aide de la boucle for.Je viens d'essayer le morceau de code suivant.Ce n'est peut-être pas la solution parfaite mais je pensais que ce serait une solution simple tester:

for t in threading.enumerate():
    try:
        t.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err:
            continue
        else:
            raise
Omkar
la source