Comment attendre la fin de plusieurs threads?

109

Comment attendre simplement la fin de tous les processus threadés? Par exemple, disons que j'ai:

public class DoSomethingInAThread implements Runnable{

    public static void main(String[] args) {
        for (int n=0; n<1000; n++) {
            Thread t = new Thread(new DoSomethingInAThread());
            t.start();
        }
        // wait for all threads' run() methods to complete before continuing
    }

    public void run() {
        // do something here
    }


}

Comment puis-je modifier cela pour que la main()méthode s'arrête au commentaire jusqu'à ce que toutes les run()méthodes des threads se terminent? Merci!

DivideByHero
la source

Réponses:

163

Vous mettez tous les threads dans un tableau, vous les démarrez tous, puis vous avez une boucle

for(i = 0; i < threads.length; i++)
  threads[i].join();

Chaque jointure se bloquera jusqu'à ce que le thread respectif soit terminé. Les threads peuvent se terminer dans un ordre différent de celui dans lequel vous les rejoignez, mais ce n'est pas un problème: lorsque la boucle se termine, tous les threads sont terminés.

Martin c.Löwis
la source
1
@Mykola: quel est exactement l'avantage d'utiliser un groupe de threads? Ce n'est pas parce que l'API est là que vous devez l'utiliser ...
Martin v. Löwis
2
Voir: «Un groupe de threads représente un ensemble de threads». C'est sémantique correct pour ce cas d'utilisation! Et: "Un fil est autorisé à accéder aux informations sur son propre groupe de fil"
Martin K.
4
Le livre «Effective Java» recommande d'éviter les groupes de threads (élément 73).
Bastien Léonard
2
Les bogues mentionnés dans Effective Java auraient dû être corrigés dans Java 6. Si les nouvelles versions de Java ne sont pas une restriction, il est préférable d'utiliser Futures pour résoudre les problèmes de thread. Martin v. Löwis: Vous avez raison. Ce n'est pas pertinent pour ce problème, mais il est agréable d'obtenir plus d'informations sur les threads en cours d'exécution à partir d'un objet (comme ExecutorService). Je pense que c'est bien d'utiliser des fonctionnalités données pour résoudre un problème; vous aurez peut-être besoin de plus de flexibilité (informations sur les threads) à l'avenir. Il est également juste de mentionner les anciennes classes de buggy dans les anciens JDK.
Martin K.
5
ThreadGroup n'implémente pas de jointure au niveau du groupe, donc pourquoi les gens poussent ThreadGroup est un peu déroutant. Les gens utilisent-ils vraiment des verrous de rotation et interrogent-ils le compte actif du groupe? Vous auriez du mal à me convaincre que cela serait mieux que de simplement appeler join sur tous les threads.
41

Une façon serait de faire un Listde Threads, de créer et de lancer chaque thread, tout en l'ajoutant à la liste. Une fois que tout est lancé, parcourez la liste et appelez join()chacun d'eux. Peu importe l'ordre dans lequel les threads finissent de s'exécuter, tout ce que vous devez savoir est que lorsque la seconde boucle se termine, chaque thread sera terminé.

Une meilleure approche consiste à utiliser un ExecutorService et ses méthodes associées:

List<Callable> callables = ... // assemble list of Callables here
                               // Like Runnable but can return a value
ExecutorService execSvc = Executors.newCachedThreadPool();
List<Future<?>> results = execSvc.invokeAll(callables);
// Note: You may not care about the return values, in which case don't
//       bother saving them

L'utilisation d'un ExecutorService (et de toutes les nouvelles fonctionnalités des utilitaires de concurrence de Java 5 ) est incroyablement flexible, et l'exemple ci-dessus efface à peine la surface.

Adam Batkin
la source
ThreadGroup est la voie à suivre! Avec une liste mutable, vous aurez des ennuis (synchronisation)
Martin K.
3
Quoi? Comment auriez-vous des ennuis? Il est uniquement modifiable (uniquement lisible) par le thread qui effectue le lancement, donc tant qu'il ne modifie pas la liste en l' itérant, tout va bien.
Adam Batkin
Cela dépend de la manière dont vous l'utilisez. Si vous utilisez la classe appelante dans un thread, vous aurez des problèmes.
Martin K.
27
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DoSomethingInAThread implements Runnable
{
   public static void main(String[] args) throws ExecutionException, InterruptedException
   {
      //limit the number of actual threads
      int poolSize = 10;
      ExecutorService service = Executors.newFixedThreadPool(poolSize);
      List<Future<Runnable>> futures = new ArrayList<Future<Runnable>>();

      for (int n = 0; n < 1000; n++)
      {
         Future f = service.submit(new DoSomethingInAThread());
         futures.add(f);
      }

      // wait for all tasks to complete before continuing
      for (Future<Runnable> f : futures)
      {
         f.get();
      }

      //shut down the executor service so that this thread can exit
      service.shutdownNow();
   }

   public void run()
   {
      // do something here
   }
}
jt.
la source
a fonctionné comme un charme ... j'ai deux ensembles de threads qui ne devraient pas fonctionner simultanément en raison de problèmes sur plusieurs cookies. J'ai utilisé votre exemple pour exécuter un ensemble de threads à la fois .. merci de partager vos connaissances ...
arn-arn
@Dantalian - Dans votre classe Runnable (probablement dans la méthode run), vous voudrez capturer toutes les exceptions qui se sont produites et les stocker localement (ou stocker un message / une condition d'erreur). Dans l'exemple, f.get () renvoie votre objet que vous avez soumis à ExecutorService. Votre objet peut avoir une méthode pour récupérer les exceptions / conditions d'erreur. Selon la façon dont vous modifiez l'exemple fourni, vous devrez peut-être convertir l'objet transformé par f.get () en votre type attendu.
jt.
12

au lieu de join(), qui est une ancienne API, vous pouvez utiliser CountDownLatch . J'ai modifié votre code comme ci-dessous pour répondre à vos besoins.

import java.util.concurrent.*;
class DoSomethingInAThread implements Runnable{
    CountDownLatch latch;
    public DoSomethingInAThread(CountDownLatch latch){
        this.latch = latch;
    } 
    public void run() {
        try{
            System.out.println("Do some thing");
            latch.countDown();
        }catch(Exception err){
            err.printStackTrace();
        }
    }
}

public class CountDownLatchDemo {
    public static void main(String[] args) {
        try{
            CountDownLatch latch = new CountDownLatch(1000);
            for (int n=0; n<1000; n++) {
                Thread t = new Thread(new DoSomethingInAThread(latch));
                t.start();
            }
            latch.await();
            System.out.println("In Main thread after completion of 1000 threads");
        }catch(Exception err){
            err.printStackTrace();
        }
    }
}

Explication :

  1. CountDownLatch a été initialisé avec un nombre donné de 1000 selon vos besoins.

  2. Chaque thread de travail DoSomethingInAThread décrémentera le CountDownLatch, qui a été passé dans le constructeur.

  3. Fil principal CountDownLatchDemo await()jusqu'à ce que le compte soit devenu zéro. Une fois que le compte est devenu zéro, vous serez en dessous de la ligne en sortie.

    In Main thread after completion of 1000 threads

Plus d'informations sur la page de documentation d'Oracle

public void await()
           throws InterruptedException

Provoque l'attente du thread actuel jusqu'à ce que le verrou ait décompté jusqu'à zéro, sauf si le thread est interrompu.

Reportez-vous à la question SE associée pour d'autres options:

attendez que tous les threads aient terminé leur travail en java

Ravindra babu
la source
8

Évitez complètement la classe Thread et utilisez à la place les abstractions supérieures fournies dans java.util.concurrent

La classe ExecutorService fournit la méthode invokeAll qui semble faire exactement ce que vous voulez.

Henrik
la source
6

Pensez à utiliser java.util.concurrent.CountDownLatch. Exemples dans javadocs

Pablo Cavalieri
la source
Est un verrou pour les fils, le verrou du verrou fonctionne avec un compte à rebours. Dans la méthode run () de votre thread déclarez explicitement attendre qu'un CountDownLatch atteigne son compte à rebours à 0. Vous pouvez utiliser le même CountDownLatch dans plusieurs threads pour les libérer simultanément. Je ne sais pas si c'est ce dont vous avez besoin, je voulais juste le mentionner car c'est utile lorsque vous travaillez dans un environnement multithread.
Pablo Cavalieri
Peut-être devriez-vous mettre cette explication dans le corps de votre réponse?
Aaron Hall
Les exemples dans le Javadoc sont très descriptifs, c'est pourquoi je n'en ai pas ajouté. docs.oracle.com/javase/7/docs/api/java/util/concurrent/… . Dans le premier exemple, tous les threads Workers sont des versions simultanément car ils attendent que le CountdownLatch startSignal atteigne zéro, ce qui se produit dans startSignal.countDown (). Ensuite, le thread mian attend que tous les travaux se terminent en utilisant l'instruction doneSignal.await (). doneSignal diminue sa valeur dans chaque travailleur.
Pablo Cavalieri
6

Comme l'a suggéré Martin K, cela java.util.concurrent.CountDownLatchsemble être une meilleure solution. Juste en ajoutant un exemple pour le même

     public class CountDownLatchDemo
{

    public static void main (String[] args)
    {
        int noOfThreads = 5;
        // Declare the count down latch based on the number of threads you need
        // to wait on
        final CountDownLatch executionCompleted = new CountDownLatch(noOfThreads);
        for (int i = 0; i < noOfThreads; i++)
        {
            new Thread()
            {

                @Override
                public void run ()
                {

                    System.out.println("I am executed by :" + Thread.currentThread().getName());
                    try
                    {
                        // Dummy sleep
                        Thread.sleep(3000);
                        // One thread has completed its job
                        executionCompleted.countDown();
                    }
                    catch (InterruptedException e)
                    {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

            }.start();
        }

        try
        {
            // Wait till the count down latch opens.In the given case till five
            // times countDown method is invoked
            executionCompleted.await();
            System.out.println("All over");
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }

}
Freaky Thommi
la source
4

En fonction de vos besoins, vous pouvez également consulter les classes CountDownLatch et CyclicBarrier dans le package java.util.concurrent. Ils peuvent être utiles si vous voulez que vos threads attendent les uns les autres, ou si vous voulez un contrôle plus fin sur la manière dont vos threads s'exécutent (par exemple, attendre dans leur exécution interne qu'un autre thread définisse un état). Vous pouvez également utiliser un CountDownLatch pour signaler à tous vos threads de démarrer en même temps, au lieu de les démarrer un par un lorsque vous parcourez votre boucle. Les documents API standard en ont un exemple, ainsi que l'utilisation d'un autre CountDownLatch pour attendre que tous les threads aient terminé leur exécution.

Jeff
la source
1

Créez l'objet thread à l'intérieur de la première boucle for.

for (int i = 0; i < threads.length; i++) {
     threads[i] = new Thread(new Runnable() {
         public void run() {
             // some code to run in parallel
         }
     });
     threads[i].start();
 }

Et puis ce que tout le monde dit ici.

for(i = 0; i < threads.length; i++)
  threads[i].join();
optimus0127
la source
0

Vous pouvez le faire avec l'objet "ThreadGroup" et son paramètre activeCount :

Martin K.
la source
Je ne sais pas comment vous proposez exactement de le faire. Si vous proposez d'interroger activeCount en boucle: c'est mauvais, car c'est occupé-wait (même si vous dormez entre les sondages - vous obtenez alors un compromis entre business et réactivité).
Martin
@Martin v. Löwis: "Join n'attendra qu'un seul thread. Une meilleure solution pourrait être un java.util.concurrent.CountDownLatch. Initialisez simplement le verrou avec le nombre défini sur le nombre de threads de travail. Chaque thread de travail doit appeler countDown () juste avant sa sortie, et le thread principal appelle simplement wait (), qui bloquera jusqu'à ce que le compteur atteigne zéro. Le problème avec join () est également que vous ne pouvez pas commencer à ajouter plus de threads dynamiquement. La liste va exploser avec une modification concomitante. " Votre solution fonctionne bien pour le problème mais pas pour un usage général.
Martin K.
0

Comme alternative à CountDownLatch, vous pouvez également utiliser CyclicBarrier par exemple

public class ThreadWaitEx {
    static CyclicBarrier barrier = new CyclicBarrier(100, new Runnable(){
        public void run(){
            System.out.println("clean up job after all tasks are done.");
        }
    });
    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(new MyCallable(barrier));
            t.start();
        }       
    }

}    

class MyCallable implements Runnable{
    private CyclicBarrier b = null;
    public MyCallable(CyclicBarrier b){
        this.b = b;
    }
    @Override
    public void run(){
        try {
            //do something
            System.out.println(Thread.currentThread().getName()+" is waiting for barrier after completing his job.");
            b.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }       
}

Pour utiliser CyclicBarrier dans ce cas, barrière.await () doit être la dernière instruction, c'est-à-dire lorsque votre thread a terminé son travail. CyclicBarrier peut être réutilisé avec sa méthode reset (). Pour citer des javadocs:

Un CyclicBarrier prend en charge une commande exécutable facultative qui est exécutée une fois par point de barrière, après l'arrivée du dernier thread de la partie, mais avant la libération des threads. Cette action de barrière est utile pour mettre à jour l'état partagé avant que l'une des parties ne continue.

shailendra1118
la source
Je ne pense pas que ce soit un bon exemple pour CyclicBarrier. Pourquoi utilisez-vous un appel Thread.sleep ()?
Guenther
@Guenther - oui, j'ai changé le code pour répondre à l'exigence.
shailendra1118
CyclicBarrier n'est pas une alternative à CountDownLatch. Lorsque les threads doivent effectuer un compte à rebours à plusieurs reprises, vous devez créer un CyclicBarrier, sinon par défaut CountDownLatch (sauf si cela nécessite une abstraction supplémentaire de l'exécution, à quel point vous devez vous tourner vers le niveau supérieur, Services).
Elysiumplain
0

Le join()n'a pas été utile pour moi. voir cet exemple à Kotlin:

    val timeInMillis = System.currentTimeMillis()
    ThreadUtils.startNewThread(Runnable {
        for (i in 1..5) {
            val t = Thread(Runnable {
                Thread.sleep(50)
                var a = i
                kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
                Thread.sleep(200)
                for (j in 1..5) {
                    a *= j
                    Thread.sleep(100)
                    kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
                }
                kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
            })
            t.start()
        }
    })

Le résultat:

Thread-5|a=5
Thread-1|a=1
Thread-3|a=3
Thread-2|a=2
Thread-4|a=4
Thread-2|2*1=2
Thread-3|3*1=3
Thread-1|1*1=1
Thread-5|5*1=5
Thread-4|4*1=4
Thread-1|2*2=2
Thread-5|10*2=10
Thread-3|6*2=6
Thread-4|8*2=8
Thread-2|4*2=4
Thread-3|18*3=18
Thread-1|6*3=6
Thread-5|30*3=30
Thread-2|12*3=12
Thread-4|24*3=24
Thread-4|96*4=96
Thread-2|48*4=48
Thread-5|120*4=120
Thread-1|24*4=24
Thread-3|72*4=72
Thread-5|600*5=600
Thread-4|480*5=480
Thread-3|360*5=360
Thread-1|120*5=120
Thread-2|240*5=240
Thread-1|TaskDurationInMillis = 765
Thread-3|TaskDurationInMillis = 765
Thread-4|TaskDurationInMillis = 765
Thread-5|TaskDurationInMillis = 765
Thread-2|TaskDurationInMillis = 765

Maintenant, laissez-moi utiliser les join()threads pour:

    val timeInMillis = System.currentTimeMillis()
    ThreadUtils.startNewThread(Runnable {
        for (i in 1..5) {
            val t = Thread(Runnable {
                Thread.sleep(50)
                var a = i
                kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
                Thread.sleep(200)
                for (j in 1..5) {
                    a *= j
                    Thread.sleep(100)
                    kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
                }
                kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
            })
            t.start()
            t.join()
        }
    })

Et le résultat:

Thread-1|a=1
Thread-1|1*1=1
Thread-1|2*2=2
Thread-1|6*3=6
Thread-1|24*4=24
Thread-1|120*5=120
Thread-1|TaskDurationInMillis = 815
Thread-2|a=2
Thread-2|2*1=2
Thread-2|4*2=4
Thread-2|12*3=12
Thread-2|48*4=48
Thread-2|240*5=240
Thread-2|TaskDurationInMillis = 1568
Thread-3|a=3
Thread-3|3*1=3
Thread-3|6*2=6
Thread-3|18*3=18
Thread-3|72*4=72
Thread-3|360*5=360
Thread-3|TaskDurationInMillis = 2323
Thread-4|a=4
Thread-4|4*1=4
Thread-4|8*2=8
Thread-4|24*3=24
Thread-4|96*4=96
Thread-4|480*5=480
Thread-4|TaskDurationInMillis = 3078
Thread-5|a=5
Thread-5|5*1=5
Thread-5|10*2=10
Thread-5|30*3=30
Thread-5|120*4=120
Thread-5|600*5=600
Thread-5|TaskDurationInMillis = 3833

Comme il est clair lorsque nous utilisons le join:

  1. Les threads s'exécutent séquentiellement.
  2. Le premier échantillon prend 765 millisecondes tandis que le deuxième échantillon prend 3833 millisecondes.

Notre solution pour éviter de bloquer d'autres threads était de créer une ArrayList:

val threads = ArrayList<Thread>()

Maintenant, lorsque nous voulons démarrer un nouveau thread, nous l'ajoutons le plus à ArrayList:

addThreadToArray(
    ThreadUtils.startNewThread(Runnable {
        ...
    })
)

La addThreadToArrayfonction:

@Synchronized
fun addThreadToArray(th: Thread) {
    threads.add(th)
}

La startNewThreadfonction:

fun startNewThread(runnable: Runnable) : Thread {
    val th = Thread(runnable)
    th.isDaemon = false
    th.priority = Thread.MAX_PRIORITY
    th.start()
    return th
}

Vérifiez l'achèvement des threads comme ci-dessous partout où c'est nécessaire:

val notAliveThreads = ArrayList<Thread>()
for (t in threads)
    if (!t.isAlive)
        notAliveThreads.add(t)
threads.removeAll(notAliveThreads)
if (threads.size == 0){
    // The size is 0 -> there is no alive threads.
}
Misagh
la source