Je veux créer un ThreadPoolExecutor
tel que lorsqu'il a atteint sa taille maximale et que la file d'attente est pleine, la submit()
méthode se bloque lors de la tentative d'ajout de nouvelles tâches. Dois-je implémenter une personnalisation RejectedExecutionHandler
pour cela ou existe-t-il un moyen de le faire à l'aide d'une bibliothèque Java standard?
java
concurrency
executor
Point fixe
la source
la source
Réponses:
Une des solutions possibles que je viens de trouver:
Y a-t-il d'autres solutions? Je préférerais quelque chose basé sur
RejectedExecutionHandler
car cela semble être un moyen standard de gérer de telles situations.la source
throw e;
n'est PAS dans le livre. JCIP est correct!Vous pouvez utiliser ThreadPoolExecutor et un blockingQueue:
la source
Vous devez utiliser le
CallerRunsPolicy
, qui exécute la tâche rejetée dans le thread appelant. De cette façon, il ne peut pas soumettre de nouvelles tâches à l'exécuteur jusqu'à ce que cette tâche soit terminée, à quel point il y aura des threads de pool libres ou le processus se répétera.http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html
À partir de la documentation:
Assurez-vous également d'utiliser une file d'attente limitée, telle que ArrayBlockingQueue, lors de l'appel du
ThreadPoolExecutor
constructeur. Sinon, rien ne sera rejeté.Edit: en réponse à votre commentaire, définissez la taille de l'ArrayBlockingQueue pour qu'elle soit égale à la taille maximale du pool de threads et utilisez la méthode AbortPolicy.
Edit 2: Ok, je vois où tu veux en venir. Qu'en est-il de ceci: remplacer la
beforeExecute()
méthode pour vérifier quegetActiveCount()
cela ne dépasse pasgetMaximumPoolSize()
, et si c'est le cas, dormir et réessayer?la source
Hibernate a un
BlockPolicy
qui est simple et peut faire ce que vous voulez:Voir: Executors.java
la source
ThreadPoolExecutor
même dit littéralement: "La méthode getQueue () permet d'accéder à la file d'attente de travail à des fins de surveillance et de débogage. L'utilisation de cette méthode à d'autres fins est fortement déconseillée.". Que cela soit disponible dans une bibliothèque si largement connue, est absolument triste à voir.La
BoundedExecutor
réponse citée ci-dessus de Java Concurrency en pratique ne fonctionne correctement que si vous utilisez une file d'attente illimitée pour l'Executor, ou si la limite du sémaphore n'est pas supérieure à la taille de la file d'attente. Le sémaphore est un état partagé entre le thread soumissionnaire et les threads du pool, ce qui permet de saturer l'exécuteur même si la taille de la file d'attente <liée <= (taille de la file d'attente + taille du pool).L'utilisation
CallerRunsPolicy
n'est valide que si vos tâches ne s'exécutent pas indéfiniment, auquel cas votre fil de soumission resterarejectedExecution
indéfiniment, et une mauvaise idée si vos tâches prennent beaucoup de temps à s'exécuter, car le fil de soumission ne peut pas soumettre de nouvelles tâches ou faire autre chose s'il exécute une tâche elle-même.Si ce n'est pas acceptable, je suggère de vérifier la taille de la file d'attente limitée de l'exécuteur avant de soumettre une tâche. Si la file d'attente est pleine, attendez quelques instants avant d'essayer de soumettre à nouveau. Le débit en souffrira, mais je suggère que c'est une solution plus simple que la plupart des autres solutions proposées et vous êtes assuré qu'aucune tâche ne sera rejetée.
la source
Je sais, c'est un hack, mais à mon avis, le hack le plus propre entre ceux proposés ici ;-)
Étant donné que ThreadPoolExecutor utilise la file d'attente de blocage "offer" au lieu de "put", nous allons remplacer le comportement de "offer" de la file de blocage:
Je l'ai testé et il semble fonctionner. La mise en œuvre d'une politique de temporisation est laissée à l'exercice du lecteur.
la source
La classe suivante s'enroule autour d'un ThreadPoolExecutor et utilise un sémaphore pour bloquer, puis la file d'attente de travail est pleine:
Cette classe wrapper est basée sur une solution donnée dans le livre Java Concurrency in Practice de Brian Goetz. La solution dans le livre ne prend que deux paramètres de constructeur: un
Executor
et une borne utilisée pour le sémaphore. Ceci est montré dans la réponse donnée par Fixpoint. Il y a un problème avec cette approche: elle peut entrer dans un état où les threads du pool sont occupés, la file d'attente est pleine, mais le sémaphore vient de publier un permis. (semaphore.release()
dans le bloc finally). Dans cet état, une nouvelle tâche peut récupérer l'autorisation qui vient d'être publiée, mais elle est rejetée car la file d'attente des tâches est pleine. Bien sûr, ce n'est pas quelque chose que vous voulez; vous souhaitez bloquer dans ce cas.Pour résoudre ce problème, nous devons utiliser une file d'attente illimitée , comme le mentionne clairement JCiP. Le sémaphore agit comme une garde, donnant l'effet d'une taille de file d'attente virtuelle. Cela a pour effet secondaire qu'il est possible que l'unité puisse contenir des
maxPoolSize + virtualQueueSize + maxPoolSize
tâches. Pourquoi donc? En raison de lasemaphore.release()
dans le bloc finalement. Si tous les threads du pool appellent cette instruction en même temps, lesmaxPoolSize
autorisations sont libérées, permettant au même nombre de tâches d'entrer dans l'unité. Si nous utilisions une file d'attente limitée, elle serait toujours pleine, ce qui entraînerait le rejet d'une tâche. Maintenant, comme nous savons que cela ne se produit que lorsqu'un thread de pool est presque terminé, ce n'est pas un problème. Nous savons que le thread du pool ne se bloquera pas, donc une tâche sera bientôt retirée de la file d'attente.Vous pouvez cependant utiliser une file d'attente limitée. Assurez-vous simplement que sa taille est égale
virtualQueueSize + maxPoolSize
. De plus grandes tailles sont inutiles, le sémaphore empêchera de laisser entrer plus d'éléments. Des tailles plus petites entraîneront des tâches rejetées. Le risque que les tâches soient rejetées augmente à mesure que la taille diminue. Par exemple, supposons que vous souhaitiez un exécuteur limité avec maxPoolSize = 2 et virtualQueueSize = 5. Ensuite, prenez un sémaphore avec 5 + 2 = 7 autorisations et une taille de file d'attente réelle de 5 + 2 = 7. Le nombre réel de tâches qui peuvent être dans l'unité est alors 2 + 5 + 2 = 9. Lorsque l'exécuteur est plein (5 tâches en file d'attente, 2 dans le pool de threads, donc 0 permis disponible) et TOUS les threads du pool libèrent leurs autorisations, alors exactement 2 autorisations peuvent être prises par les tâches entrantes.Maintenant, la solution de JCiP est quelque peu lourde à utiliser car elle n'applique pas toutes ces contraintes (file d'attente illimitée, ou limitée par ces restrictions mathématiques, etc.). Je pense que cela ne sert que de bon exemple pour démontrer comment vous pouvez créer de nouvelles classes thread-safe basées sur les parties déjà disponibles, mais pas en tant que classe réutilisable à part entière. Je ne pense pas que cette dernière était l'intention de l'auteur.
la source
vous pouvez utiliser un RejectedExecutionHandler personnalisé comme celui-ci
la source
Créez votre propre file d'attente de blocage à utiliser par l'exécuteur, avec le comportement de blocage que vous recherchez, tout en renvoyant toujours la capacité restante disponible (en vous assurant que l'exécuteur n'essaiera pas de créer plus de threads que son pool de base, ou de déclencher le gestionnaire de rejet).
Je pense que cela vous permettra d'obtenir le comportement de blocage que vous recherchez. Un gestionnaire de rejet ne correspondra jamais à la facture, car cela indique que l'exécuteur ne peut pas effectuer la tâche. Ce que je pourrais imaginer, c'est que vous obtenez une forme d '«attente occupée» dans le gestionnaire. Ce n'est pas ce que vous voulez, vous voulez une file d'attente pour l'exécuteur qui bloque l'appelant ...
la source
ThreadPoolExecutor
utilise uneoffer
méthode pour ajouter des tâches à la file d'attente. Si je créais une personnalisationBlockingQueue
qui bloqueoffer
, cela rompraitBlockingQueue
le contrat.ThreadPoolExecutor
été mis en œuvre pour utiliseroffer
et nonput
(la version de blocage)? De plus, s'il y avait un moyen pour le code client de dire lequel utiliser quand, beaucoup de gens essayant de lancer des solutions personnalisées auraient été soulagésPour éviter les problèmes avec la solution @FixPoint. On pourrait utiliser ListeningExecutorService et libérer le sémaphore onSuccess et onFailure dans FutureCallback.
la source
Runnable
car ces méthodes sont toujours appelées avant le nettoyage des travailleurs dans la normaleThreadPoolExecutor
. Cela signifie que vous devrez toujours gérer les exceptions de rejet.Récemment, j'ai trouvé que cette question avait le même problème. L'OP ne le dit pas explicitement, mais nous ne voulons pas utiliser le
RejectedExecutionHandler
qui exécute une tâche sur le thread de l'émetteur, car cela sous-utilisera les threads de travail si cette tâche est longue.En lisant toutes les réponses et commentaires, en particulier la solution défectueuse avec le sémaphore ou en utilisant,
afterExecute
j'ai examiné de plus près le code de ThreadPoolExecutor pour voir s'il y avait un moyen de sortir. J'ai été étonné de voir qu'il y avait plus de 2000 lignes de code (commentées), dont certaines me donnent le vertige . Compte tenu de l'exigence plutôt simple que j'ai en fait - un producteur, plusieurs consommateurs, laisser le producteur bloquer quand aucun consommateur ne peut travailler - j'ai décidé de lancer ma propre solution. Ce n'est pas unExecutorService
mais juste unExecutor
. Et il n'adapte pas le nombre de threads à la charge de travail, mais ne contient qu'un nombre fixe de threads, ce qui correspond également à mes besoins. Voici le code. N'hésitez pas à vous en plaindre :-)la source
Je pense qu'il existe un moyen assez élégant de résoudre ce problème en utilisant
java.util.concurrent.Semaphore
et en déléguant le comportement deExecutor.newFixedThreadPool
. Le nouveau service exécuteur n'exécutera une nouvelle tâche que s'il y a un thread pour le faire. Le blocage est géré par Semaphore avec un nombre d'autorisations égal au nombre de threads. Lorsqu'une tâche est terminée, elle renvoie un permis.la source
J'avais le même besoin dans le passé: une sorte de file d'attente de blocage avec une taille fixe pour chaque client adossée à un pool de threads partagé. J'ai fini par écrire mon propre type de ThreadPoolExecutor:
UserThreadPoolExecutor (file d'attente de blocage (par client) + threadpool (partagé entre tous les clients))
Voir: https://github.com/d4rxh4wx/UserThreadPoolExecutor
Chaque UserThreadPoolExecutor reçoit un nombre maximum de threads d'un ThreadPoolExecutor partagé
Chaque UserThreadPoolExecutor peut:
la source
J'ai trouvé cette politique de rejet dans le client de recherche élastique. Il bloque le fil de l'appelant sur la file d'attente de blocage. Code ci-dessous-
la source
J'ai récemment eu besoin de réaliser quelque chose de similaire, mais sur un
ScheduledExecutorService
.Je devais également m'assurer de gérer le retard transmis à la méthode et de m'assurer que soit la tâche est soumise pour s'exécuter au moment attendu par l'appelant, soit échoue tout simplement en lançant un fichier
RejectedExecutionException
.D'autres méthodes de
ScheduledThreadPoolExecutor
pour exécuter ou soumettre une tâche en interne appellent#schedule
qui à leur tour invoqueront toujours les méthodes remplacées.J'ai le code ici, j'apprécierai vos commentaires. https://github.com/AmitabhAwasthi/BlockingScheduler
la source
Voici la solution qui semble très bien fonctionner. Il s'appelle NotifyingBlockingThreadPoolExecutor .
Programme de démonstration.
Edit: Il y a un problème avec ce code, la méthode await () est boguée. L'appel de shutdown () + awaitTermination () semble fonctionner correctement.
la source
Je n'aime pas toujours la CallerRunsPolicy, d'autant plus qu'elle permet à la tâche rejetée de «sauter la file d'attente» et d'être exécutée avant les tâches qui ont été soumises plus tôt. De plus, l'exécution de la tâche sur le thread appelant peut prendre beaucoup plus de temps que d'attendre que le premier emplacement devienne disponible.
J'ai résolu ce problème en utilisant un RejectedExecutionHandler personnalisé, qui bloque simplement le thread appelant pendant un petit moment, puis essaie à nouveau de soumettre la tâche:
Cette classe peut simplement être utilisée dans l'exécuteur du pool de threads en tant que RejectedExecutinHandler comme n'importe quel autre, par exemple:
Le seul inconvénient que je vois est que le thread appelant peut être verrouillé un peu plus longtemps que strictement nécessaire (jusqu'à 250 ms). De plus, puisque cet exécuteur est effectivement appelé de manière récursive, de très longues attentes pour qu'un thread soit disponible (heures) peuvent entraîner un débordement de pile.
Néanmoins, j'aime personnellement cette méthode. Il est compact, facile à comprendre et fonctionne bien.
la source