Gestion des exceptions des tâches Java ExecutorService

213

J'essaie d'utiliser la ThreadPoolExecutorclasse Java pour exécuter un grand nombre de tâches lourdes avec un nombre fixe de threads. Chacune des tâches comporte de nombreux emplacements pendant lesquels elle peut échouer en raison d'exceptions.

J'ai sous ThreadPoolExecutor- classé et j'ai remplacé la afterExecuteméthode qui est censée fournir toutes les exceptions non détectées rencontrées lors de l'exécution d'une tâche. Cependant, je n'arrive pas à le faire fonctionner.

Par exemple:

public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

La sortie de ce programme est "Tout va bien - situation normale!" même si le seul Runnable soumis au pool de threads lève une exception. Une idée de ce qui se passe ici?

Merci!

À M
la source
vous n'avez jamais demandé l'avenir de la tâche, ce qui s'est passé là-bas. L'exécuteur ou le programme de service entier ne va pas être bloqué. L'exception est interceptée et est enveloppée sous ExecutionException. Et reviendra-t-il si vous appelez future.get (). PS: The future.isDone () [Veuillez lire le vrai nom de l'API] retournera vrai, même lorsque le runnable s'est terminé par erreur. Parce que la tâche est accomplie pour de vrai.
Jai Pandit

Réponses:

156

De la documentation :

Remarque: Lorsque des actions sont incluses dans des tâches (telles que FutureTask), soit explicitement, soit via des méthodes telles que submit, ces objets de tâche interceptent et conservent des exceptions de calcul, et ne provoquent donc pas d'arrêt brutal, et les exceptions internes ne sont pas transmises à cette méthode .

Lorsque vous soumettez un Runnable, il sera enveloppé dans un avenir.

Votre afterExecute devrait ressembler à ceci:

public final class ExtendedExecutor extends ThreadPoolExecutor {

    // ...

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}
non
la source
7
Merci, j'ai fini par utiliser cette solution. De plus, au cas où quelqu'un serait intéressé: d'autres ont suggéré de ne pas sous-classer ExecutorService, mais je l'ai quand même fait parce que je voulais surveiller les tâches pendant qu'elles se terminent plutôt que d'attendre qu'elles se terminent toutes et d'appeler get () sur tous les Futures retournés .
Tom
1
Une autre approche pour sous-classer l'exécuteur testamentaire consiste à sous-classer FutureTask et à remplacer sa méthode `` terminée ''
nos
1
Tom >> Pouvez-vous s'il vous plaît publier votre exemple de code d'extrait où vous avez sous
classé
1
Cette réponse ne fonctionnera pas si vous utilisez ComplableFuture.runAsync car afterExecute contiendra un objet qui est un package privé et aucun moyen d'accéder au throwable. Je l'ai contourné en terminant l'appel. Voir ma réponse ci-dessous.
mmm
2
Faut-il vérifier si l'avenir est terminé en utilisant future.isDone()? Puisque afterExecuteest exécuté après la Runnablefin, je suppose que future.isDone()revient toujours true.
Searene
247

AVERTISSEMENT : il convient de noter que cette solution bloquera le thread appelant.


Si vous souhaitez traiter les exceptions levées par la tâche, il est généralement préférable d'utiliser Callableplutôt que Runnable.

Callable.call() est autorisé à lever des exceptions vérifiées, et celles-ci sont propagées au thread appelant:

Callable task = ...
Future future = executor.submit(task);
try {
   future.get();
} catch (ExecutionException ex) {
   ex.getCause().printStackTrace();
}

Si Callable.call()lève une exception, elle sera enveloppée dans un ExecutionExceptionet levée par Future.get().

Cela est probablement préférable à la sous-classification ThreadPoolExecutor. Il vous donne également la possibilité de soumettre à nouveau la tâche si l'exception est récupérable.

skaffman
la source
5
> Callable.call () est autorisé à lever des exceptions vérifiées, et celles-ci sont propagées vers le thread appelant: Notez que l'exception levée ne se propagera vers le thread appelant que si future.get()ou sa version surchargée est appelée.
nylé
16
C'est parfait, mais que faire si j'exécute des tâches en parallèle et que je ne veux pas bloquer l'exécution?
Grigory Kislin
44
N'utilisez pas cette solution, car elle rompt tout le but de l'utilisation de ExecutorService. Un ExecutorService est un mécanisme d'exécution asynchrone capable d'exécuter des tâches en arrière-plan. Si vous appelez future.get () juste après l'exécution, il bloquera le thread appelant jusqu'à la fin de la tâche.
user1801374
2
Cette solution ne devrait pas être aussi bien cotée. Future.get () fonctionne de manière synchrone et agira comme un bloqueur jusqu'à ce que Runnable ou Callable aient été exécutés et, comme indiqué ci-dessus, défait le but d'utiliser le service Executor
Super Hans
2
Comme l'a souligné #nhylated, cela mérite un BUG jdk. Si Future.get () n'est pas appelé, toute exception non interceptée de Callable est silencieusement ignorée. Conception très mauvaise .... juste passé 1+ jour pour comprendre qu'une bibliothèque a utilisé ceci et jdk a silencieusement ignoré les exceptions. Et cela existe toujours dans jdk12.
Ben Jiang
18

L'explication de ce comportement se trouve dans le javadoc pour afterExecute :

Remarque: Lorsque des actions sont incluses dans des tâches (telles que FutureTask), soit explicitement, soit via des méthodes telles que submit, ces objets de tâche interceptent et conservent des exceptions de calcul, et ne provoquent donc pas d'arrêt brutal, et les exceptions internes ne sont pas transmises à cette méthode .

Drew Wills
la source
10

Je l'ai contourné en enveloppant le runnable fourni soumis à l'exécuteur testamentaire.

CompletableFuture.runAsync(() -> {
        try {
              runnable.run();
        } catch (Throwable e) {
              Log.info(Concurrency.class, "runAsync", e);
        }
}, executorService);
mmm
la source
3
Vous pouvez améliorer la lisibilité en utilisant la whenComplete()méthode de CompletableFuture.
Eduard Wirch
@EduardWirch cela fonctionne, mais vous ne pouvez pas lever une exception à la date de saisie de whenComplete ()
Akshat
7

J'utilise la VerboseRunnableclasse de jcabi-log , qui avale toutes les exceptions et les enregistre. Très pratique, par exemple:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    Runnable() {
      public void run() { 
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);
yegor256
la source
3

Une autre solution consisterait à utiliser ManagedTask et ManagedTaskListener .

Vous avez besoin d'un Callable ou Runnable qui implémente l'interface ManagedTask .

La méthode getManagedTaskListenerrenvoie l'instance souhaitée.

public ManagedTaskListener getManagedTaskListener() {

Et vous implémentez dans ManagedTaskListener la taskDoneméthode:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
    if (exception != null) {
        LOGGER.log(Level.SEVERE, exception.getMessage());
    }
}

Plus de détails sur le cycle de vie des tâches gérées et l'écouteur .

CSchulz
la source
2

Cela marche

  • Il est dérivé de SingleThreadExecutor, mais vous pouvez l'adapter facilement
  • Code lamdas Java 8, mais facile à corriger

Il créera un exécuteur avec un seul thread, qui peut obtenir beaucoup de tâches; et attendra la fin de l'exécution pour commencer avec la prochaine

En cas d'erreur ou d'exception, le uncaughtExceptionHandler l'attrapera

classe finale publique SingleThreadExecutorWithExceptions {

    public static ExecutorService newSingleThreadExecutorWithExceptions (final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {

        Usine ThreadFactory = (exécutable exécutable) -> {
            Thread final newThread = nouveau Thread (exécutable, "SingleThreadExecutorWithExceptions");
            newThread.setUncaughtExceptionHandler ((final Thread caugthThread, final Throwable throwable) -> {
                uncaughtExceptionHandler.uncaughtException (caugthThread, throwable);
            });
            return newThread;
        };
        renvoyer un nouveau FinalizableDelegatedExecutorService
                (nouveau ThreadPoolExecutor (1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        nouveau LinkedBlockingQueue (),
                        usine){


                    void protégé afterExecute (Runnable runnable, Throwable throwable) {
                        super.afterExecute (exécutable, jetable);
                        if (throwable == null && runnable instanceof Future) {
                            essayez {
                                Futur futur = (Futur) exécutable;
                                if (future.isDone ()) {
                                    future.get ();
                                }
                            } catch (CancellationException ce) {
                                jetable = ce;
                            } catch (ExecutionException ee) {
                                throwable = ee.getCause ();
                            } catch (InterruptedException ie) {
                                Thread.currentThread (). Interrupt (); // ignorer / réinitialiser
                            }
                        }
                        if (jetable! = null) {
                            uncaughtExceptionHandler.uncaughtException (Thread.currentThread (), throwable);
                        }
                    }
                });
    }



    classe statique privée FinalizableDelegatedExecutorService
            étend DelegatedExecutorService {
        FinalizableDelegatedExecutorService (exécuteur ExecutorService) {
            super (exécuteur testamentaire);
        }
        void protégé finalize () {
            super.shutdown ();
        }
    }

    / **
     * Une classe wrapper qui expose uniquement les méthodes ExecutorService
     * d'une implémentation ExecutorService.
     * /
    classe statique privée DelegatedExecutorService étend AbstractExecutorService {
        ExecutorService e privé final;
        DelegatedExecutorService (exécuteur ExecutorService) {e = exécuteur; }
        public void execute (commande Runnable) {e.execute (commande); }
        public void shutdown () {e.shutdown (); }
        liste publique shutdownNow () {return e.shutdownNow (); }
        public booléen isShutdown () {return e.isShutdown (); }
        public booléen isTerminated () {return e.isTerminated (); }
        booléen public waitTermination (long timeout, unité TimeUnit)
                lève InterruptedException {
            return e.awaitTermination (timeout, unit);
        }
        Soumission publique future (tâche exécutable) {
            return e.submit (tâche);
        }
        Soumission publique future (tâche appelable) {
            return e.submit (tâche);
        }
        public Future submit (tâche exécutable, résultat T) {
            return e.submit (tâche, résultat);
        }
        Liste publique> invokeAll (Collection> tâches)
                lève InterruptedException {
            return e.invokeAll (tâches);
        }
        Liste publique> invokeAll (Collection> tâches,
                                             long timeout, unité TimeUnit)
                lève InterruptedException {
            return e.invokeAll (tâches, timeout, unité);
        }
        public T invokeAny (Collection> tâches)
                lève InterruptedException, ExecutionException {
            return e.invokeAny (tâches);
        }
        public T invokeAny (Collection> tâches,
                               long timeout, unité TimeUnit)
                lève InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny (tâches, timeout, unité);
        }
    }



    SingleThreadExecutorWithExceptions () {} privé
}
obesga_tirant
la source
L'utilisation de finalize est un peu instable malheureusement, car elle ne sera appelée que "plus tard lorsque le garbage collector la récupérera" (ou peut-être pas dans le cas d'un Thread, dunno) ...
rogerdpack
1

Si vous souhaitez surveiller l'exécution de la tâche, vous pouvez faire tourner 1 ou 2 threads (peut-être plus en fonction de la charge) et les utiliser pour prendre des tâches à partir d'un wrapper ExecutionCompletionService.

Cristian Botiza
la source
0

Si votre ExecutorServiceprovient d'une source externe (c'est-à-dire qu'il n'est pas possible de sous ThreadPoolExecutor-classer et de remplacer afterExecute()), vous pouvez utiliser un proxy dynamique pour obtenir le comportement souhaité:

public static ExecutorService errorAware(final ExecutorService executor) {
    return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class[] {ExecutorService.class},
            (proxy, method, args) -> {
                if (method.getName().equals("submit")) {
                    final Object arg0 = args[0];
                    if (arg0 instanceof Runnable) {
                        args[0] = new Runnable() {
                            @Override
                            public void run() {
                                final Runnable task = (Runnable) arg0;
                                try {
                                    task.run();
                                    if (task instanceof Future<?>) {
                                        final Future<?> future = (Future<?>) task;

                                        if (future.isDone()) {
                                            try {
                                                future.get();
                                            } catch (final CancellationException ce) {
                                                // Your error-handling code here
                                                ce.printStackTrace();
                                            } catch (final ExecutionException ee) {
                                                // Your error-handling code here
                                                ee.getCause().printStackTrace();
                                            } catch (final InterruptedException ie) {
                                                Thread.currentThread().interrupt();
                                            }
                                        }
                                    }
                                } catch (final RuntimeException re) {
                                    // Your error-handling code here
                                    re.printStackTrace();
                                    throw re;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    } else if (arg0 instanceof Callable<?>) {
                        args[0] = new Callable<Object>() {
                            @Override
                            public Object call() throws Exception {
                                final Callable<?> task = (Callable<?>) arg0;
                                try {
                                    return task.call();
                                } catch (final Exception e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    }
                }
                return method.invoke(executor, args);
            });
}
Basse
la source
0

Ceci est dû AbstractExecutorService :: submitest enveloppant votre runnableen RunnableFuture(rien FutureTask) comme ci - dessous

AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
    execute(ftask);
    return ftask;
}

Ensuite, executele transmettre à Workeret Worker.run()appellera ci-dessous.

ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();           /////////HERE////////
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Enfin, task.run();dans le code ci-dessus, l'appel sera appelé FutureTask.run(). Voici le code du gestionnaire d'exceptions, pour cette raison, vous n'obtenez PAS l'exception attendue.

class FutureTask<V> implements RunnableFuture<V>

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {   /////////HERE////////
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
Kanagavelu Sugumar
la source
0

C'est similaire à la solution de mmm, mais un peu plus compréhensible. Demandez à vos tâches d'étendre une classe abstraite qui encapsule la méthode run ().

public abstract Task implements Runnable {

    public abstract void execute();

    public void run() {
      try {
        execute();
      } catch (Throwable t) {
        // handle it  
      }
    }
}


public MySampleTask extends Task {
    public void execute() {
        // heavy, error-prone code here
    }
}
ccleve
la source
-4

Au lieu de sous-classer ThreadPoolExecutor, je lui fournirais une instance ThreadFactory qui crée de nouveaux threads et leur fournit un UncaughtExceptionHandler

Kevin
la source
3
J'ai également essayé cela, mais la méthode uncaughtException ne semble jamais être appelée. Je crois que c'est parce qu'un thread de travail dans la classe ThreadPoolExecutor intercepte les exceptions.
Tom
5
La méthode uncaughtException n'est pas appelée car la méthode de soumission d'ExecutorService encapsule le Callable / Runnable dans un avenir; l'exception y est capturée.
Emil Sit