J'ai été frustré pendant un certain temps par le comportement par défaut ThreadPoolExecutor
qui soutient les ExecutorService
pools de threads que beaucoup d'entre nous utilisent. Pour citer les Javadocs:
S'il y a plus de threads corePoolSize mais moins que maximumPoolSize en cours d'exécution, un nouveau thread sera créé uniquement si la file d'attente est pleine .
Cela signifie que si vous définissez un pool de threads avec le code suivant, il ne démarrera jamais le 2ème thread car le LinkedBlockingQueue
est illimité.
ExecutorService threadPool =
new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
Ce n'est que si vous avez une file d'attente limitée et que la file d'attente est pleine que les threads au-dessus du numéro de base démarrent. Je soupçonne qu'un grand nombre de programmeurs multithread juniors Java ne sont pas conscients de ce comportement du ThreadPoolExecutor
.
Maintenant, j'ai un cas d'utilisation spécifique où ce n'est pas optimal. Je cherche des moyens, sans écrire ma propre classe TPE, de contourner ce problème.
Mes besoins concernent un service Web qui effectue des rappels vers un tiers éventuellement peu fiable.
- Je ne veux pas faire le rappel de manière synchrone avec la requête Web, donc je veux utiliser un pool de threads.
- J'en reçois généralement quelques-uns par minute, donc je ne veux pas avoir
newFixedThreadPool(...)
un grand nombre de threads qui sont pour la plupart dormants. - De temps en temps, j'obtiens une rafale de ce trafic et je veux augmenter le nombre de threads à une valeur maximale (disons 50).
- Je dois faire un meilleur essai pour faire tous les rappels donc je veux mettre en file d'attente tous ceux supplémentaires au-dessus de 50. Je ne veux pas submerger le reste de mon serveur Web en utilisant un
newCachedThreadPool()
.
Comment puis-je contourner cette limitation dans ThreadPoolExecutor
laquelle la file d'attente doit être limitée et pleine avant que d' autres threads ne soient démarrés? Comment puis-je le faire démarrer plus de threads avant de mettre des tâches en file d'attente?
Éditer:
@Flavio fait un bon point sur l'utilisation de ThreadPoolExecutor.allowCoreThreadTimeOut(true)
pour que les threads principaux expirent et se terminent . J'ai pensé à cela mais je voulais toujours la fonctionnalité core-threads. Je ne voulais pas que le nombre de threads dans le pool tombe si possible en dessous de la taille du noyau.
Réponses:
Je crois avoir enfin trouvé une solution un peu élégante (peut-être un peu hacky) à cette limitation avec
ThreadPoolExecutor
. Il consiste à étendreLinkedBlockingQueue
l'avoir de retourfalse
pourqueue.offer(...)
quand il y a déjà quelques tâches en attente. Si les threads actuels ne suivent pas les tâches en file d'attente, le TPE ajoutera des threads supplémentaires. Si le pool est déjà à max threads, alors leRejectedExecutionHandler
sera appelé. C'est le gestionnaire qui fait ensuite leput(...)
dans la file d'attente.Il est certainement étrange d'écrire une file d'attente où
offer(...)
peut revenirfalse
etput()
ne bloque jamais, c'est donc la partie hack. Mais cela fonctionne bien avec l'utilisation de la file d'attente par TPE, donc je ne vois aucun problème à faire cela.Voici le code:
Avec ce mécanisme, lorsque je soumets des tâches à la file d'attente, la
ThreadPoolExecutor
volonté:offer(...)
renvoie false.RejectedExecutionHandler
RejectedExecutionHandler
met alors la tâche dans la file d' attente à traiter par le premier fil disponible dans l' ordre FIFO.Bien que dans mon exemple de code ci-dessus, la file d'attente soit illimitée, vous pouvez également la définir comme une file d'attente limitée. Par exemple, si vous ajoutez une capacité de 1000 à la,
LinkedBlockingQueue
alors cela:De plus, si vous deviez utiliser
offer(...)
dans,RejectedExecutionHandler
vous pouvez utiliser laoffer(E, long, TimeUnit)
méthode à la place avecLong.MAX_VALUE
comme délai d'expiration.Avertissement:
Si vous vous attendez à ce que des tâches soient ajoutées à l'exécuteur après son arrêt, vous voudrez peut-être être plus intelligent en supprimant
RejectedExecutionException
notre coutumeRejectedExecutionHandler
lorsque le service d'exécuteur a été arrêté. Merci à @RaduToader pour l'avoir signalé.Éditer:
Une autre modification de cette réponse pourrait être de demander au TPE s'il y a des threads inactifs et de ne mettre l'élément en file d'attente que s'il y en a. Vous devrez créer une vraie classe pour cela et y ajouter une
ourQueue.setThreadPoolExecutor(tpe);
méthode.Ensuite, votre
offer(...)
méthode pourrait ressembler à quelque chose comme:tpe.getPoolSize() == tpe.getMaximumPoolSize()
auquel cas il suffit d'appelersuper.offer(...)
.tpe.getPoolSize() > tpe.getActiveCount()
appelez alorssuper.offer(...)
car il semble y avoir des threads inactifs.false
à la fourchette d'un autre fil.Peut être ça:
Notez que les méthodes get sur TPE sont coûteuses car elles accèdent aux
volatile
champs ou (dans le cas degetActiveCount()
) verrouillent le TPE et parcourent la liste des threads. En outre, il existe des conditions de concurrence qui peuvent entraîner la mise en file d'attente incorrecte d'une tâche ou la fourche d'un autre thread lorsqu'il y avait un thread inactif.la source
Queue
pour y parvenir, vous n'êtes certainement pas seul dans votre idée: groovy-programming.com/post/26923146865execute(runnable)
, ilrunnable
est simplement ajouté à la file d'attente. Si vous appelezexecute(secondRunnable)
, puissecondRunnable
est ajouté à la file d'attente. Mais maintenant, si vous appelezexecute(thirdRunnable)
, puisthirdRunnable
sera exécuté dans un nouveau thread. Lerunnable
etsecondRunnable
ne fonctionne une foisthirdRunnable
(ou la tâche d' origine-course longue) sont finis.Définissez la taille du noyau et la taille maximale sur la même valeur, et autorisez la suppression des threads principaux du pool avec
allowCoreThreadTimeOut(true)
.la source
J'ai déjà deux autres réponses à cette question, mais je soupçonne que celle-ci est la meilleure.
Il est basé sur la technique de la réponse actuellement acceptée , à savoir:
offer()
méthode de la file d'attente pour (parfois) retourner false,ThreadPoolExecutor
lancement d'un nouveau thread ou le rejet de la tâche, etRejectedExecutionHandler
pour mettre réellement la tâche en file d'attente en cas de rejet.Le problème est de savoir quand
offer()
doit renvoyer false. La réponse actuellement acceptée retourne false lorsque la file d'attente contient quelques tâches, mais comme je l'ai souligné dans mon commentaire, cela provoque des effets indésirables. Sinon, si vous retournez toujours false, vous continuerez à générer de nouveaux threads même lorsque vous avez des threads en attente dans la file d'attente.La solution est d'utiliser Java 7
LinkedTransferQueue
et d'avoir unoffer()
appeltryTransfer()
. Lorsqu'il y a un thread consommateur en attente, la tâche sera simplement transmise à ce thread. Sinon,offer()
retournera false etThreadPoolExecutor
créera un nouveau thread.la source
queue.offer()
, car il appelle réellementLinkedTransferQueue.tryTransfer()
, retournera false et ne mettra pas la tâche en file d'attente. Cependant lesRejectedExecutionHandler
appelsqueue.put()
, qui n'échouent pas et mettent la tâche en file d'attente.Remarque: je préfère maintenant et recommande mon autre réponse .
Voici une version qui me semble beaucoup plus simple: augmentez le corePoolSize (jusqu'à la limite de maximumPoolSize) chaque fois qu'une nouvelle tâche est exécutée, puis diminuez le corePoolSize (jusqu'à la limite de la "taille du pool de base" spécifiée par l'utilisateur) chaque fois qu'un la tâche est terminée.
Pour le dire autrement, gardez une trace du nombre de tâches en cours d'exécution ou mises en file d'attente et assurez-vous que corePoolSize est égal au nombre de tâches tant qu'il se situe entre la "taille du pool de cœurs" spécifiée par l'utilisateur et le maximumPoolSize.
Telle qu'elle est écrite, la classe ne prend pas en charge la modification du corePoolSize ou du maximumPoolSize spécifié par l'utilisateur après la construction, et ne prend pas en charge la manipulation de la file d'attente de travail directement ou via
remove()
oupurge()
.la source
synchronized
blocs. Pouvez-vous appeler la file d'attente pour obtenir le nombre de tâches. Ou peut-être utiliser unAtomicInteger
?execute()
appels dans des threads séparés, chacun va (1) déterminer combien de threads sont nécessaires, (2)setCorePoolSize
à ce numéro et (3) appelersuper.execute()
. Si les étapes (1) et (2) ne sont pas synchronisées, je ne suis pas sûr de savoir comment empêcher un ordre malheureux où vous définissez la taille du pool de base sur un nombre inférieur après un nombre plus élevé. Avec un accès direct au champ superclasse, cela pourrait être fait à la place en utilisant compare-and-set, mais je ne vois pas de moyen propre de le faire dans une sous-classe sans synchronisation.taskCount
champ est valide (c'est-à-dire aAtomicInteger
). Si deux threads recalculent la taille du pool immédiatement après l'autre, il doit obtenir les valeurs appropriées. Si le 2ème réduit les threads principaux, il doit avoir vu une baisse dans la file d'attente ou quelque chose du genre.execute()
. Chacun appelleraatomicTaskCount.incrementAndGet()
et obtiendra respectivement 10 et 11. Mais sans synchronisation (en obtenant le nombre de tâches et en définissant la taille du pool de cœurs), vous pourriez alors obtenir (1) la tâche 11 définit la taille du pool de cœurs sur 11, (2) la tâche 10 définit la taille du pool de cœurs sur 10, (3) la tâche 10 appellesuper.execute()
, (4) la tâche 11 appellesuper.execute()
et est mise en file d'attente.Nous avons une sous-classe de
ThreadPoolExecutor
qui prend un supplémentcreationThreshold
et remplaceexecute
.peut-être que cela aide aussi, mais le vôtre a l'air plus artistique bien sûr…
la source
offer(...)
méthode ne retourne quefalse
conditionnellement. Merci!La réponse recommandée ne résout qu'un (1) seul des problèmes avec le pool de threads JDK:
Les pools de threads JDK sont orientés vers la mise en file d'attente. Ainsi, au lieu de générer un nouveau thread, ils mettront la tâche en file d'attente. Ce n'est que si la file d'attente atteint sa limite que le pool de threads génère un nouveau thread.
Le retrait de thread ne se produit pas lorsque la charge diminue. Par exemple, si nous avons une rafale de travaux atteignant le pool qui amène le pool à atteindre le maximum, suivi d'une charge légère de 2 tâches maximum à la fois, le pool utilisera tous les threads pour traiter la charge légère empêchant le retrait des threads. (seulement 2 threads seraient nécessaires…)
Mécontent du comportement ci-dessus, je suis allé de l'avant et mis en place une piscine pour surmonter les lacunes ci-dessus.
Pour résoudre 2) L'utilisation de la planification Lifo résout le problème. Cette idée a été présentée par Ben Maurer à la conférence ACM applicative 2015: Systems @ Facebook scale
Une nouvelle implémentation est donc née:
LifoThreadPoolExecutorSQP
Jusqu'à présent, cette implémentation améliore les performances d'exécution asynchrone pour ZEL .
L'implémentation est capable de réduire la surcharge du changement de contexte, offrant des performances supérieures pour certains cas d'utilisation.
J'espère que ça aide...
PS: JDK Fork Join Pool implémente ExecutorService et fonctionne comme un pool de threads "normal", l'implémentation est performante, elle utilise la planification des threads LIFO, mais il n'y a aucun contrôle sur la taille de la file d'attente interne, le délai de retrait ..., et surtout les tâches ne peuvent pas être interrompu lors de leur annulation
la source
Remarque: je préfère maintenant et recommande mon autre réponse .
J'ai une autre proposition, suite à l'idée originale de changer la file d'attente pour retourner false. Dans celui-ci, toutes les tâches peuvent entrer dans la file d'attente, mais chaque fois qu'une tâche est mise en file d'attente après
execute()
, nous la suivons avec une tâche sentinelle no-op que la file d'attente rejette, provoquant l'apparition d'un nouveau thread, qui exécutera le no-op immédiatement suivi de quelque chose de la file d'attente.Étant donné que les threads de travail peuvent interroger le
LinkedBlockingQueue
pour une nouvelle tâche, il est possible qu'une tâche soit mise en file d'attente même lorsqu'un thread est disponible. Pour éviter de générer de nouveaux threads même lorsqu'il y a des threads disponibles, nous devons garder une trace du nombre de threads en attente de nouvelles tâches dans la file d'attente, et ne générer un nouveau thread que lorsqu'il y a plus de tâches dans la file d'attente que de threads en attente.la source
La meilleure solution à laquelle je puisse penser est d'étendre.
ThreadPoolExecutor
propose quelques méthodes de hook:beforeExecute
etafterExecute
. Dans votre extension, vous pouvez continuer à utiliser une file d'attente limitée pour alimenter les tâches et une deuxième file d'attente illimitée pour gérer le débordement. Lorsque quelqu'un appellesubmit
, vous pouvez tenter de placer la demande dans la file d'attente limitée. Si vous rencontrez une exception, il vous suffit de coller la tâche dans votre file d'attente de débordement. Vous pouvez ensuite utiliser leafterExecute
hook pour voir s'il y a quelque chose dans la file d'attente de débordement après avoir terminé une tâche. De cette façon, l'exécuteur s'occupera d'abord du contenu de sa file d'attente limitée, et tirera automatiquement de cette file d'attente illimitée lorsque le temps le permettra.Cela semble plus de travail que votre solution, mais au moins cela n'implique pas de donner aux files d'attente des comportements inattendus. J'imagine également qu'il existe un meilleur moyen de vérifier l'état de la file d'attente et des threads plutôt que de se fier aux exceptions, qui sont assez lentes à lancer.
la source
Remarque: pour JDK ThreadPoolExecutor lorsque vous avez une file d'attente limitée, vous créez uniquement de nouveaux threads lorsque l'offre renvoie false. Vous pouvez obtenir quelque chose d'utile avec CallerRunsPolicy qui crée un peu de BackPressure et appelle directement run dans le thread de l'appelant.
J'ai besoin des tâches à exécuter à partir de fils créés par la piscine et ont une file d' attente ubounded pour la planification, alors que le nombre de threads dans le pool peut se développer ou se rétrécir entre corePoolSize et maximumPoolSize donc ...
J'ai fini par faire un copier-coller complet de ThreadPoolExecutor et changer un peu la méthode d'exécution car malheureusement cela ne pouvait pas être fait par extension (cela appelle des méthodes privées).
Je ne voulais pas créer de nouveaux threads juste immédiatement quand une nouvelle demande arrive et que tous les threads sont occupés (car j'ai en général des tâches de courte durée). J'ai ajouté un seuil mais n'hésitez pas à le modifier selon vos besoins (peut-être que pour la plupart des IO, il est préférable de supprimer ce seuil)
la source