ExecutorService qui interrompt les tâches après un délai d'expiration

93

Je recherche une implémentation ExecutorService qui peut être fournie avec un délai d'expiration. Les tâches soumises à ExecutorService sont interrompues si elles prennent plus de temps que le délai d’expiration. Mettre en œuvre une telle bête n'est pas une tâche si difficile, mais je me demande si quelqu'un connaît une implémentation existante.

Voici ce que je suis venu avec une partie de la discussion ci-dessous. Des commentaires?

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        timeoutExecutor.shutdown();
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }
    }

    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            thread.interrupt();
        }
    }
}
Edward Dale
la source
Est-ce que «l'heure de début» du délai est celle de la soumission? Ou l'heure à laquelle la tâche commence à s'exécuter?
Tim Bender
Bonne question. Quand il commence à s'exécuter. Vraisemblablement en utilisant le protected void beforeExecute(Thread t, Runnable r)crochet.
Edward Dale
@ scompt.com utilisez-vous toujours cette solution ou a-t-elle été remplacée
Paul Taylor
@PaulTaylor Le travail dans lequel j'ai implémenté cette solution a été remplacé. :-)
Edward Dale
J'ai besoin exactement de cela, sauf a) j'ai besoin que mon service de planificateur principal soit un pool de threads avec un seul thread de service car j'ai besoin que mes tâches s'exécutent strictement simultanément et b) je dois être en mesure de spécifier la durée du délai d'expiration pour chaque tâche à la l'heure à laquelle la tâche est soumise. J'ai essayé de l'utiliser comme point de départ, mais en étendant ScheduledThreadPoolExecutor, mais je ne vois pas de moyen d'obtenir la durée de temporisation spécifiée qui doit être spécifiée au moment de la soumission de la tâche à la méthode beforeExecute. Toutes les suggestions appréciées avec reconnaissance!
Michael Ellis

Réponses:

89

Vous pouvez utiliser un ScheduledExecutorService pour cela. Tout d'abord, vous ne le soumettez qu'une seule fois pour commencer immédiatement et conserver l'avenir qui est créé. Après cela, vous pouvez soumettre une nouvelle tâche qui annulerait le futur conservé après un certain temps.

 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 
 final Future handler = executor.submit(new Callable(){ ... });
 executor.schedule(new Runnable(){
     public void run(){
         handler.cancel();
     }      
 }, 10000, TimeUnit.MILLISECONDS);

Cela exécutera votre gestionnaire (fonctionnalité principale à interrompre) pendant 10 secondes, puis annulera (c'est-à-dire interrompra) cette tâche spécifique.

John Vint
la source
13
Idée intéressante, mais que se passe-t-il si la tâche se termine avant l'expiration du délai (ce qu'elle fait normalement)? Je préfère ne pas avoir des tonnes de tâches de nettoyage en attente d'exécuter uniquement pour savoir que la tâche qui leur a été attribuée est déjà terminée. Il faudrait un autre thread surveillant les Futures au fur et à mesure qu'ils terminent pour supprimer leurs tâches de nettoyage.
Edward Dale
3
L'exécuteur testamentaire ne planifiera cette annulation qu'une seule fois. Si la tâche est terminée, l'annulation est une non-opération et le travail continue inchangé. Il ne doit y avoir qu'un seul thread scheudling supplémentaire pour annuler les tâches et un thread pour les exécuter. Vous pourriez avoir deux exécuteurs, un pour soumettre vos tâches principales et un pour les annuler.
John Vint
3
C'est vrai, mais que se passe-t-il si le délai d'expiration est de 5 heures et que 10 000 tâches sont exécutées pendant cette période. Je voudrais éviter que tous ces no-ops ne prennent de la mémoire et provoquent des changements de contexte.
Edward Dale
1
@Scompt Pas nécessairement. Il y aurait 10k invocations future.cancel (), mais si le futur est terminé, alors l'annulation se terminera rapidement et ne fera aucun travail inutile. Si vous ne voulez pas d'invocations d'annulation supplémentaires de 10k, cela peut ne pas fonctionner, mais la quantité de travail effectuée lorsqu'une tâche est terminée est très faible.
John Vint
6
@John W.: Je viens de réaliser un autre problème avec votre implémentation. J'ai besoin du délai pour commencer lorsque la tâche commence l'exécution, comme je l'ai commenté plus tôt. Je pense que la seule façon de faire est d'utiliser le beforeExecutecrochet.
Edward Dale
6

Malheureusement, la solution est imparfaite. Il existe une sorte de bogue avec ScheduledThreadPoolExecutor, également signalé dans cette question : l'annulation d'une tâche soumise ne libère pas complètement les ressources mémoire associées à la tâche; les ressources ne sont libérées qu'à l'expiration de la tâche.

Si vous créez par conséquent un TimeoutThreadPoolExecutoravec un temps d'expiration assez long (une utilisation typique) et soumettez des tâches assez rapidement, vous finissez par remplir la mémoire - même si les tâches se sont effectivement terminées avec succès.

Vous pouvez voir le problème avec le programme de test (très grossier) suivant:

public static void main(String[] args) throws InterruptedException {
    ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, 
            new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES);
    //ExecutorService service = Executors.newFixedThreadPool(1);
    try {
        final AtomicInteger counter = new AtomicInteger();
        for (long i = 0; i < 10000000; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    counter.incrementAndGet();
                }
            });
            if (i % 10000 == 0) {
                System.out.println(i + "/" + counter.get());
                while (i > counter.get()) {
                    Thread.sleep(10);
                }
            }
        }
    } finally {
        service.shutdown();
    }
}

Le programme épuise la mémoire disponible, bien qu'il attende que les Runnables générés soient terminés.

J'ai réfléchi à cela pendant un moment, mais je n'ai malheureusement pas pu trouver une bonne solution.

EDIT: J'ai découvert que ce problème a été signalé comme le bogue JDK 6602600 et semble avoir été corrigé très récemment.

Flavio
la source
4

Enveloppez la tâche dans FutureTask et vous pouvez spécifier le délai d'expiration pour FutureTask. Regardez l'exemple dans ma réponse à cette question,

Délai d'expiration du processus natif Java

Codeur ZZ
la source
1
Je me rends compte qu'il y a plusieurs façons de faire cela en utilisant les java.util.concurrentclasses, mais je recherche une ExecutorServiceimplémentation.
Edward Dale
1
Si vous dites que vous voulez que votre ExecutorService cache le fait que les délais sont ajoutés à partir du code client, vous pouvez implémenter votre propre ExecutorService qui encapsule chaque exécutable qui lui est remis avec une FutureTask avant de les exécuter.
erikprice
2

Après une tonne de temps pour enquêter,
enfin, j'utilise la invokeAllméthode ExecutorServicepour résoudre ce problème.
Cela interrompra strictement la tâche pendant l'exécution de la tâche.
Voici un exemple

ExecutorService executorService = Executors.newCachedThreadPool();

try {
    List<Callable<Object>> callables = new ArrayList<>();
    // Add your long time task (callable)
    callables.add(new VaryLongTimeTask());
    // Assign tasks for specific execution timeout (e.g. 2 sec)
    List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS);
    for (Future<Object> future : futures) {
        // Getting result
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

executorService.shutdown();

L'avantage est que vous pouvez également soumettre ListenableFutureen même temps ExecutorService.
Modifiez légèrement la première ligne de code.

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

ListeningExecutorServiceest la fonction d'écoute du ExecutorServiceprojet at google guava ( com.google.guava ))

Johnny
la source
2
Merci de l'avoir signalé invokeAll. Cela fonctionne très bien. Juste un mot d'avertissement pour ceux qui envisagent d'utiliser ceci: bien que invokeAllrenvoie une liste d' Futureobjets, cela semble en fait être une opération de blocage.
mxro
1

Il semble que le problème ne soit pas dans le bogue JDK 6602600 (il a été résolu le 22/05/2010), mais dans un appel incorrect de sleep (10) en cercle. Remarque supplémentaire, que le thread principal doit donner directement CHANCE aux autres threads pour réaliser leurs tâches en invoquant SLEEP (0) dans CHAQUE branche du cercle extérieur. Il vaut mieux, je pense, utiliser Thread.yield () au lieu de Thread.sleep (0)

La partie corrigée du résultat du code de problème précédent est comme ceci:

.......................
........................
Thread.yield();         

if (i % 1000== 0) {
System.out.println(i + "/" + counter.get()+ "/"+service.toString());
}

//                
//                while (i > counter.get()) {
//                    Thread.sleep(10);
//                } 

Il fonctionne correctement avec une quantité de compteur externe jusqu'à 150 000 000 de cercles testés.

Sergey
la source
1

En utilisant la réponse de John W, j'ai créé une implémentation qui commence correctement le délai d'expiration lorsque la tâche commence son exécution. J'écris même un test unitaire pour ça :)

Cependant, cela ne répond pas à mes besoins car certaines opérations d'E / S ne s'interrompent pas quand Future.cancel()est appelé (c'est-à-dire quand Thread.interrupt()est appelé). Quelques exemples d'opérations d'E / S qui ne peuvent pas être interrompues lors de l' Thread.interrupt()appel sont Socket.connectet Socket.read(et je soupçonne la plupart des opérations d'E / S implémentées dans java.io). Toutes les opérations d'E / S dans java.niodoivent être interruptibles lors de l' Thread.interrupt()appel. Par exemple, c'est le cas pour SocketChannel.openet SocketChannel.read.

Quoi qu'il en soit, si quelqu'un est intéressé, j'ai créé un gist pour un exécuteur de pool de threads qui permet aux tâches d'expirer (si elles utilisent des opérations interruptibles ...): https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf

amanteaux
la source
Code intéressant, je l'ai inséré dans mon système et curieux de savoir si vous avez des exemples du type d'opérations d'E / S qui n'interrompront pas afin que je puisse voir si cela aura un impact sur mon système. Merci!
Duncan Krebs
@DuncanKrebs J'ai détaillé ma réponse avec un exemple d'E / S non interruptible: Socket.connectetSocket.read
amanteaux
myThread.interrupted()n'est pas la bonne méthode pour interrompre, car elle EFFACE l'indicateur d'interruption. Utilisez à la myThread.interrupt()place, et cela devrait avec les sockets
DanielCuadra
@DanielCuadra: Merci, il semble que j'ai fait une faute de frappe car Thread.interrupted()ne permet pas d'interrompre un fil. Cependant, Thread.interrupt()n'interrompt pas les java.ioopérations, il ne fonctionne que sur les java.nioopérations.
amanteaux
J'utilise interrupt()depuis de nombreuses années et cela a toujours interrompu les opérations java.io (ainsi que d'autres méthodes de blocage, telles que thread sleep, connexions jdbc, blockingqueue take, etc.). Peut-être avez-vous trouvé une classe boguée ou une JVM qui a des bogues
DanielCuadra
0

Qu'en est-il de cette idée alternative:

  • deux ont deux exécuteurs testamentaires:
    • un pour:
      • soumettre la tâche, sans se soucier du délai d'expiration de la tâche
      • en ajoutant le futur résulté et le moment où il devrait se terminer à une structure interne
    • un pour exécuter un travail interne qui vérifie la structure interne si certaines tâches sont expirées et si elles doivent être annulées.

Un petit échantillon est ici:

public class AlternativeExecutorService 
{

private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue       = new CopyOnWriteArrayList();
private final ScheduledThreadPoolExecutor                scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job
private final ListeningExecutorService                   threadExecutor    = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for
private ScheduledFuture scheduledFuture;
private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L;

public AlternativeExecutorService()
{
    scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS);
}

public void pushTask(OwnTask task)
{
    ListenableFuture<Void> future = threadExecutor.submit(task);  // -> create your Callable
    futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end
}

public void shutdownInternalScheduledExecutor()
{
    scheduledFuture.cancel(true);
    scheduledExecutor.shutdownNow();
}

long getCurrentMillisecondsTime()
{
    return Calendar.getInstance().get(Calendar.MILLISECOND);
}

class ListenableFutureTask
{
    private final ListenableFuture<Void> future;
    private final OwnTask                task;
    private final long                   milliSecEndTime;

    private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime)
    {
        this.future = future;
        this.task = task;
        this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS);
    }

    ListenableFuture<Void> getFuture()
    {
        return future;
    }

    OwnTask getTask()
    {
        return task;
    }

    long getMilliSecEndTime()
    {
        return milliSecEndTime;
    }
}

class TimeoutManagerJob implements Runnable
{
    CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList()
    {
        return futureQueue;
    }

    @Override
    public void run()
    {
        long currentMileSecValue = getCurrentMillisecondsTime();
        for (ListenableFutureTask futureTask : futureQueue)
        {
            consumeFuture(futureTask, currentMileSecValue);
        }
    }

    private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue)
    {
        ListenableFuture<Void> future = futureTask.getFuture();
        boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue;
        if (isTimeout)
        {
            if (!future.isDone())
            {
                future.cancel(true);
            }
            futureQueue.remove(futureTask);
        }
    }
}

class OwnTask implements Callable<Void>
{
    private long     timeoutDuration;
    private TimeUnit timeUnit;

    OwnTask(long timeoutDuration, TimeUnit timeUnit)
    {
        this.timeoutDuration = timeoutDuration;
        this.timeUnit = timeUnit;
    }

    @Override
    public Void call() throws Exception
    {
        // do logic
        return null;
    }

    public long getTimeoutDuration()
    {
        return timeoutDuration;
    }

    public TimeUnit getTimeUnit()
    {
        return timeUnit;
    }
}
}
Ionut Mesaros
la source
0

vérifiez si cela fonctionne pour vous,

    public <T,S,K,V> ResponseObject<Collection<ResponseObject<T>>> runOnScheduler(ThreadPoolExecutor threadPoolExecutor,
      int parallelismLevel, TimeUnit timeUnit, int timeToCompleteEachTask, Collection<S> collection,
      Map<K,V> context, Task<T,S,K,V> someTask){
    if(threadPoolExecutor==null){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("threadPoolExecutor can not be null").build();
    }
    if(someTask==null){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("Task can not be null").build();
    }
    if(CollectionUtils.isEmpty(collection)){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("input collection can not be empty").build();
    }

    LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue = new LinkedBlockingQueue<>(collection.size());
    collection.forEach(value -> {
      callableLinkedBlockingQueue.offer(()->someTask.perform(value,context)); //pass some values in callable. which can be anything.
    });
    LinkedBlockingQueue<Future<T>> futures = new LinkedBlockingQueue<>();

    int count = 0;

    while(count<parallelismLevel && count < callableLinkedBlockingQueue.size()){
      Future<T> f = threadPoolExecutor.submit(callableLinkedBlockingQueue.poll());
      futures.offer(f);
      count++;
    }

    Collection<ResponseObject<T>> responseCollection = new ArrayList<>();

    while(futures.size()>0){
      Future<T> future = futures.poll();
      ResponseObject<T> responseObject = null;
        try {
          T response = future.get(timeToCompleteEachTask, timeUnit);
          responseObject = ResponseObject.<T>builder().data(response).build();
        } catch (InterruptedException e) {
          future.cancel(true);
        } catch (ExecutionException e) {
          future.cancel(true);
        } catch (TimeoutException e) {
          future.cancel(true);
        } finally {
          if (Objects.nonNull(responseObject)) {
            responseCollection.add(responseObject);
          }
          futures.remove(future);//remove this
          Callable<T> callable = getRemainingCallables(callableLinkedBlockingQueue);
          if(null!=callable){
            Future<T> f = threadPoolExecutor.submit(callable);
            futures.add(f);
          }
        }

    }
    return ResponseObject.<Collection<ResponseObject<T>>>builder().data(responseCollection).build();
  }

  private <T> Callable<T> getRemainingCallables(LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue){
    if(callableLinkedBlockingQueue.size()>0){
      return callableLinkedBlockingQueue.poll();
    }
    return null;
  }

vous pouvez restreindre le nombre d'utilisations de thread à partir du planificateur ainsi que mettre un délai d'expiration sur la tâche.

Nitish Kumar
la source