Using async/await for multiple tasks


I'm using an API client that is completely asynchronous, that is, each operation returns either Task or Task<T>, e.g.:

static async Task DoSomething(int siteId, int postId, IBlogClient client)
{
    await client.DeletePost(siteId, postId); // call API client
    Console.WriteLine("Deleted post {0}.", postId);
}

Using the C# 5 async/await operators, what is the correct/most efficient way to start multiple tasks and wait for them all to complete:

int[] ids = new[] { 1, 2, 3, 4, 5 };
Parallel.ForEach(ids, i => DoSomething(1, i, blogClient).Wait());

or:

int[] ids = new[] { 1, 2, 3, 4, 5 };
Task.WaitAll(ids.Select(i => DoSomething(1, i, blogClient)).ToArray());

Since the API client uses HttpClient internally, I would expect this to issue 5 HTTP requests immediately, writing to the console as each one completes.

Ben Foster
And what is the problem?
Serg Shevchenko
@SergShevchenko The problem is that his Parallel.ForEach is done incorrectly (see answers) - he is asking whether his attempts to run async code in parallel are correct, offering two attempts at a solution, and whether one is better than the other (and presumably why).
AnorZaken

Answers:

int[] ids = new[] { 1, 2, 3, 4, 5 };
Parallel.ForEach(ids, i => DoSomething(1, i, blogClient).Wait());

Although you run the operations in parallel with the above code, this code blocks each thread that each operation runs on. For example, if the network call takes 2 seconds, each thread hangs for 2 seconds without doing anything but waiting.

int[] ids = new[] { 1, 2, 3, 4, 5 };
Task.WaitAll(ids.Select(i => DoSomething(1, i, blogClient)).ToArray());

On the other hand, the above code with WaitAll also blocks the threads, and your threads won't be free to process any other work until the operation ends.

Recommended Approach

I would prefer WhenAll, which will perform your operations asynchronously in parallel.

public async Task DoWork() {

    int[] ids = new[] { 1, 2, 3, 4, 5 };
    await Task.WhenAll(ids.Select(i => DoSomething(1, i, blogClient)));
}

In fact, in the above case, you don't even need to await; you can return the task directly from the method, as you don't have any continuations:

public Task DoWork() 
{
    int[] ids = new[] { 1, 2, 3, 4, 5 };
    return Task.WhenAll(ids.Select(i => DoSomething(1, i, blogClient)));
}

To back this up, here is a detailed blog post going through all the alternatives and their advantages/disadvantages: How and Where Concurrent Asynchronous I/O with ASP.NET Web API
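
Since the question mentions operations that return Task<T>, here is a minimal sketch of how WhenAll can also gather results. GetTitleAsync is a hypothetical stand-in for a real API call (Task.Delay simulates latency); it is not part of the asker's client.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllResultsSketch
{
    // Hypothetical stand-in for an API call returning Task<T>.
    // Task.Delay simulates network latency; higher ids take longer.
    public static async Task<string> GetTitleAsync(int postId)
    {
        await Task.Delay(100 * postId);
        return "Post " + postId;
    }

    public static async Task<string[]> GetTitlesAsync(int[] ids)
    {
        // Select starts every call; WhenAll completes once all of them have finished.
        // The result array follows the order of the input tasks, not completion order.
        return await Task.WhenAll(ids.Select(id => GetTitleAsync(id)));
    }

    public static void Main()
    {
        // Blocking here only because a C# 5 console Main cannot be async.
        string[] titles = GetTitlesAsync(new[] { 3, 1, 2 }).GetAwaiter().GetResult();
        Console.WriteLine(string.Join(", ", titles)); // Post 3, Post 1, Post 2
    }
}
```

Even though post 1 finishes first, the results come back in the order the tasks were supplied, which makes it easy to pair each result with its id.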

tugberk
"the above code with WaitAll also blocks the threads" - doesn't it only block one thread, the one that called WaitAll?
Rawling
@Rawling The documentation states "Type: System.Threading.Tasks.Task[] An array of Task instances on which to wait.". So it blocks all threads.
Mixxiphoid
@Mixxiphoid: The bit you quoted does not mean that it blocks all threads. It blocks only the calling thread while the supplied tasks are running. How those tasks are actually run depends on the scheduler. Typically, after each task completes, the thread it was running on is returned to the pool. Each thread would not remain blocked until the others are complete.
musaul
@tugberk, The way I understand it, the only difference between the "classic" Task methods and the Async counterparts is how they interact with threads between the moment a task starts running and the moment it finishes. The classic method under a default scheduler will hog a thread during that period (even if it is "sleeping"), while the async ones will not. There is no difference outside of that period, i.e. when the task is scheduled but not started, and when it has completed but its caller is still waiting.
musaul
@tugberk See stackoverflow.com/a/6123432/750216; the difference is whether the calling thread is blocked or not, the rest is the same. You might want to edit the answer to clarify.
Răzvan Flavius Panda
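
The point made in the comments above, that WaitAll blocks only the calling thread while WhenAll hands back a task immediately, can be illustrated with a small sketch. FakeCallAsync is a hypothetical stand-in for an asynchronous API call and is an assumption of this example, not code from the thread.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class CallerBlockingSketch
{
    // Hypothetical stand-in for an async API call.
    // Task.Delay is timer-based and holds no thread while it waits.
    static Task FakeCallAsync(int id)
    {
        return Task.Delay(500);
    }

    // Task.WaitAll returns only after every task has finished,
    // so the calling thread is blocked for the whole duration.
    public static bool AllDoneWhenWaitAllReturns()
    {
        Task[] tasks = Enumerable.Range(1, 5).Select(i => FakeCallAsync(i)).ToArray();
        Task.WaitAll(tasks);
        return tasks.All(t => t.IsCompleted);
    }

    // Task.WhenAll returns a task right away; the caller stays free
    // to do other work until it awaits (or blocks on) that task.
    public static bool StillPendingWhenWhenAllReturns()
    {
        Task all = Task.WhenAll(Enumerable.Range(1, 5).Select(i => FakeCallAsync(i)));
        bool stillPending = !all.IsCompleted;
        all.Wait(); // blocking here only so the console sketch can finish
        return stillPending;
    }

    public static void Main()
    {
        Console.WriteLine(AllDoneWhenWaitAllReturns());
        Console.WriteLine(StillPendingWhenWhenAllReturns());
    }
}
```

In both cases the five simulated calls are already in flight before any waiting happens; the only difference is what the caller's thread does in the meantime.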

I was curious to see the results of the methods provided in the question as well as the accepted answer, so I put them to the test.

Here's the code:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTest
{
    class Program
    {
        class Worker
        {
            public int Id;
            public int SleepTimeout;

            public async Task DoWork(DateTime testStart)
            {
                var workerStart = DateTime.Now;
                Console.WriteLine("Worker {0} started on thread {1}, beginning {2} seconds after test start.",
                    Id, Thread.CurrentThread.ManagedThreadId, (workerStart-testStart).TotalSeconds.ToString("F2"));
                await Task.Run(() => Thread.Sleep(SleepTimeout));
                var workerEnd = DateTime.Now;
                Console.WriteLine("Worker {0} stopped; the worker took {1} seconds, and it finished {2} seconds after the test start.",
                   Id, (workerEnd-workerStart).TotalSeconds.ToString("F2"), (workerEnd-testStart).TotalSeconds.ToString("F2"));
            }
        }

        static void Main(string[] args)
        {
            var workers = new List<Worker>
            {
                new Worker { Id = 1, SleepTimeout = 1000 },
                new Worker { Id = 2, SleepTimeout = 2000 },
                new Worker { Id = 3, SleepTimeout = 3000 },
                new Worker { Id = 4, SleepTimeout = 4000 },
                new Worker { Id = 5, SleepTimeout = 5000 },
            };

            var startTime = DateTime.Now;
            Console.WriteLine("Starting test: Parallel.ForEach...");
            PerformTest_ParallelForEach(workers, startTime);
            var endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            startTime = DateTime.Now;
            Console.WriteLine("Starting test: Task.WaitAll...");
            PerformTest_TaskWaitAll(workers, startTime);
            endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            startTime = DateTime.Now;
            Console.WriteLine("Starting test: Task.WhenAll...");
            var task = PerformTest_TaskWhenAll(workers, startTime);
            task.Wait();
            endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            Console.ReadKey();
        }

        static void PerformTest_ParallelForEach(List<Worker> workers, DateTime testStart)
        {
            Parallel.ForEach(workers, worker => worker.DoWork(testStart).Wait());
        }

        static void PerformTest_TaskWaitAll(List<Worker> workers, DateTime testStart)
        {
            Task.WaitAll(workers.Select(worker => worker.DoWork(testStart)).ToArray());
        }

        static Task PerformTest_TaskWhenAll(List<Worker> workers, DateTime testStart)
        {
            return Task.WhenAll(workers.Select(worker => worker.DoWork(testStart)));
        }
    }
}

And the resulting output:

Starting test: Parallel.ForEach...
Worker 1 started on thread 1, beginning 0.21 seconds after test start.
Worker 4 started on thread 5, beginning 0.21 seconds after test start.
Worker 2 started on thread 3, beginning 0.21 seconds after test start.
Worker 5 started on thread 6, beginning 0.21 seconds after test start.
Worker 3 started on thread 4, beginning 0.21 seconds after test start.
Worker 1 stopped; the worker took 1.90 seconds, and it finished 2.11 seconds after the test start.
Worker 2 stopped; the worker took 3.89 seconds, and it finished 4.10 seconds after the test start.
Worker 3 stopped; the worker took 5.89 seconds, and it finished 6.10 seconds after the test start.
Worker 4 stopped; the worker took 5.90 seconds, and it finished 6.11 seconds after the test start.
Worker 5 stopped; the worker took 8.89 seconds, and it finished 9.10 seconds after the test start.
Test finished after 9.10 seconds.

Starting test: Task.WaitAll...
Worker 1 started on thread 1, beginning 0.01 seconds after test start.
Worker 2 started on thread 1, beginning 0.01 seconds after test start.
Worker 3 started on thread 1, beginning 0.01 seconds after test start.
Worker 4 started on thread 1, beginning 0.01 seconds after test start.
Worker 5 started on thread 1, beginning 0.01 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.01 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.01 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.01 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.01 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.01 seconds after the test start.
Test finished after 5.01 seconds.

Starting test: Task.WhenAll...
Worker 1 started on thread 1, beginning 0.00 seconds after test start.
Worker 2 started on thread 1, beginning 0.00 seconds after test start.
Worker 3 started on thread 1, beginning 0.00 seconds after test start.
Worker 4 started on thread 1, beginning 0.00 seconds after test start.
Worker 5 started on thread 1, beginning 0.00 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.00 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.00 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.00 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.00 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.00 seconds after the test start.
Test finished after 5.00 seconds.
RiaanDP
If you put the time on each of these results, this would be more useful
Serj Sagan
@SerjSagan My initial idea was just to verify that the workers are started concurrently in each case, but I've added time stamps to improve the clarity of the test. Thanks for the suggestion.
RiaanDP
Thank you for the test. However, it feels a bit odd that you are running Thread.Sleep on a thread separate from the "worker thread". Not that it matters in this case, but wouldn't it make more sense to Task.Run the worker threads if we are simulating computational work, or just use Task.Delay instead of sleep if we are simulating I/O? Just checking what your thoughts are on that.
AnorZaken
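
The suggestion in the comment above, simulating I/O with Task.Delay rather than sleeping on a pool thread, might look like the following sketch. The class name, worker count and delays are assumptions chosen for illustration and do not come from the original test.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class DelayWorkerSketch
{
    // Simulated I/O-bound worker. Task.Delay completes via a timer and occupies
    // no thread while waiting, unlike Task.Run(() => Thread.Sleep(...)),
    // which parks a thread-pool thread for the whole duration.
    static async Task DoWorkAsync(int id, int delayMs)
    {
        await Task.Delay(delayMs);
        Console.WriteLine("Worker {0} finished after about {1} ms.", id, delayMs);
    }

    // Starts five workers (100 ms to 500 ms) and reports the total elapsed time.
    public static async Task<long> RunAllAsync()
    {
        var stopwatch = Stopwatch.StartNew();
        await Task.WhenAll(Enumerable.Range(1, 5).Select(i => DoWorkAsync(i, i * 100)));
        return stopwatch.ElapsedMilliseconds;
    }

    public static void Main()
    {
        // Blocking here only because a C# 5 console Main cannot be async.
        long elapsedMs = RunAllAsync().GetAwaiter().GetResult();
        Console.WriteLine("All workers finished after {0} ms.", elapsedMs);
    }
}
```

The total should be close to the longest delay (about 500 ms) rather than the sum of all delays, because the workers wait concurrently.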

Since the API you're calling is async, the Parallel.ForEach version doesn't make much sense. You shouldn't use .Wait in the WaitAll version since that would lose the parallelism. Another alternative, if the caller is async, is using Task.WhenAll after doing Select and ToArray to generate the array of tasks. A second alternative is using Rx 2.0.
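
As a rough sketch of the Select plus ToArray plus WhenAll alternative: materializing the tasks with ToArray matters because Select is lazy, so the array guarantees each call is started exactly once even if the tasks are inspected again later. FakeDoSomething is a hypothetical stand-in for DoSomething(1, i, blogClient), and the counter exists only to make the behavior observable.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SelectToArraySketch
{
    static int started; // how many simulated calls have been started

    // Hypothetical stand-in for DoSomething(1, i, blogClient).
    static async Task FakeDoSomething(int postId)
    {
        Interlocked.Increment(ref started);
        await Task.Delay(50);
    }

    public static async Task<int> RunAsync()
    {
        started = 0;
        int[] ids = { 1, 2, 3, 4, 5 };

        // ToArray enumerates the lazy Select once, starting each call exactly once.
        Task[] tasks = ids.Select(i => FakeDoSomething(i)).ToArray();

        await Task.WhenAll(tasks);

        // Inspecting the array again does not start any new calls.
        bool allSucceeded = tasks.All(t => t.Status == TaskStatus.RanToCompletion);
        Console.WriteLine("All succeeded: {0}", allSucceeded);

        return started;
    }

    public static void Main()
    {
        // Blocking here only because a C# 5 console Main cannot be async.
        Console.WriteLine(RunAsync().GetAwaiter().GetResult()); // 5
    }
}
```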

James Manning

You can use the Task.WhenAll function, to which you can pass n tasks; Task.WhenAll returns a task that runs to completion when all the tasks you passed to Task.WhenAll have completed. You have to wait asynchronously on Task.WhenAll so that you don't block your UI thread:

   public async Task DoSomeThing() {

       Task[] tasks = new Task[numTasks];
       for (int i = 0; i < numTasks; i++)
       {
          tasks[i] = CallSomeAsync();
       }
       await Task.WhenAll(tasks);
       // code that'll execute on UI thread
   }
Ahmed Wasim

Parallel.ForEach requires a list of user-defined workers and a non-async Action to perform with each worker.

Task.WaitAll and Task.WhenAll require a collection of Task objects, which are by definition asynchronous.

I found RiaanDP's answer very useful for understanding the difference, but it needs a correction for Parallel.ForEach. I don't have enough reputation to respond to his comment, hence my own answer.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTest
{
    class Program
    {
        class Worker
        {
            public int Id;
            public int SleepTimeout;

            public void DoWork(DateTime testStart)
            {
                var workerStart = DateTime.Now;
                Console.WriteLine("Worker {0} started on thread {1}, beginning {2} seconds after test start.",
                    Id, Thread.CurrentThread.ManagedThreadId, (workerStart - testStart).TotalSeconds.ToString("F2"));
                Thread.Sleep(SleepTimeout);
                var workerEnd = DateTime.Now;
                Console.WriteLine("Worker {0} stopped; the worker took {1} seconds, and it finished {2} seconds after the test start.",
                   Id, (workerEnd - workerStart).TotalSeconds.ToString("F2"), (workerEnd - testStart).TotalSeconds.ToString("F2"));
            }

            public async Task DoWorkAsync(DateTime testStart)
            {
                var workerStart = DateTime.Now;
                Console.WriteLine("Worker {0} started on thread {1}, beginning {2} seconds after test start.",
                    Id, Thread.CurrentThread.ManagedThreadId, (workerStart - testStart).TotalSeconds.ToString("F2"));
                await Task.Run(() => Thread.Sleep(SleepTimeout));
                var workerEnd = DateTime.Now;
                Console.WriteLine("Worker {0} stopped; the worker took {1} seconds, and it finished {2} seconds after the test start.",
                   Id, (workerEnd - workerStart).TotalSeconds.ToString("F2"), (workerEnd - testStart).TotalSeconds.ToString("F2"));
            }
        }

        static void Main(string[] args)
        {
            var workers = new List<Worker>
            {
                new Worker { Id = 1, SleepTimeout = 1000 },
                new Worker { Id = 2, SleepTimeout = 2000 },
                new Worker { Id = 3, SleepTimeout = 3000 },
                new Worker { Id = 4, SleepTimeout = 4000 },
                new Worker { Id = 5, SleepTimeout = 5000 },
            };

            var startTime = DateTime.Now;
            Console.WriteLine("Starting test: Parallel.ForEach...");
            PerformTest_ParallelForEach(workers, startTime);
            var endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            startTime = DateTime.Now;
            Console.WriteLine("Starting test: Task.WaitAll...");
            PerformTest_TaskWaitAll(workers, startTime);
            endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            startTime = DateTime.Now;
            Console.WriteLine("Starting test: Task.WhenAll...");
            var task = PerformTest_TaskWhenAll(workers, startTime);
            task.Wait();
            endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            Console.ReadKey();
        }

        static void PerformTest_ParallelForEach(List<Worker> workers, DateTime testStart)
        {
            Parallel.ForEach(workers, worker => worker.DoWork(testStart));
        }

        static void PerformTest_TaskWaitAll(List<Worker> workers, DateTime testStart)
        {
            Task.WaitAll(workers.Select(worker => worker.DoWorkAsync(testStart)).ToArray());
        }

        static Task PerformTest_TaskWhenAll(List<Worker> workers, DateTime testStart)
        {
            return Task.WhenAll(workers.Select(worker => worker.DoWorkAsync(testStart)));
        }
    }
}

The resulting output is below. Execution times are comparable. I ran this test while my computer was doing its weekly antivirus scan. Changing the order of the tests did change their execution times.

Starting test: Parallel.ForEach...
Worker 1 started on thread 9, beginning 0.02 seconds after test start.
Worker 2 started on thread 10, beginning 0.02 seconds after test start.
Worker 3 started on thread 11, beginning 0.02 seconds after test start.
Worker 4 started on thread 13, beginning 0.03 seconds after test start.
Worker 5 started on thread 14, beginning 0.03 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.02 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.02 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.03 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.03 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.03 seconds after the test start.
Test finished after 5.03 seconds.

Starting test: Task.WaitAll...
Worker 1 started on thread 9, beginning 0.00 seconds after test start.
Worker 2 started on thread 9, beginning 0.00 seconds after test start.
Worker 3 started on thread 9, beginning 0.00 seconds after test start.
Worker 4 started on thread 9, beginning 0.00 seconds after test start.
Worker 5 started on thread 9, beginning 0.01 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.01 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.01 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.01 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.01 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.01 seconds after the test start.
Test finished after 5.01 seconds.

Starting test: Task.WhenAll...
Worker 1 started on thread 9, beginning 0.00 seconds after test start.
Worker 2 started on thread 9, beginning 0.00 seconds after test start.
Worker 3 started on thread 9, beginning 0.00 seconds after test start.
Worker 4 started on thread 9, beginning 0.00 seconds after test start.
Worker 5 started on thread 9, beginning 0.00 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.00 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.00 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.00 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.00 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.01 seconds after the test start.
Test finished after 5.01 seconds.
JPortillo