Java ThreadPoolExecutor: la mise à jour de la taille du pool principal rejette dynamiquement les tâches entrantes par intermittence

13

Je rencontre un problème où si j'essaie de redimensionner ThreadPoolExecutorla taille du pool principal d'un à un nombre différent après la création du pool, puis par intermittence, certaines tâches sont rejetées avec un RejectedExecutionExceptionmême si je ne soumets jamais plus de queueSize + maxPoolSizenombre de tâches.

Le problème que j'essaye de résoudre est d'étendre ThreadPoolExecutorqui redimensionne ses threads principaux en fonction des exécutions en attente se trouvant dans la file d'attente du pool de threads. J'en ai besoin parce que par défaut, un ThreadPoolExecutorn'en créera un nouveau Threadque si la file d'attente est pleine.

Voici un petit programme Pure Java 8 autonome qui illustre le problème.

import static java.lang.Math.max;
import static java.lang.Math.min;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolResizeTest {

    public static void main(String[] args) throws Exception {
        // increase the number of iterations if unable to reproduce
        // for me 100 iterations have been enough
        int numberOfExecutions = 100;

        for (int i = 1; i <= numberOfExecutions; i++) {
            executeOnce();
        }
    }

    private static void executeOnce() throws Exception {
        int minThreads = 1;
        int maxThreads = 5;
        int queueCapacity = 10;

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                minThreads, maxThreads,
                0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()
        );

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
                0, 10, TimeUnit.MILLISECONDS);
        CompletableFuture<Void> taskBlocker = new CompletableFuture<>();

        try {
            int totalTasksToSubmit = queueCapacity + maxThreads;

            for (int i = 1; i <= totalTasksToSubmit; i++) {
                // following line sometimes throws a RejectedExecutionException
                pool.submit(() -> {
                    // block the thread and prevent it from completing the task
                    taskBlocker.join();
                });
                // Thread.sleep(10); //enabling even a small sleep makes the problem go away
            }
        } finally {
            taskBlocker.complete(null);
            scheduler.shutdown();
            pool.shutdown();
        }
    }

    /**
     * Resize the thread pool if the number of pending tasks are non-zero.
     */
    private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
        int pendingExecutions = pool.getQueue().size();
        int approximateRunningExecutions = pool.getActiveCount();

        /*
         * New core thread count should be the sum of pending and currently executing tasks
         * with an upper bound of maxThreads and a lower bound of minThreads.
         */
        int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));

        pool.setCorePoolSize(newThreadCount);
        pool.prestartAllCoreThreads();
    }
}

Pourquoi le pool devrait-il jamais lancer une RejectedExecutionException si je ne soumets jamais plus que queueCapacity + maxThreads. Je ne modifie jamais le nombre maximal de threads, donc, selon la définition de ThreadPoolExecutor, il devrait accueillir la tâche dans un thread ou dans la file d'attente.

Bien sûr, si je ne redimensionne jamais le pool, le pool de threads ne rejette aucune soumission. C'est également difficile à déboguer, car l'ajout de retards dans les soumissions fait disparaître le problème.

Des conseils sur la façon de corriger l'exception RejectedExecutionException?

Swaranga Sarma
la source
Pourquoi ne pas fournir votre propre implémentation de ExecutorServiceen encapsulant une existante, qui soumet à nouveau les tâches dont la soumission a échoué en raison du redimensionnement?
daniu
@daniu qui est une solution de contournement. Le point des questions est pourquoi le pool devrait-il jamais lancer une RejectedExecutionException si je ne soumets jamais plus que queueCapacity + maxThreads. Je ne modifie jamais le nombre maximal de threads. Par conséquent, selon la définition de ThreadPoolExecutor, il devrait accueillir la tâche dans un thread ou dans la file d'attente.
Swaranga Sarma
D'accord, je semble avoir mal compris votre question. Qu'Est-ce que c'est? Voulez-vous savoir pourquoi le comportement a lieu ou comment vous le contourner vous pose des problèmes?
daniu
Oui, changer mon implémentation en service exécuteur n'est pas possible car une grande partie du code fait référence à ThreadPoolExecutor. Donc, si je voulais toujours avoir un ThreadPoolExecutor redimensionnable, j'ai besoin de savoir comment le réparer. Que la bonne façon de faire quelque chose comme ça soit d'étendre ThreadPoolExecutor et d'avoir accès à certaines de ses variables protégées et de mettre à jour la taille du pool dans un bloc synchronisé sur un verrou partagé par la super classe.
Swaranga Sarma
L'extension ThreadPoolExecutorest très probablement une mauvaise idée, et n'auriez-vous pas besoin de changer le code existant dans ce cas également? Il serait préférable de fournir un exemple de la façon dont votre code réel accède à l'exécuteur. Je serais surpris s'il utilisait de nombreuses méthodes spécifiques ThreadPoolExecutor(c'est- à -dire pas dans ExecutorService).
daniu

Réponses:

5

Voici un scénario pour lequel cela se produit:

Dans mon exemple, j'utilise minThreads = 0, maxThreads = 2 et queueCapacity = 2 pour le raccourcir. La première commande est soumise, cela se fait dans la méthode execute:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

pour cette commande workQueue.offer (commande) que addWorker (null, false) est exécuté. Le thread de travail supprime d'abord cette commande de la file d'attente dans la méthode d'exécution de thread, donc à ce moment, la file d'attente a toujours une commande,

La deuxième commande est soumise cette fois, workQueue.offer (commande) est exécuté. Maintenant, la file d'attente est pleine

Maintenant, ScheduledExecutorService exécute la méthode resizeThreadPool qui appelle setCorePoolSize avec maxThreads. Voici la méthode setCorePoolSize:

 public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

Cette méthode ajoute un travailleur en utilisant addWorker (null, true). Non, il y a 2 files d'attente de travail en cours d'exécution, le maximum et la file d'attente est pleine.

La troisième commande est soumise et échoue car workQueue.offer (commande) et addWorker (commande, faux) échouent, ce qui entraîne l'exception:

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@24c22fe rejected from java.util.concurrent.ThreadPoolExecutor@cd1e646[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at ThreadPoolResizeTest.executeOnce(ThreadPoolResizeTest.java:60)
at ThreadPoolResizeTest.runTest(ThreadPoolResizeTest.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:292)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)

Je pense que pour résoudre ce problème, vous devez définir la capacité de la file d'attente au maximum de commandes que vous souhaitez exécuter.

Thomas Krieger
la source
Correct. J'ai pu reproduire en copiant le code dans ma propre classe et en ajoutant des enregistreurs. Fondamentalement, lorsque la file d'attente est pleine et que je soumets une nouvelle tâche, elle essaiera de créer un nouveau travailleur. En attendant, si à ce moment, mon resizer appelle également setCorePoolSize à 2, cela crée également un nouveau Worker. À ce stade, deux travailleurs sont en compétition pour être ajoutés, mais ils ne peuvent pas l'être tous les deux, car cela violerait la contrainte de taille maximale du pool, de sorte que la nouvelle soumission de tâche est rejetée. Je pense que c'est une condition de concurrence et j'ai déposé un rapport de bogue à OpenJDK. Voyons voir. Mais vous avez répondu à ma question pour obtenir la prime. Je vous remercie.
Swaranga Sarma
2

Je ne sais pas si cela peut être considéré comme un bug. C'est le comportement lorsque les threads de travail supplémentaires sont créés après que la file d'attente est pleine, mais cela a été noté dans les documents java que l'appelant doit gérer les tâches rejetées.

Documents Java

Usine de nouveaux fils. Tous les threads sont créés à l'aide de cette usine (via la méthode addWorker). Tous les appelants doivent être préparés à l'échec d'addWorker, ce qui peut refléter la stratégie d'un système ou d'un utilisateur limitant le nombre de threads. Même s'il n'est pas traité comme une erreur, l'échec de la création de threads peut entraîner le rejet de nouvelles tâches ou le blocage de celles existantes dans la file d'attente.

Lorsque vous redimensionnez la taille du pool principal, disons augmenter, les travailleurs supplémentaires sont créés ( addWorkerméthode dans setCorePoolSize) et l'appel à créer du travail supplémentaire ( addWorkerméthode à partir de execute) est rejeté lorsque la valeur addWorkerfalse ( add Workerdernier extrait de code) est renvoyée, car suffisamment de travailleurs supplémentaires sont déjà créé par setCorePoolSize mais pas encore exécuté pour refléter la mise à jour dans la file d'attente .

Pièces pertinentes

Comparer

public void setCorePoolSize(int corePoolSize) {
    ....
    int k = Math.min(delta, workQueue.size());
    while (k-- > 0 && addWorker(null, true)) {
        if (workQueue.isEmpty())
             break;
    }
}

public void execute(Runnable command) {
    ...
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
....
   if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
     return false;             
}

Utilisez un gestionnaire d'exécution de rejet de relance personnalisé (cela devrait fonctionner pour votre cas car vous avez la limite supérieure comme taille maximale du pool). Veuillez ajuster au besoin.

public static class RetryRejectionPolicy implements RejectedExecutionHandler {
    public RetryRejectionPolicy () {}

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
           while(true)
            if(e.getQueue().offer(r)) break;
        }
    }
}

ThreadPoolExecutor pool = new ThreadPoolExecutor(
      minThreads, maxThreads,
      0, TimeUnit.SECONDS,
      new LinkedBlockingQueue<Runnable>(queueCapacity),
      new ThreadPoolResizeTest.RetryRejectionPolicy()
 );

Notez également que votre utilisation de l'arrêt n'est pas correcte car cela n'attendra pas que la tâche soumise se termine, mais utilisez-la à la awaitTerminationplace.

Sagar Veeram
la source
Je pense que l'arrêt attend les tâches déjà soumises, selon JavaDoc: shutdown () Initie un arrêt ordonné dans lequel les tâches précédemment soumises sont exécutées, mais aucune nouvelle tâche ne sera acceptée.
Thomas Krieger
@ThomasKrieger - Il exécutera les tâches déjà soumises mais n'attendra pas qu'elles soient terminées - à partir de docs docs.oracle.com/javase/7/docs/api/java/util/concurrent/… - Cette méthode n'attend pas les données déjà soumises tâches pour terminer l'exécution. Utilisez waitTermination pour ce faire.
Sagar Veeram