Comment faire en sorte que ThreadPoolExecutor augmente les threads au maximum avant de mettre en file d'attente?

100

J'ai été frustré pendant un certain temps par le comportement par défaut ThreadPoolExecutorqui soutient les ExecutorServicepools de threads que beaucoup d'entre nous utilisent. Pour citer les Javadocs:

S'il y a plus de threads corePoolSize mais moins que maximumPoolSize en cours d'exécution, un nouveau thread sera créé uniquement si la file d'attente est pleine .

Cela signifie que si vous définissez un pool de threads avec le code suivant, il ne démarrera jamais le 2ème thread car le LinkedBlockingQueueest illimité.

ExecutorService threadPool =
   new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
      TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));

Ce n'est que si vous avez une file d'attente limitée et que la file d'attente est pleine que les threads au-dessus du numéro de base démarrent. Je soupçonne qu'un grand nombre de programmeurs multithread juniors Java ne sont pas conscients de ce comportement du ThreadPoolExecutor.

Maintenant, j'ai un cas d'utilisation spécifique où ce n'est pas optimal. Je cherche des moyens, sans écrire ma propre classe TPE, de contourner ce problème.

Mes besoins concernent un service Web qui effectue des rappels vers un tiers éventuellement peu fiable.

  • Je ne veux pas faire le rappel de manière synchrone avec la requête Web, donc je veux utiliser un pool de threads.
  • J'en reçois généralement quelques-uns par minute, donc je ne veux pas avoir newFixedThreadPool(...)un grand nombre de threads qui sont pour la plupart dormants.
  • De temps en temps, j'obtiens une rafale de ce trafic et je veux augmenter le nombre de threads à une valeur maximale (disons 50).
  • Je dois faire un meilleur essai pour faire tous les rappels donc je veux mettre en file d'attente tous ceux supplémentaires au-dessus de 50. Je ne veux pas submerger le reste de mon serveur Web en utilisant un newCachedThreadPool().

Comment puis-je contourner cette limitation dans ThreadPoolExecutorlaquelle la file d'attente doit être limitée et pleine avant que d' autres threads ne soient démarrés? Comment puis-je le faire démarrer plus de threads avant de mettre des tâches en file d'attente?

Éditer:

@Flavio fait un bon point sur l'utilisation de ThreadPoolExecutor.allowCoreThreadTimeOut(true)pour que les threads principaux expirent et se terminent . J'ai pensé à cela mais je voulais toujours la fonctionnalité core-threads. Je ne voulais pas que le nombre de threads dans le pool tombe si possible en dessous de la taille du noyau.

gris
la source
1
Étant donné que votre exemple crée un maximum de 10 threads, y a-t-il de réelles économies à utiliser quelque chose qui se développe / rétrécit sur un pool de threads de taille fixe?
bstempi
Bon point @bstempi. Le nombre était quelque peu arbitraire. Je l'ai augmenté dans la question à 50. Je ne sais pas exactement combien de threads simultanés je veux réellement travailler maintenant que j'ai cette solution.
Gris
1
Oh zut! 10 votes positifs si je pouvais ici, exactement la même position dans laquelle je suis.
Eugene

Réponses:

51

Comment puis-je contourner cette limitation dans ThreadPoolExecutorlaquelle la file d'attente doit être limitée et pleine avant que d'autres threads ne soient démarrés.

Je crois avoir enfin trouvé une solution un peu élégante (peut-être un peu hacky) à cette limitation avec ThreadPoolExecutor. Il consiste à étendre LinkedBlockingQueuel'avoir de retour falsepour queue.offer(...)quand il y a déjà quelques tâches en attente. Si les threads actuels ne suivent pas les tâches en file d'attente, le TPE ajoutera des threads supplémentaires. Si le pool est déjà à max threads, alors le RejectedExecutionHandlersera appelé. C'est le gestionnaire qui fait ensuite le put(...)dans la file d'attente.

Il est certainement étrange d'écrire une file d'attente où offer(...)peut revenir falseet put()ne bloque jamais, c'est donc la partie hack. Mais cela fonctionne bien avec l'utilisation de la file d'attente par TPE, donc je ne vois aucun problème à faire cela.

Voici le code:

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        // Offer it to the queue if there is 0 items already queued, else
        // return false so the TPE will add another thread. If we return false
        // and max threads have been reached then the RejectedExecutionHandler
        // will be called which will do the put into the queue.
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // This does the actual put into the queue. Once the max threads
            //  have been reached, the tasks will then queue up.
            executor.getQueue().put(r);
            // we do this after the put() to stop race conditions
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + e);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

Avec ce mécanisme, lorsque je soumets des tâches à la file d'attente, la ThreadPoolExecutorvolonté:

  1. Échelle initialement le nombre de threads jusqu'à la taille du noyau (ici 1).
  2. Offrez-le à la file d'attente. Si la file d'attente est vide, elle sera mise en file d'attente pour être gérée par les threads existants.
  3. Si la file d'attente contient déjà un ou plusieurs éléments, le offer(...)renvoie false.
  4. Si false est renvoyé, augmentez le nombre de threads dans le pool jusqu'à ce qu'ils atteignent le nombre maximum (ici 50).
  5. Si au maximum, il appelle le RejectedExecutionHandler
  6. Les RejectedExecutionHandlermet alors la tâche dans la file d' attente à traiter par le premier fil disponible dans l' ordre FIFO.

Bien que dans mon exemple de code ci-dessus, la file d'attente soit illimitée, vous pouvez également la définir comme une file d'attente limitée. Par exemple, si vous ajoutez une capacité de 1000 à la, LinkedBlockingQueuealors cela:

  1. mettre à l'échelle les fils au maximum
  2. puis filez jusqu'à ce qu'il soit plein de 1000 tâches
  3. puis bloquez l'appelant jusqu'à ce que l'espace soit disponible dans la file d'attente.

De plus, si vous deviez utiliser offer(...)dans, RejectedExecutionHandlervous pouvez utiliser la offer(E, long, TimeUnit)méthode à la place avec Long.MAX_VALUEcomme délai d'expiration.

Avertissement:

Si vous vous attendez à ce que des tâches soient ajoutées à l'exécuteur après son arrêt, vous voudrez peut-être être plus intelligent en supprimant RejectedExecutionExceptionnotre coutume RejectedExecutionHandlerlorsque le service d'exécuteur a été arrêté. Merci à @RaduToader pour l'avoir signalé.

Éditer:

Une autre modification de cette réponse pourrait être de demander au TPE s'il y a des threads inactifs et de ne mettre l'élément en file d'attente que s'il y en a. Vous devrez créer une vraie classe pour cela et y ajouter une ourQueue.setThreadPoolExecutor(tpe);méthode.

Ensuite, votre offer(...)méthode pourrait ressembler à quelque chose comme:

  1. Vérifiez si le, tpe.getPoolSize() == tpe.getMaximumPoolSize()auquel cas il suffit d'appeler super.offer(...).
  2. Sinon, tpe.getPoolSize() > tpe.getActiveCount()appelez alors super.offer(...)car il semble y avoir des threads inactifs.
  3. Sinon, retournez falseà la fourchette d'un autre fil.

Peut être ça:

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

Notez que les méthodes get sur TPE sont coûteuses car elles accèdent aux volatilechamps ou (dans le cas de getActiveCount()) verrouillent le TPE et parcourent la liste des threads. En outre, il existe des conditions de concurrence qui peuvent entraîner la mise en file d'attente incorrecte d'une tâche ou la fourche d'un autre thread lorsqu'il y avait un thread inactif.

gris
la source
J'ai également lutté avec le même problème, j'ai fini par remplacer la méthode d'exécution. Mais c'est vraiment une bonne solution. :)
Batty
Même si je n'aime pas l'idée de rompre le contrat de Queuepour y parvenir, vous n'êtes certainement pas seul dans votre idée: groovy-programming.com/post/26923146865
bstempi
3
N'avez-vous pas une bizarrerie ici en ce que les deux premières tâches seront mises en file d'attente, et seulement après l'apparition de nouveaux threads? Par exemple, si votre thread principal est occupé par une seule tâche de longue durée et que vous appelez execute(runnable), il runnableest simplement ajouté à la file d'attente. Si vous appelez execute(secondRunnable), puis secondRunnableest ajouté à la file d'attente. Mais maintenant, si vous appelez execute(thirdRunnable), puis thirdRunnablesera exécuté dans un nouveau thread. Le runnableet secondRunnablene fonctionne une fois thirdRunnable(ou la tâche d' origine-course longue) sont finis.
Robert Tupelo-Schneck
1
Oui, Robert a raison, dans un environnement hautement multithread, la file d'attente augmente parfois alors qu'il y a des threads libres à utiliser. La solution ci-dessous qui étend TPE - fonctionne beaucoup mieux. Je pense que la suggestion de Robert devrait être marquée comme réponse, même si le hack ci-dessus est intéressant
Wanna Know All
1
Le "RejectedExecutionHandler" a aidé l'exécuteur à l'arrêt. Maintenant, vous êtes obligé d'utiliser shutdownNow () car shutdown () n'empêche pas l'ajout de nouvelles tâches (à cause de reque)
Radu Toader
28

Définissez la taille du noyau et la taille maximale sur la même valeur, et autorisez la suppression des threads principaux du pool avec allowCoreThreadTimeOut(true).

Flavio
la source
+1 Oui, j'y ai pensé mais je voulais toujours avoir la fonction core-threads. Je ne voulais pas que le pool de threads passe à 0 thread pendant les périodes de dormance. Je vais modifier ma question pour le souligner. Mais excellent point.
Gris
Je vous remercie! C'est juste le moyen le plus simple de le faire.
Dmitry Ovchinnikov
28

J'ai déjà deux autres réponses à cette question, mais je soupçonne que celle-ci est la meilleure.

Il est basé sur la technique de la réponse actuellement acceptée , à savoir:

  1. Remplacez la offer()méthode de la file d'attente pour (parfois) retourner false,
  2. ce qui provoque le ThreadPoolExecutorlancement d'un nouveau thread ou le rejet de la tâche, et
  3. définissez le RejectedExecutionHandlerpour mettre réellement la tâche en file d'attente en cas de rejet.

Le problème est de savoir quand offer()doit renvoyer false. La réponse actuellement acceptée retourne false lorsque la file d'attente contient quelques tâches, mais comme je l'ai souligné dans mon commentaire, cela provoque des effets indésirables. Sinon, si vous retournez toujours false, vous continuerez à générer de nouveaux threads même lorsque vous avez des threads en attente dans la file d'attente.

La solution est d'utiliser Java 7 LinkedTransferQueueet d'avoir un offer()appel tryTransfer(). Lorsqu'il y a un thread consommateur en attente, la tâche sera simplement transmise à ce thread. Sinon, offer()retournera false et ThreadPoolExecutorcréera un nouveau thread.

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
Robert Tupelo-Schneck
la source
Je dois être d'accord, cela me semble le plus propre. Le seul inconvénient de la solution est que LinkedTransferQueue est illimité afin que vous n'obteniez pas une file d'attente de tâches limitée par la capacité sans travail supplémentaire.
Yeroc
Il y a un problème lorsque la piscine atteint sa taille maximale. Supposons que le pool soit mis à l'échelle jusqu'à la taille maximale et que chaque thread exécute actuellement une tâche, lorsque runnable est soumis, cette offre impl retournera false et ThreadPoolExecutor essaiera d'ajouter un thread de travail, mais le pool a déjà atteint son maximum, donc runnable sera simplement rejeté. Selon le lostExceHandler que vous avez écrit, il sera à nouveau proposé dans la file d'attente, ce qui entraînera à nouveau cette danse de singe.
Sudheera
1
@Sudheera, je crois que vous vous trompez. queue.offer(), car il appelle réellement LinkedTransferQueue.tryTransfer(), retournera false et ne mettra pas la tâche en file d'attente. Cependant les RejectedExecutionHandlerappels queue.put(), qui n'échouent pas et mettent la tâche en file d'attente.
Robert Tupelo-Schneck
1
@ RobertTupelo-Schneck extrêmement utile et agréable!
Eugene
1
@ RobertTupelo-Schneck Fonctionne comme un charme! Je ne sais pas pourquoi il n'y a pas quelque chose comme ça hors de la boîte dans le java
Georgi Peev
7

Remarque: je préfère maintenant et recommande mon autre réponse .

Voici une version qui me semble beaucoup plus simple: augmentez le corePoolSize (jusqu'à la limite de maximumPoolSize) chaque fois qu'une nouvelle tâche est exécutée, puis diminuez le corePoolSize (jusqu'à la limite de la "taille du pool de base" spécifiée par l'utilisateur) chaque fois qu'un la tâche est terminée.

Pour le dire autrement, gardez une trace du nombre de tâches en cours d'exécution ou mises en file d'attente et assurez-vous que corePoolSize est égal au nombre de tâches tant qu'il se situe entre la "taille du pool de cœurs" spécifiée par l'utilisateur et le maximumPoolSize.

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
    private int userSpecifiedCorePoolSize;
    private int taskCount;

    public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        userSpecifiedCorePoolSize = corePoolSize;
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (this) {
            taskCount++;
            setCorePoolSizeToTaskCountWithinBounds();
        }
        super.execute(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        synchronized (this) {
            taskCount--;
            setCorePoolSizeToTaskCountWithinBounds();
        }
    }

    private void setCorePoolSizeToTaskCountWithinBounds() {
        int threads = taskCount;
        if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
        if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
        setCorePoolSize(threads);
    }
}

Telle qu'elle est écrite, la classe ne prend pas en charge la modification du corePoolSize ou du maximumPoolSize spécifié par l'utilisateur après la construction, et ne prend pas en charge la manipulation de la file d'attente de travail directement ou via remove()ou purge().

Robert Tupelo-Schneck
la source
Je l'aime sauf pour les synchronizedblocs. Pouvez-vous appeler la file d'attente pour obtenir le nombre de tâches. Ou peut-être utiliser un AtomicInteger?
Gris du
Je voulais les éviter, mais le problème est le suivant. S'il y a un certain nombre d' execute()appels dans des threads séparés, chacun va (1) déterminer combien de threads sont nécessaires, (2) setCorePoolSizeà ce numéro et (3) appeler super.execute(). Si les étapes (1) et (2) ne sont pas synchronisées, je ne suis pas sûr de savoir comment empêcher un ordre malheureux où vous définissez la taille du pool de base sur un nombre inférieur après un nombre plus élevé. Avec un accès direct au champ superclasse, cela pourrait être fait à la place en utilisant compare-and-set, mais je ne vois pas de moyen propre de le faire dans une sous-classe sans synchronisation.
Robert Tupelo-Schneck
Je pense que les pénalités pour cette condition de course sont relativement faibles tant que le taskCountchamp est valide (c'est-à-dire a AtomicInteger). Si deux threads recalculent la taille du pool immédiatement après l'autre, il doit obtenir les valeurs appropriées. Si le 2ème réduit les threads principaux, il doit avoir vu une baisse dans la file d'attente ou quelque chose du genre.
Gris du
1
Malheureusement, je pense que c'est pire que ça. Supposons que les tâches 10 et 11 appellent execute(). Chacun appellera atomicTaskCount.incrementAndGet()et obtiendra respectivement 10 et 11. Mais sans synchronisation (en obtenant le nombre de tâches et en définissant la taille du pool de cœurs), vous pourriez alors obtenir (1) la tâche 11 définit la taille du pool de cœurs sur 11, (2) la tâche 10 définit la taille du pool de cœurs sur 10, (3) la tâche 10 appelle super.execute(), (4) la tâche 11 appelle super.execute()et est mise en file d'attente.
Robert Tupelo-Schneck
2
J'ai fait des tests sérieux à cette solution et c'est clairement la meilleure. Dans un environnement hautement multithread, il sera toujours parfois mis en file d'attente lorsqu'il y a des threads libres (en raison de la nature TPE.execute à thread libre), mais cela arrive rarement, par opposition à la solution marquée comme réponse, où la condition de concurrence a plus de chances de se produit, donc cela se produit à peu près à chaque exécution multi-thread.
Wanna Know All
6

Nous avons une sous-classe de ThreadPoolExecutorqui prend un supplément creationThresholdet remplace execute.

public void execute(Runnable command) {
    super.execute(command);
    final int poolSize = getPoolSize();
    if (poolSize < getMaximumPoolSize()) {
        if (getQueue().size() > creationThreshold) {
            synchronized (this) {
                setCorePoolSize(poolSize + 1);
                setCorePoolSize(poolSize);
            }
        }
    }
}

peut-être que cela aide aussi, mais le vôtre a l'air plus artistique bien sûr…

Ralf H
la source
Intéressant. Merci pour ça. En fait, je ne savais pas que la taille du noyau était modifiable.
Gris
Maintenant que j'y pense un peu plus, cette solution est meilleure que la mienne en termes de vérification de la taille de la file d'attente. J'ai modifié ma réponse pour que la offer(...)méthode ne retourne que falseconditionnellement. Merci!
Gris du
4

La réponse recommandée ne résout qu'un (1) seul des problèmes avec le pool de threads JDK:

  1. Les pools de threads JDK sont orientés vers la mise en file d'attente. Ainsi, au lieu de générer un nouveau thread, ils mettront la tâche en file d'attente. Ce n'est que si la file d'attente atteint sa limite que le pool de threads génère un nouveau thread.

  2. Le retrait de thread ne se produit pas lorsque la charge diminue. Par exemple, si nous avons une rafale de travaux atteignant le pool qui amène le pool à atteindre le maximum, suivi d'une charge légère de 2 tâches maximum à la fois, le pool utilisera tous les threads pour traiter la charge légère empêchant le retrait des threads. (seulement 2 threads seraient nécessaires…)

Mécontent du comportement ci-dessus, je suis allé de l'avant et mis en place une piscine pour surmonter les lacunes ci-dessus.

Pour résoudre 2) L'utilisation de la planification Lifo résout le problème. Cette idée a été présentée par Ben Maurer à la conférence ACM applicative 2015: Systems @ Facebook scale

Une nouvelle implémentation est donc née:

LifoThreadPoolExecutorSQP

Jusqu'à présent, cette implémentation améliore les performances d'exécution asynchrone pour ZEL .

L'implémentation est capable de réduire la surcharge du changement de contexte, offrant des performances supérieures pour certains cas d'utilisation.

J'espère que ça aide...

PS: JDK Fork Join Pool implémente ExecutorService et fonctionne comme un pool de threads "normal", l'implémentation est performante, elle utilise la planification des threads LIFO, mais il n'y a aucun contrôle sur la taille de la file d'attente interne, le délai de retrait ..., et surtout les tâches ne peuvent pas être interrompu lors de leur annulation

user2179737
la source
1
Dommage que cette implémentation ait autant de dépendances externes. Rendre inutile pour moi: - /
Martin L.
1
C'est un très bon point (2e). Malheureusement, l'implémentation n'est pas claire des dépendances externes, mais peut toujours être adoptée si vous le souhaitez.
Alexey Vlasov le
1

Remarque: je préfère maintenant et recommande mon autre réponse .

J'ai une autre proposition, suite à l'idée originale de changer la file d'attente pour retourner false. Dans celui-ci, toutes les tâches peuvent entrer dans la file d'attente, mais chaque fois qu'une tâche est mise en file d'attente après execute(), nous la suivons avec une tâche sentinelle no-op que la file d'attente rejette, provoquant l'apparition d'un nouveau thread, qui exécutera le no-op immédiatement suivi de quelque chose de la file d'attente.

Étant donné que les threads de travail peuvent interroger le LinkedBlockingQueuepour une nouvelle tâche, il est possible qu'une tâche soit mise en file d'attente même lorsqu'un thread est disponible. Pour éviter de générer de nouveaux threads même lorsqu'il y a des threads disponibles, nous devons garder une trace du nombre de threads en attente de nouvelles tâches dans la file d'attente, et ne générer un nouveau thread que lorsqu'il y a plus de tâches dans la file d'attente que de threads en attente.

final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };

final AtomicInteger waitingThreads = new AtomicInteger(0);

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    @Override
    public boolean offer(Runnable e) {
        // offer returning false will cause the executor to spawn a new thread
        if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
        else return super.offer(e);
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.poll(timeout, unit);
        } finally {
            waitingThreads.decrementAndGet();
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.take();
        } finally {
            waitingThreads.decrementAndGet();
        }
    }
};

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
    @Override
    public void execute(Runnable command) {
        super.execute(command);
        if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
    }
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r == SENTINEL_NO_OP) return;
        else throw new RejectedExecutionException();            
    }
});
Robert Tupelo-Schneck
la source
0

La meilleure solution à laquelle je puisse penser est d'étendre.

ThreadPoolExecutorpropose quelques méthodes de hook: beforeExecuteet afterExecute. Dans votre extension, vous pouvez continuer à utiliser une file d'attente limitée pour alimenter les tâches et une deuxième file d'attente illimitée pour gérer le débordement. Lorsque quelqu'un appelle submit, vous pouvez tenter de placer la demande dans la file d'attente limitée. Si vous rencontrez une exception, il vous suffit de coller la tâche dans votre file d'attente de débordement. Vous pouvez ensuite utiliser le afterExecutehook pour voir s'il y a quelque chose dans la file d'attente de débordement après avoir terminé une tâche. De cette façon, l'exécuteur s'occupera d'abord du contenu de sa file d'attente limitée, et tirera automatiquement de cette file d'attente illimitée lorsque le temps le permettra.

Cela semble plus de travail que votre solution, mais au moins cela n'implique pas de donner aux files d'attente des comportements inattendus. J'imagine également qu'il existe un meilleur moyen de vérifier l'état de la file d'attente et des threads plutôt que de se fier aux exceptions, qui sont assez lentes à lancer.

bstempi
la source
Je n'aime pas cette solution. Je suis presque sûr que ThreadPoolExecutor n'a pas été conçu pour l'héritage.
scottb
Il existe en fait un exemple d'extension directement dans le JavaDoc. Ils déclarent que la plupart implémenteront probablement simplement les méthodes de hook, mais ils vous disent ce que vous devez rechercher d'autre lors de l'extension.
bstempi
0

Remarque: pour JDK ThreadPoolExecutor lorsque vous avez une file d'attente limitée, vous créez uniquement de nouveaux threads lorsque l'offre renvoie false. Vous pouvez obtenir quelque chose d'utile avec CallerRunsPolicy qui crée un peu de BackPressure et appelle directement run dans le thread de l'appelant.

J'ai besoin des tâches à exécuter à partir de fils créés par la piscine et ont une file d' attente ubounded pour la planification, alors que le nombre de threads dans le pool peut se développer ou se rétrécir entre corePoolSize et maximumPoolSize donc ...

J'ai fini par faire un copier-coller complet de ThreadPoolExecutor et changer un peu la méthode d'exécution car malheureusement cela ne pouvait pas être fait par extension (cela appelle des méthodes privées).

Je ne voulais pas créer de nouveaux threads juste immédiatement quand une nouvelle demande arrive et que tous les threads sont occupés (car j'ai en général des tâches de courte durée). J'ai ajouté un seuil mais n'hésitez pas à le modifier selon vos besoins (peut-être que pour la plupart des IO, il est préférable de supprimer ce seuil)

private final AtomicInteger activeWorkers = new AtomicInteger(0);
private volatile double threshold = 0.7d;

protected void beforeExecute(Thread t, Runnable r) {
    activeWorkers.incrementAndGet();
}
protected void afterExecute(Runnable r, Throwable t) {
    activeWorkers.decrementAndGet();
}
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        if (isRunning(c) && this.workQueue.offer(command)) {
            int recheck = this.ctl.get();
            if (!isRunning(recheck) && this.remove(command)) {
                this.reject(command);
            } else if (workerCountOf(recheck) == 0) {
                this.addWorker((Runnable) null, false);
            }
            //>>change start
            else if (workerCountOf(recheck) < maximumPoolSize //
                && (activeWorkers.get() > workerCountOf(recheck) * threshold
                    || workQueue.size() > workerCountOf(recheck) * threshold)) {
                this.addWorker((Runnable) null, false);
            }
            //<<change end
        } else if (!this.addWorker(command, false)) {
            this.reject(command);
        }
    }
Radu Toader
la source