Impossible de créer un pool de threads mis en cache avec une limite de taille?

127

Il semble impossible de créer un pool de threads mis en cache avec une limite au nombre de threads qu'il peut créer.

Voici comment static Executors.newCachedThreadPool est implémenté dans la bibliothèque Java standard:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Donc, en utilisant ce modèle pour continuer à créer un pool de threads mis en cache de taille fixe:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

Maintenant, si vous utilisez ceci et soumettez 3 tâches, tout ira bien. La soumission de toute autre tâche entraînera le rejet d'exceptions d'exécution.

Essayer ceci:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

Cela entraînera l'exécution séquentielle de tous les threads. Par exemple, le pool de threads ne créera jamais plus d'un thread pour gérer vos tâches.

Il s'agit d'un bogue dans la méthode d'exécution de ThreadPoolExecutor? Ou peut-être que c'est intentionnel? Ou il y a un autre moyen?

Edit: Je veux quelque chose d'exactement comme le pool de threads mis en cache (il crée des threads à la demande, puis les tue après un certain délai) mais avec une limite sur le nombre de threads qu'il peut créer et la possibilité de continuer à mettre en file d'attente des tâches supplémentaires une fois qu'il a atteint sa limite de thread. Selon la réponse de sjlee, cela est impossible. En regardant la méthode execute () de ThreadPoolExecutor, c'est en effet impossible. J'aurais besoin de sous-classer ThreadPoolExecutor et de surcharger execute () un peu comme SwingWorker, mais ce que SwingWorker fait dans son execute () est un hack complet.

Matt Crinklaw-Vogt
la source
1
Quelle est ta question? Votre deuxième exemple de code n'est-il pas la réponse à votre titre?
rsp
4
Je veux un pool de threads qui ajoutera des threads à la demande au fur et à mesure que le nombre de tâches augmentera, mais qui n'ajoutera jamais plus qu'un certain nombre maximum de threads. CachedThreadPool le fait déjà, sauf qu'il ajoutera un nombre illimité de threads et ne s'arrêtera pas à une taille prédéfinie. La taille que je définis dans les exemples est 3. Le deuxième exemple ajoute 1 thread, mais n'en ajoute pas deux de plus lorsque de nouvelles tâches arrivent alors que les autres tâches ne sont pas encore terminées.
Matt Crinklaw-Vogt
Vérifiez ceci, il le résout, debuggingisfun.blogspot.com/2012/05/…
ethan

Réponses:

235

ThreadPoolExecutor a les plusieurs comportements clés suivants, et vos problèmes peuvent être expliqués par ces comportements.

Lorsque les tâches sont soumises,

  1. Si le pool de threads n'a pas atteint la taille de cœur, il crée de nouveaux threads.
  2. Si la taille du cœur a été atteinte et qu'il n'y a pas de threads inactifs, il met les tâches en file d'attente.
  3. Si la taille du cœur a été atteinte, il n'y a pas de threads inactifs et la file d'attente devient pleine, elle crée de nouveaux threads (jusqu'à ce qu'elle atteigne la taille maximale).
  4. Si la taille maximale a été atteinte, qu'il n'y a pas de threads inactifs et que la file d'attente est pleine, la politique de rejet entre en vigueur.

Dans le premier exemple, notez que SynchronousQueue a essentiellement une taille de 0. Par conséquent, dès que vous atteignez la taille maximale (3), la politique de rejet entre en jeu (# 4).

Dans le deuxième exemple, la file d'attente de choix est une LinkedBlockingQueue qui a une taille illimitée. Par conséquent, vous êtes coincé avec le comportement n ° 2.

Vous ne pouvez pas vraiment bricoler beaucoup avec le type mis en cache ou le type fixe, car leur comportement est presque complètement déterminé.

Si vous souhaitez avoir un pool de threads limité et dynamique, vous devez utiliser une taille de cœur positive et une taille maximale combinée à une file d'attente de taille finie. Par exemple,

new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

Addendum : c'est une réponse assez ancienne, et il semble que JDK a changé son comportement en ce qui concerne la taille de cœur de 0. Depuis JDK 1.6, si la taille de cœur est de 0 et que le pool n'a pas de threads, le ThreadPoolExecutor ajoutera un thread pour exécuter cette tâche. Par conséquent, la taille de cœur de 0 est une exception à la règle ci-dessus. Merci Steve d' avoir porté cela à mon attention.

sjlee
la source
4
Vous devez écrire quelques mots sur la méthode allowCoreThreadTimeOutpour rendre cette réponse parfaite. Voir la réponse de @ user1046052
hsestupin
1
Très bonne réponse! Juste un point à ajouter: d'autres politiques de rejet méritent également d'être mentionnées. Voir la réponse de @brianegge
Jeff
1
Le comportement 2 ne devrait pas dire "Si la taille maxThread a été atteinte et qu'il n'y a pas de threads inactifs, il met les tâches en file d'attente." ?
Zoltán
1
Pourriez-vous expliquer ce qu'implique la taille de la file d'attente? Cela signifie-t-il que seulement 20 tâches peuvent être mises en file d'attente avant d'être rejetées?
Zoltán
1
@ Zoltán J'ai écrit ceci il y a quelque temps, il y a donc une chance que certains comportements aient changé depuis lors (je n'ai pas suivi de trop près les dernières activités), mais en supposant que ces comportements restent inchangés, le n ° 2 est correct comme indiqué, et c'est peut-être le point le plus important (et quelque peu surprenant) de ceci. Une fois la taille du cœur atteinte, TPE privilégie la mise en file d'attente à la création de nouveaux threads. La taille de la file d'attente correspond littéralement à la taille de la file d'attente transmise au TPE. Si la file d'attente est pleine mais qu'elle n'a pas atteint la taille maximale, elle créera un nouveau thread (pas de tâches de rejet). Voir # 3. J'espère que cela pourra aider.
sjlee
60

Sauf si j'ai manqué quelque chose, la solution à la question initiale est simple. Le code suivant implémente le comportement souhaité comme décrit par l'affiche d'origine. Il engendrera jusqu'à 5 threads pour travailler sur une file d'attente illimitée et les threads inactifs se termineront après 60 secondes.

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);

la source
1
Vous avez raison. Cette méthode a été ajoutée dans jdk 1.6, donc peu de gens le savent. De plus, vous ne pouvez pas avoir une taille de pool de base "min", ce qui est malheureux.
jtahlborn
4
Mon seul souci à ce sujet est (à partir de la documentation du JDK 8): "Lorsqu'une nouvelle tâche est soumise dans la méthode execute (Runnable), et que moins de threads corePoolSize sont en cours d'exécution, un nouveau thread est créé pour gérer la demande, même si un autre travailleur les threads sont inactifs. "
veegee
À peu près sûr que cela ne fonctionne pas réellement. La dernière fois que j'ai regardé faire ce qui précède n'exécute en fait que votre travail dans un thread même si vous en créez 5. Encore une fois, cela fait quelques années, mais quand j'ai plongé dans l'implémentation de ThreadPoolExecutor, il n'est envoyé aux nouveaux threads qu'une fois que votre file d'attente était pleine. L'utilisation d'une file d'attente illimitée empêche cela. Vous pouvez tester en soumettant le travail et en enregistrant le nom du thread puis en dormant. Chaque exécutable finira par imprimer le même nom / ne sera exécuté sur aucun autre thread.
Matt Crinklaw-Vogt
2
Cela fonctionne, Matt. Vous définissez la taille du noyau sur 0, c'est pourquoi vous n'aviez qu'un seul thread. L'astuce ici est de définir la taille du noyau sur la taille maximale.
T-Gergely
1
@vegee a raison - Cela ne fonctionne pas vraiment très bien - ThreadPoolExecutor ne réutilisera les threads qu'au-dessus de corePoolSize. Ainsi, lorsque corePoolSize est égal à maxPoolSize, vous ne bénéficierez de la mise en cache des threads que lorsque votre pool est plein (donc si vous avez l'intention de l'utiliser mais que vous restez généralement en dessous de la taille maximale de votre pool, vous pouvez également réduire le délai d'expiration du thread à un faible valeur; et sachez qu'il n'y a pas de mise en cache - toujours de nouveaux fils)
Chris Riddell
7

Eu le même problème. Puisqu'aucune autre réponse ne met tous les problèmes ensemble, j'ajoute le mien:

Il est maintenant clairement écrit dans la documentation : si vous utilisez une file d'attente qui ne bloque pas ( LinkedBlockingQueue) le paramètre max threads n'a aucun effet, seuls les threads principaux sont utilisés.

alors:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        setMaximumPoolSize(Math.max(1, n));
        setCorePoolSize(Math.max(1, n));
    }

}

Cet exécuteur testamentaire a:

  1. Aucun concept de threads maximum car nous utilisons une file d'attente illimitée. C'est une bonne chose car une telle file d'attente peut amener l'exécuteur à créer un nombre massif de threads supplémentaires non essentiels s'il suit sa politique habituelle.

  2. Une file d'attente de taille maximale Integer.MAX_VALUE. Submit()jettera RejectedExecutionExceptionsi le nombre de tâches en attente dépasse Integer.MAX_VALUE. Pas sûr que nous allons d'abord manquer de mémoire ou cela se produira.

  3. Possède 4 fils de base. Les threads de base inactifs se terminent automatiquement s'ils sont inactifs pendant 5 secondes.Donc, oui, strictement à la demande, le nombre peut être modifié à l'aide de la setThreads()méthode.

  4. S'assure que le nombre minimum de threads principaux n'est jamais inférieur à un, sinon submit()rejettera chaque tâche. Puisque les threads principaux doivent être> = max threads, la méthode setThreads()définit également max threads, bien que le paramètre max threads soit inutile pour une file d'attente illimitée.

Dakota du Sud
la source
Je pense que vous devez également définir 'allowCoreThreadTimeOut' sur 'true', sinon, une fois les threads créés, vous les garderez pour toujours: gist.github.com/ericdcobb/46b817b384f5ca9d5f5d
eric
oups je viens de manquer ça, désolé, votre réponse est parfaite alors!
eric
6

Dans votre premier exemple, les tâches suivantes sont rejetées car il AbortPolicys'agit de la valeur par défaut RejectedExecutionHandler. Le ThreadPoolExecutor contient les stratégies suivantes, que vous pouvez modifier via la setRejectedExecutionHandlerméthode:

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

Il semble que vous vouliez un pool de threads mis en cache avec un CallerRunsPolicy.

Brianegge
la source
5

Aucune des réponses ici n'a résolu mon problème, qui concernait la création d'un nombre limité de connexions HTTP à l'aide du client HTTP d'Apache (version 3.x). Puisqu'il m'a fallu quelques heures pour trouver une bonne configuration, je vais partager:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

Cela crée un ThreadPoolExecutorqui commence par cinq et contient un maximum de dix threads exécutés simultanément en utilisant CallerRunsPolicypour l'exécution.

Paul
la source
Le problème avec cette solution est que si vous augmentez le nombre de producteurs, vous augmenterez le nombre de threads exécutant les threads d'arrière-plan. Dans de nombreux cas, ce n'est pas ce que vous voulez.
Gray
3

Par le Javadoc pour ThreadPoolExecutor:

S'il y a plus de threads corePoolSize mais moins que maximumPoolSize en cours d'exécution, un nouveau thread sera créé uniquement si la file d'attente est pleine . En définissant de la même manière corePoolSize et maximumPoolSize, vous créez un pool de threads de taille fixe.

(Soulignez le mien.)

La réponse de jitter est ce que vous voulez, bien que la mienne réponde à votre autre question. :)

Jonathan Feinberg
la source
2

il y a une autre option. Au lieu d'utiliser le nouveau SynchronousQueue, vous pouvez également utiliser n'importe quelle autre file d'attente, mais vous devez vous assurer que sa taille est de 1, ce qui forcera executorservice à créer un nouveau thread.

Ashkrit
la source
Je pense que vous voulez dire la taille 0 (par défaut), de sorte qu'il n'y aura pas de tâche en file d'attente et forcera vraiment executorservice à créer un nouveau thread à chaque fois.
Leonmax
2

Il ne semble pas que l'une des réponses réponde réellement à la question - en fait, je ne vois pas de moyen de le faire - même si vous sous-classez de PooledExecutorService puisque la plupart des méthodes / propriétés sont privées, par exemple en faisant addIfUnderMaximumPoolSize a été protégé, vous pouvez procédez comme suit:

class MyThreadPoolService extends ThreadPoolService {
    public void execute(Runnable run) {
        if (poolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

Le plus proche que j'ai obtenu était celui-ci - mais même ce n'est pas une très bonne solution

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

ps pas testé le ci-dessus

Stuart
la source
2

Voici une autre solution. Je pense que cette solution se comporte comme vous le souhaitez (mais pas fier de cette solution):

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    public boolean offer(Runnable o) {
        if (size() > 1)
            return false;
        return super.offer(o);
    };

    public boolean add(Runnable o) {
        if (super.offer(o))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        queue.add(r);
    }
};

dbThreadExecutor =
        new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);
Stuart
la source
2

C'est ce que vous voulez (du moins je suppose). Pour une explication, consultez la réponse de Jonathan Feinberg

Executors.newFixedThreadPool(int n)

Crée un pool de threads qui réutilise un nombre fixe de threads fonctionnant à partir d'une file d'attente partagée illimitée. À tout moment, la plupart des threads nThreads seront des tâches de traitement actives. Si des tâches supplémentaires sont soumises lorsque tous les threads sont actifs, ils attendront dans la file d'attente jusqu'à ce qu'un thread soit disponible. Si un thread se termine en raison d'un échec lors de l'exécution avant l'arrêt, un nouveau prendra sa place si nécessaire pour exécuter les tâches suivantes. Les threads du pool existeront jusqu'à ce qu'il soit explicitement arrêté.

trembler
la source
4
Bien sûr, je pourrais utiliser un pool de threads fixe, mais cela laisserait n threads pour toujours, ou jusqu'à ce que j'appelle shutdown. Je veux quelque chose d'exactement comme le pool de threads mis en cache (il crée des threads à la demande et les tue après un certain temps) mais avec une limite sur le nombre de threads qu'il peut créer.
Matt Crinklaw-Vogt
0
  1. Vous pouvez utiliser ThreadPoolExecutorcomme suggéré par @sjlee

    Vous pouvez contrôler dynamiquement la taille du pool. Jetez un œil à cette question pour plus de détails:

    Pool de threads dynamiques

    OU

  2. Vous pouvez utiliser l' API newWorkStealingPool , qui a été introduite avec java 8.

    public static ExecutorService newWorkStealingPool()

    Crée un pool de threads volants en utilisant tous les processeurs disponibles comme niveau de parallélisme cible.

Par défaut, le niveau de parallélisme est défini sur le nombre de cœurs de processeur de votre serveur. Si vous avez un serveur CPU à 4 cœurs, la taille du pool de threads serait de 4. Cette API renvoie le ForkJoinPooltype de ExecutorService et permet le vol de travail des threads inactifs en volant les tâches des threads occupés dans ForkJoinPool.

Ravindra babu
la source
0

Le problème a été résumé comme suit:

Je veux quelque chose d'exactement comme le pool de threads mis en cache (il crée des threads à la demande et les tue après un certain temps) mais avec une limite sur le nombre de threads qu'il peut créer et la possibilité de continuer à mettre en file d'attente des tâches supplémentaires une fois qu'il a atteint son limite de thread.

Avant de pointer vers la solution, je vais expliquer pourquoi les solutions suivantes ne fonctionnent pas:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());

Cela ne mettra en file d'attente aucune tâche lorsque la limite de 3 est atteinte car SynchronousQueue, par définition, ne peut contenir aucun élément.

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

Cela ne créera pas plus d'un seul thread car ThreadPoolExecutor ne crée que des threads dépassant le corePoolSize si la file d'attente est pleine. Mais LinkedBlockingQueue n'est jamais plein.

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>());
executor.allowCoreThreadTimeOut(true);

Cela ne réutilisera pas les threads tant que le corePoolSize n'a pas été atteint car ThreadPoolExecutor augmente le nombre de threads jusqu'à ce que corePoolSize soit atteint même si les threads existants sont inactifs. Si vous pouvez vivre avec cet inconvénient, c'est la solution la plus simple au problème. C'est aussi la solution décrite dans "Java Concurrency in Practice" (note de bas de page p172).

La seule solution complète au problème décrit semble être celle consistant à remplacer la offerméthode de la file d'attente et à écrire un RejectedExecutionHandlercomme expliqué dans les réponses à cette question: Comment faire en sorte que ThreadPoolExecutor augmente les threads au maximum avant de mettre en file d'attente?

Stefan Feuerhahn
la source
0

Cela fonctionne pour Java8 + (et autres, pour l'instant ..)

     Executor executor = new ThreadPoolExecutor(3, 3, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()){{allowCoreThreadTimeOut(true);}};

où 3 est la limite du nombre de threads et 5 est le délai d'expiration pour les threads inactifs.

Si vous voulez vérifier si cela fonctionne vous - même , voici le code pour faire le travail:

public static void main(String[] args) throws InterruptedException {
    final int DESIRED_NUMBER_OF_THREADS=3; // limit of number of Threads for the task at a time
    final int DESIRED_THREAD_IDLE_DEATH_TIMEOUT=5; //any idle Thread ends if it remains idle for X seconds

    System.out.println( java.lang.Thread.activeCount() + " threads");
    Executor executor = new ThreadPoolExecutor(DESIRED_NUMBER_OF_THREADS, DESIRED_NUMBER_OF_THREADS, DESIRED_THREAD_IDLE_DEATH_TIMEOUT, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()) {{allowCoreThreadTimeOut(true);}};

    System.out.println(java.lang.Thread.activeCount() + " threads");

    for (int i = 0; i < 5; i++) {
        final int fi = i;
        executor.execute(() -> waitsout("starting hard thread computation " + fi, "hard thread computation done " + fi,2000));
    }
    System.out.println("If this is UP, it works");

    while (true) {
        System.out.println(
                java.lang.Thread.activeCount() + " threads");
        Thread.sleep(700);
    }

}

static void waitsout(String pre, String post, int timeout) {
    try {
        System.out.println(pre);
        Thread.sleep(timeout);
        System.out.println(post);
    } catch (Exception e) {
    }
}

la sortie du code ci-dessus pour moi est

1 threads
1 threads
If this is UP, it works
starting hard thread computation 0
4 threads
starting hard thread computation 2
starting hard thread computation 1
4 threads
4 threads
hard thread computation done 2
hard thread computation done 0
hard thread computation done 1
starting hard thread computation 3
starting hard thread computation 4
4 threads
4 threads
4 threads
hard thread computation done 3
hard thread computation done 4
4 threads
4 threads
4 threads
4 threads
3 threads
3 threads
3 threads
1 threads
1 threads
Ondřej
la source