Comment utiliser MDC avec des pools de threads?

146

Dans notre logiciel, nous utilisons largement MDC pour suivre des éléments tels que les identifiants de session et les noms d'utilisateur pour les demandes Web. Cela fonctionne bien lors de l'exécution dans le thread d'origine. Cependant, il y a beaucoup de choses qui doivent être traitées en arrière-plan. Pour cela , nous utilisons java.concurrent.ThreadPoolExecutoret les java.util.Timerclasses ainsi que quelques async roulées services d'exécution. Tous ces services gèrent leur propre pool de threads.

Voici ce que le manuel de Logback a à dire sur l'utilisation de MDC dans un tel environnement:

Une copie du contexte de diagnostic mappé ne peut pas toujours être héritée par les threads de travail du thread initiateur. C'est le cas lorsque java.util.concurrent.Executors est utilisé pour la gestion des threads. Par exemple, la méthode newCachedThreadPool crée un ThreadPoolExecutor et, comme tout autre code de regroupement de threads, elle a une logique de création de thread complexe.

Dans de tels cas, il est recommandé que MDC.getCopyOfContextMap () soit appelé sur le thread d'origine (maître) avant de soumettre une tâche à l'exécuteur. Lorsque la tâche s'exécute, en tant que première action, elle doit appeler MDC.setContextMapValues ​​() pour associer la copie stockée des valeurs MDC d'origine au nouveau thread géré par Executor.

Ce serait bien, mais il est très facile d'oublier l'ajout de ces appels, et il n'y a pas de moyen facile de reconnaître le problème avant qu'il ne soit trop tard. Le seul signe avec Log4j est que vous obtenez des informations MDC manquantes dans les journaux, et avec Logback, vous obtenez des informations MDC périmées (puisque le thread dans le pool de bandes de roulement hérite de son MDC de la première tâche qui a été exécutée dessus). Les deux sont de graves problèmes dans un système de production.

Je ne vois aucunement notre situation particulière, mais je n'ai pas pu trouver grand-chose sur ce problème sur le Web. Apparemment, ce n'est pas quelque chose contre quoi beaucoup de gens se heurtent, il doit donc y avoir un moyen de l'éviter. Que faisons-nous de mal ici?

Lóránt Pintér
la source
1
Si votre application est déployée dans un environnement JEE, vous pouvez utiliser des intercepteurs java pour définir le contexte MDC avant l'appel EJB.
Maxim Kirilov
2
À partir de la version 1.1.5 de logback, les valeurs MDC ne sont plus héritées par les threads enfants.
Ceki
2
@Ceki La documentation doit être mise à jour: "Un thread enfant hérite automatiquement d'une copie du contexte de diagnostic mappé de son parent." logback.qos.ch/manual/mdc.html
steffen
J'ai créé une pull request vers slf4j qui résout le problème de l'utilisation de MDC à travers les threads (lien github.com/qos-ch/slf4j/pull/150 ). Peut-être, si les gens commentent et le demandent, ils intégreront le changement dans le SLF4J :)
Homme

Réponses:

79

Oui, c'est également un problème courant que j'ai rencontré. Il existe quelques solutions de contournement (comme le définir manuellement, comme décrit), mais idéalement, vous voulez une solution qui

  • Définit le MDC de manière cohérente;
  • Évite les bogues tacites où le MDC est incorrect mais vous ne le savez pas; et
  • Minimise les changements dans la façon dont vous utilisez les pools de threads (par exemple, sous-classement Callableavec MyCallablepartout, ou laideur similaire).

Voici une solution que j'utilise qui répond à ces trois besoins. Le code doit être explicite.

(En remarque, cet exécuteur peut être créé et envoyé à Guava MoreExecutors.listeningDecorator(), si vous utilisez Guava ListanableFuture.)

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by jlevy.
 * Date: 6/14/13
 */
public class MdcThreadPoolExecutor extends ThreadPoolExecutor {

    final private boolean useFixedContext;
    final private Map<String, Object> fixedContext;

    /**
     * Pool where task threads take MDC from the submitting thread.
     */
    public static MdcThreadPoolExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Pool where task threads take fixed MDC from the thread that creates the pool.
     */
    @SuppressWarnings("unchecked")
    public static MdcThreadPoolExecutor newWithCurrentMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(MDC.getCopyOfContextMap(), corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue);
    }

    /**
     * Pool where task threads always have a specified, fixed MDC.
     */
    public static MdcThreadPoolExecutor newWithFixedMdc(Map<String, Object> fixedContext, int corePoolSize,
                                                        int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                                        BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(fixedContext, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    private MdcThreadPoolExecutor(Map<String, Object> fixedContext, int corePoolSize, int maximumPoolSize,
                                  long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.fixedContext = fixedContext;
        useFixedContext = (fixedContext != null);
    }

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}
jlevy
la source
Si le contexte précédent n'est pas vide, n'est-ce pas toujours des déchets? Pourquoi le transportez-vous?
djjeck
2
Droite; il ne devrait pas être réglé. Cela semble juste une bonne hygiène, par exemple si la méthode wrap () a été exposée et utilisée par quelqu'un d'autre sur la route.
jlevy
Pouvez-vous fournir une référence sur la manière dont ce MdcThreadPoolExecutor a été attaché ou référencé par Log4J2? Y a-t-il quelque part où nous devons spécifiquement référencer cette classe, ou est-ce fait «automatiquement»? Je n'utilise pas Guava. Je pourrais, mais j'aimerais savoir s'il existe un autre moyen avant de l'utiliser.
jcb
Si je comprends bien votre question, la réponse est oui, ce sont des variables locales de threads "magiques" dans SLF4J - voir les implémentations de MDC.setContextMap () etc. De plus, en passant, cela utilise SLF4J, pas Log4J, ce qui est préférable car il fonctionne avec Log4j, Logback et d'autres configurations de journalisation.
jlevy
1
Juste pour être complet: si vous utilisez Spring au ThreadPoolTaskExecutorlieu de Java ordinaire ThreadPoolExecutor, vous pouvez utiliser le MdcTaskDecoratordécrit sur moelholm.com/2017/07/24/…
Pino
27

Nous avons rencontré un problème similaire. Vous souhaiterez peut-être étendre ThreadPoolExecutor et remplacer les méthodes before / afterExecute pour effectuer les appels MDC dont vous avez besoin avant de démarrer / arrêter de nouveaux threads.

marque
la source
10
Les méthodes beforeExecute(Thread, Runnable)et afterExecute(Runnable, Throwable)peuvent être utiles dans d'autres cas, mais je ne sais pas comment cela fonctionnera pour la configuration des MDC. Ils sont tous deux exécutés sous le thread généré. Cela signifie que vous devez être en mesure de récupérer la carte mise à jour à partir du thread principal avant beforeExecute.
Kenston Choi
Mieux vaut définir les MDC dans le filtre, ce qui signifie que lorsque la demande est en cours de traitement par la logique métier, le contexte ne sera pas mis à jour. Je ne pense pas que nous devrions mettre à jour MDC partout dans l'application
dereck
15

À mon humble avis, la meilleure solution est de:

  • utilisation ThreadPoolTaskExecutor
  • mettre en œuvre le vôtre TaskDecorator
  • utilise le: executor.setTaskDecorator(new LoggingTaskDecorator());

Le décorateur peut ressembler à ceci:

private final class LoggingTaskDecorator implements TaskDecorator {

    @Override
    public Runnable decorate(Runnable task) {
        // web thread
        Map<String, String> webThreadContext = MDC.getCopyOfContextMap();
        return () -> {
            // work thread
            try {
                // TODO: is this thread safe?
                MDC.setContextMap(webThreadContext);
                task.run();
            } finally {
                MDC.clear();
            }
        };
    }

}
Tomáš Myšík
la source
Désolé, je ne sais pas vraiment ce que vous voulez dire. MISE À JOUR: Je pense que je vois maintenant, va améliorer ma réponse.
Tomáš Myšík
6

Voici comment je le fais avec des pools de threads fixes et des exécuteurs:

ExecutorService executor = Executors.newFixedThreadPool(4);
Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

Dans la partie filetage:

executor.submit(() -> {
    MDC.setContextMap(mdcContextMap);
    // my stuff
});
Amaury D
la source
2

Semblable aux solutions précédemment publiées, les newTaskForméthodes pour Runnableet Callablepeuvent être écrasées afin d'encapsuler l'argument (voir solution acceptée) lors de la création du RunnableFuture.

Remarque: Par conséquent, la méthode executorServices submitdoit être appelée à la place de la executeméthode.

Pour le ScheduledThreadPoolExecutor, les decorateTaskméthodes seraient écrasées à la place.

Ma clé_
la source
2

Si vous rencontrez ce problème dans un environnement lié au framework Spring où vous exécutez des tâches en utilisant l' @Asyncannotation, vous pouvez décorer les tâches à l'aide de l' approche TaskDecorator . Un exemple de procédure est fourni ici: https://moelholm.com/blog/2017/07/24/spring-43-using-a-taskdecorator-to-copy-mdc-data-to-async-threads

J'ai rencontré ce problème et l'article ci-dessus m'a aidé à le résoudre, c'est pourquoi je le partage ici.

Soner
la source
0

Une autre variante similaire aux réponses existantes ici consiste à implémenter ExecutorServiceet à autoriser un délégué à lui être transmis. Ensuite, en utilisant des génériques, il peut toujours exposer le délégué réel au cas où l'on voudrait obtenir des statistiques (tant qu'aucune autre méthode de modification n'est utilisée).

Code de référence:

public class MDCExecutorService<D extends ExecutorService> implements ExecutorService {

    private final D delegate;

    public MDCExecutorService(D delegate) {
        this.delegate = delegate;
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return delegate.submit(wrap(task), result);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return delegate.invokeAny(wrapCollection(tasks));
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return delegate.invokeAny(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(wrap(command));
    }

    public D getDelegate() {
        return delegate;
    }

    /* Copied from https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common
    /concurrent/MDCWrappers.java */

    private static Runnable wrap(final Runnable runnable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Callable<T> wrap(final Callable<T> callable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Consumer<T> wrap(final Consumer<T> consumer) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return (t) -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                consumer.accept(t);
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Collection<Callable<T>> wrapCollection(Collection<? extends Callable<T>> tasks) {
        Collection<Callable<T>> wrapped = new ArrayList<>();
        for (Callable<T> task : tasks) {
            wrapped.add(wrap(task));
        }
        return wrapped;
    }
}
Kenston Choi
la source
-3

J'ai pu résoudre ce problème en utilisant l'approche suivante

Dans le thread principal (Application.java, le point d'entrée de mon application)

static public Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

Dans la méthode run de la classe qui est appelée par Executer

MDC.setContextMap(Application.mdcContextMap);
Smishra
la source