Je rencontre un problème où si j'essaie de redimensionner ThreadPoolExecutor
la taille du pool principal d'un à un nombre différent après la création du pool, puis par intermittence, certaines tâches sont rejetées avec un RejectedExecutionException
même si je ne soumets jamais plus de queueSize + maxPoolSize
nombre de tâches.
Le problème que j'essaye de résoudre est d'étendre ThreadPoolExecutor
qui redimensionne ses threads principaux en fonction des exécutions en attente se trouvant dans la file d'attente du pool de threads. J'en ai besoin parce que par défaut, un ThreadPoolExecutor
n'en créera un nouveau Thread
que si la file d'attente est pleine.
Voici un petit programme Pure Java 8 autonome qui illustre le problème.
import static java.lang.Math.max;
import static java.lang.Math.min;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolResizeTest {
public static void main(String[] args) throws Exception {
// increase the number of iterations if unable to reproduce
// for me 100 iterations have been enough
int numberOfExecutions = 100;
for (int i = 1; i <= numberOfExecutions; i++) {
executeOnce();
}
}
private static void executeOnce() throws Exception {
int minThreads = 1;
int maxThreads = 5;
int queueCapacity = 10;
ThreadPoolExecutor pool = new ThreadPoolExecutor(
minThreads, maxThreads,
0, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueCapacity),
new ThreadPoolExecutor.AbortPolicy()
);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
0, 10, TimeUnit.MILLISECONDS);
CompletableFuture<Void> taskBlocker = new CompletableFuture<>();
try {
int totalTasksToSubmit = queueCapacity + maxThreads;
for (int i = 1; i <= totalTasksToSubmit; i++) {
// following line sometimes throws a RejectedExecutionException
pool.submit(() -> {
// block the thread and prevent it from completing the task
taskBlocker.join();
});
// Thread.sleep(10); //enabling even a small sleep makes the problem go away
}
} finally {
taskBlocker.complete(null);
scheduler.shutdown();
pool.shutdown();
}
}
/**
* Resize the thread pool if the number of pending tasks are non-zero.
*/
private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
int pendingExecutions = pool.getQueue().size();
int approximateRunningExecutions = pool.getActiveCount();
/*
* New core thread count should be the sum of pending and currently executing tasks
* with an upper bound of maxThreads and a lower bound of minThreads.
*/
int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));
pool.setCorePoolSize(newThreadCount);
pool.prestartAllCoreThreads();
}
}
Pourquoi le pool devrait-il jamais lancer une RejectedExecutionException si je ne soumets jamais plus que queueCapacity + maxThreads. Je ne modifie jamais le nombre maximal de threads, donc, selon la définition de ThreadPoolExecutor, il devrait accueillir la tâche dans un thread ou dans la file d'attente.
Bien sûr, si je ne redimensionne jamais le pool, le pool de threads ne rejette aucune soumission. C'est également difficile à déboguer, car l'ajout de retards dans les soumissions fait disparaître le problème.
Des conseils sur la façon de corriger l'exception RejectedExecutionException?
la source
ExecutorService
en encapsulant une existante, qui soumet à nouveau les tâches dont la soumission a échoué en raison du redimensionnement?ThreadPoolExecutor
est très probablement une mauvaise idée, et n'auriez-vous pas besoin de changer le code existant dans ce cas également? Il serait préférable de fournir un exemple de la façon dont votre code réel accède à l'exécuteur. Je serais surpris s'il utilisait de nombreuses méthodes spécifiquesThreadPoolExecutor
(c'est- à -dire pas dansExecutorService
).Réponses:
Voici un scénario pour lequel cela se produit:
Dans mon exemple, j'utilise minThreads = 0, maxThreads = 2 et queueCapacity = 2 pour le raccourcir. La première commande est soumise, cela se fait dans la méthode execute:
pour cette commande workQueue.offer (commande) que addWorker (null, false) est exécuté. Le thread de travail supprime d'abord cette commande de la file d'attente dans la méthode d'exécution de thread, donc à ce moment, la file d'attente a toujours une commande,
La deuxième commande est soumise cette fois, workQueue.offer (commande) est exécuté. Maintenant, la file d'attente est pleine
Maintenant, ScheduledExecutorService exécute la méthode resizeThreadPool qui appelle setCorePoolSize avec maxThreads. Voici la méthode setCorePoolSize:
Cette méthode ajoute un travailleur en utilisant addWorker (null, true). Non, il y a 2 files d'attente de travail en cours d'exécution, le maximum et la file d'attente est pleine.
La troisième commande est soumise et échoue car workQueue.offer (commande) et addWorker (commande, faux) échouent, ce qui entraîne l'exception:
Je pense que pour résoudre ce problème, vous devez définir la capacité de la file d'attente au maximum de commandes que vous souhaitez exécuter.
la source
Je ne sais pas si cela peut être considéré comme un bug. C'est le comportement lorsque les threads de travail supplémentaires sont créés après que la file d'attente est pleine, mais cela a été noté dans les documents java que l'appelant doit gérer les tâches rejetées.
Documents Java
Lorsque vous redimensionnez la taille du pool principal, disons augmenter, les travailleurs supplémentaires sont créés (
addWorker
méthode danssetCorePoolSize
) et l'appel à créer du travail supplémentaire (addWorker
méthode à partir deexecute
) est rejeté lorsque la valeuraddWorker
false (add Worker
dernier extrait de code) est renvoyée, car suffisamment de travailleurs supplémentaires sont déjà créé parsetCorePoolSize
mais pas encore exécuté pour refléter la mise à jour dans la file d'attente .Pièces pertinentes
Comparer
Utilisez un gestionnaire d'exécution de rejet de relance personnalisé (cela devrait fonctionner pour votre cas car vous avez la limite supérieure comme taille maximale du pool). Veuillez ajuster au besoin.
Notez également que votre utilisation de l'arrêt n'est pas correcte car cela n'attendra pas que la tâche soumise se termine, mais utilisez-la à la
awaitTermination
place.la source