L'idée derrière Parallel.ForEach()
est que vous avez un ensemble de threads et que chaque thread traite une partie de la collection. Comme vous l'avez remarqué, cela ne fonctionne pas avec async
- await
, où vous souhaitez libérer le thread pour la durée de l'appel asynchrone.
Vous pouvez «corriger» cela en bloquant les ForEach()
threads, mais cela va à l'encontre de tout l'intérêt de async
- await
.
Ce que vous pouvez faire, c'est utiliser TPL Dataflow à la place de Parallel.ForEach()
, qui prend Task
bien en charge les s asynchrones .
Plus précisément, votre code peut être écrit en utilisant un TransformBlock
qui transforme chaque identifiant en un en Customer
utilisant le async
lambda. Ce bloc peut être configuré pour s'exécuter en parallèle. Vous lieriez ce bloc à un ActionBlock
qui écrit chacun Customer
sur la console. Après avoir configuré le réseau de blocage, vous pouvez Post()
chaque identifiant sur le TransformBlock
.
Dans du code:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
writeCustomerBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
foreach (var id in ids)
getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
Bien que vous souhaitiez probablement limiter le parallélisme du TransformBlock
à une petite constante. En outre, vous pouvez limiter la capacité du TransformBlock
et y ajouter les éléments de manière asynchrone en utilisant SendAsync()
, par exemple, si la collection est trop grande.
Un avantage supplémentaire par rapport à votre code (s'il a fonctionné) est que l'écriture commencera dès qu'un seul élément est terminé, et n'attendra pas que tout le traitement soit terminé.
Parallel.ForEach()
d'Post()
objets en parallèle ne devrait pas avoir d'effet réel.La réponse de svick est (comme d'habitude) excellente.
Cependant, je trouve que Dataflow est plus utile lorsque vous avez réellement de grandes quantités de données à transférer. Ou lorsque vous avez besoin d'une
async
file d'attente compatible.Dans votre cas, une solution plus simple consiste simplement à utiliser le
async
parallélisme -style:la source
Parallel.ForEach()
). Mais je pense que c'est actuellement la meilleure option pour faire presque n'importe quelasync
travail avec des collections.ParallelOptions
va vous aider? Cela s'applique uniquement àParallel.For/ForEach/Invoke
, qui, comme le PO établi, ne sont d'aucune utilité ici.GetCustomer
méthode renvoie unTask<T>
, devrait-on utiliserSelect(async i => { await repo.GetCustomer(i);});
?Parallel.ForEach
ne prend pas en chargeasync
.Utiliser DataFlow comme suggéré par svick peut être exagéré, et la réponse de Stephen ne fournit pas les moyens de contrôler la concurrence de l'opération. Cependant, cela peut être réalisé assez simplement:
Les
ToArray()
appels peuvent être optimisés en utilisant un tableau au lieu d'une liste et en remplaçant les tâches terminées, mais je doute que cela fasse une grande différence dans la plupart des scénarios. Exemple d'utilisation selon la question du PO:EDIT Fellow utilisateur SO et wiz TPL Eli Arbel m'a indiqué un article connexe de Stephen Toub . Comme d'habitude, sa mise en œuvre est à la fois élégante et efficace:
la source
Partitioner.Create
utilise le partitionnement par blocs , qui fournit des éléments de manière dynamique aux différentes tâches de sorte que le scénario que vous avez décrit n'aura pas lieu. Notez également que le partitionnement statique (prédéterminé) peut être plus rapide dans certains cas en raison de moins de frais généraux (en particulier la synchronisation). Pour plus d'informations, consultez: msdn.microsoft.com/en-us/library/dd997411(v=vs.110).aspx .Task.WhenAll
) contiendra l'exception (à l'intérieur d'unAggregateException
), et par conséquent si ledit appelant était utiliséawait
, une exception serait lancée dans le site d'appel. Cependant,Task.WhenAll
attendra toujours que toutes les tâches soient terminées etGetPartitions
allouera dynamiquement des éléments lors de l'partition.MoveNext
appel jusqu'à ce qu'il ne reste plus d'éléments à traiter. Cela signifie que si vous n'ajoutez pas votre propre mécanisme pour arrêter le traitement (par exempleCancellationToken
), il ne se produira pas tout seul.var current = partition.Current
avantawait body
, puis l'utilisercurrent
dans la suite (ContinueWith(t => { ... }
).Vous pouvez économiser des efforts avec le nouveau package NuGet AsyncEnumerator , qui n'existait pas il y a 4 ans lorsque la question a été publiée à l'origine. Il vous permet de contrôler le degré de parallélisme:
Avertissement: je suis l'auteur de la bibliothèque AsyncEnumerator, qui est open source et sous licence MIT, et je publie ce message juste pour aider la communauté.
la source
AsyncStreams
et je dois dire que c'est excellent. Je ne saurais trop recommander cette bibliothèque.Enveloppez le
Parallel.Foreach
dans unTask.Run()
et au lieu de l'await
utilisation du mot clé[yourasyncmethod].Result
(vous devez faire la tâche Task.Run pour ne pas bloquer le thread de l'interface utilisateur)
Quelque chose comme ça:
la source
Parallel.ForEach
faire le travail parallèle, qui bloque jusqu'à ce que tout soit terminé, puis poussez le tout sur un thread d'arrière-plan pour avoir une interface utilisateur réactive. Des problèmes avec ça? C'est peut-être un thread de trop, mais c'est un code court et lisible.Task.Run
quandTaskCompletionSource
c'est préférable.TaskCompletionSource
préférable?await
peut être déplacé à l'avant pour enregistrer le nom de la variable supplémentaire.Cela devrait être assez efficace et plus facile que de faire fonctionner tout le flux de données TPL:
la source
await
commevar customers = await ids.SelectAsync(async i => { ... });
:?Je suis un peu en retard pour faire la fête mais vous voudrez peut-être envisager d'utiliser GetAwaiter.GetResult () pour exécuter votre code asynchrone dans un contexte de synchronisation, mais en parallèle comme ci-dessous;
la source
Une méthode d'extension pour cela qui utilise SemaphoreSlim et permet également de définir le degré maximum de parallélisme
Exemple d'utilisation:
la source
Après avoir introduit un tas de méthodes d'assistance, vous pourrez exécuter des requêtes parallèles avec cette syntaxe simple:
Ce qui se passe ici est: nous divisons la collection source en 10 morceaux (
.Split(DegreeOfParallelism)
), puis exécutons 10 tâches, chacune traitant ses éléments un par un (.SelectManyAsync(...)
) et les fusionnons en une seule liste.Il convient de mentionner qu'il existe une approche plus simple:
Mais il faut une précaution : si vous avez une collection source trop volumineuse, elle en planifiera
Task
immédiatement un pour chaque élément, ce qui peut entraîner des baisses de performances significatives.Les méthodes d'extension utilisées dans les exemples ci-dessus se présentent comme suit:
la source