Existe-t-il un ExecutorService qui utilise le thread actuel?

94

Ce que je recherche, c'est un moyen compatible de configurer ou non l'utilisation d'un pool de threads. Idéalement, le reste du code ne devrait pas du tout être affecté. Je pourrais utiliser un pool de threads avec 1 thread mais ce n'est pas tout à fait ce que je veux. Des idées?

ExecutorService es = threads == 0 ? new CurrentThreadExecutor() : Executors.newThreadPoolExecutor(threads);

// es.execute / es.submit / new ExecutorCompletionService(es) etc
Michael Rutherfurd
la source

Réponses:

69

Voici une implémentation vraiment simple Executor(pas ExecutorService, je vous en prie) qui n'utilise que le thread actuel. Voler ceci de "Java Concurrency in Practice" (lecture essentielle).

public class CurrentThreadExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}

ExecutorService est une interface plus élaborée, mais pourrait être traitée avec la même approche.

réfléchir
la source
4
+1: Comme vous le dites, un ExecutorService pourrait être géré de la même manière, peut-être en sous-classant AbstractExecutorService.
Paul Cager
@Paul Yep, AbstractExecutorServiceressemble à la voie à suivre.
Overhink
15
Dans Java8, vous pouvez réduire cela à justeRunnable::run
Jon Freedman
@Juude il fonctionnera toujours sur le thread qui appelle l'exécuteur.
Gustav Karlsson
Le but d'un exécuteur de même thread n'est-il pas de pouvoir planifier plus de tâches à partir de execute ()? Cette réponse ne fera pas l'affaire. Je ne trouve pas de réponse satisfaisante.
haelix
82

Vous pouvez utiliser Guava MoreExecutors.newDirectExecutorService(), ou MoreExecutors.directExecutor()si vous n'avez pas besoin d'un fichier ExecutorService.

Si l'inclusion de Guava est trop lourde, vous pouvez mettre en œuvre quelque chose de presque aussi bon:

public final class SameThreadExecutorService extends ThreadPoolExecutor {
  private final CountDownLatch signal = new CountDownLatch(1);

  private SameThreadExecutorService() {
    super(1, 1, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());
  }

  @Override public void shutdown() {
    super.shutdown();
    signal.countDown();
  }

  public static ExecutorService getInstance() {
    return SingletonHolder.instance;
  }

  private static class SingletonHolder {
    static ExecutorService instance = createInstance();    
  }

  private static ExecutorService createInstance() {
    final SameThreadExecutorService instance
        = new SameThreadExecutorService();

    // The executor has one worker thread. Give it a Runnable that waits
    // until the executor service is shut down.
    // All other submitted tasks will use the RejectedExecutionHandler
    // which runs tasks using the  caller's thread.
    instance.submit(new Runnable() {
        @Override public void run() {
          boolean interrupted = false;
          try {
            while (true) {
              try {
                instance.signal.await();
                break;
              } catch (InterruptedException e) {
                interrupted = true;
              }
            }
          } finally {
            if (interrupted) {
              Thread.currentThread().interrupt();
            }
          }
        }});
    return Executors.unconfigurableScheduledExecutorService(instance);
  }
}
NamshubWriter
la source
1
Pour Android, il renvoie Executors.unconfigurableExecutorService (instance);
Maragues
si nous n'utilisons que le thread actuel , pourquoi des primitives de synchronisation? pourquoi le Latch?
haelix
@haelix le loquet est nécessaire car même si le travail est effectué dans le même thread que celui qui a ajouté le travail, n'importe quel thread pourrait arrêter l'exécuteur.
NamshubWriter
64

Style Java 8:

Executor e = Runnable::run;

lpandzic
la source
7
Absolument sale. J'aime cela.
Rogue
Qu'est-ce que c'est sale? C'est élégant :)
lpandzic
C'est le meilleur type de sale @Ipandzic, c'est inhabituel et succinct.
Rogue
12

J'ai écrit un ExecutorServicebasé sur le AbstractExecutorService.

/**
 * Executes all submitted tasks directly in the same thread as the caller.
 */
public class SameThreadExecutorService extends AbstractExecutorService {

    //volatile because can be viewed by other threads
    private volatile boolean terminated;

    @Override
    public void shutdown() {
        terminated = true;
    }

    @Override
    public boolean isShutdown() {
        return terminated;
    }

    @Override
    public boolean isTerminated() {
        return terminated;
    }

    @Override
    public boolean awaitTermination(long theTimeout, TimeUnit theUnit) throws InterruptedException {
        shutdown(); // TODO ok to call shutdown? what if the client never called shutdown???
        return terminated;
    }

    @Override
    public List<Runnable> shutdownNow() {
        return Collections.emptyList();
    }

    @Override
    public void execute(Runnable theCommand) {
        theCommand.run();
    }
}
Eric Obermühlner
la source
le champ terminé n'est pas protégé par synchronized.
Daneel S.Yaitskov
1
Le terminatedchamp @ DaneelS.Yaitskov ne bénéficiera pas d'un accès synchronisé basé sur le code qui se trouve réellement ici. Les opérations sur les champs 32 bits sont atomiques en Java.
Christopher Schultz
Je suppose que la méthode isTerminated () dans ce qui précède n'est pas tout à fait correcte car isTerminated () n'est censée retourner true que s'il n'y a aucune tâche en cours d'exécution. Guava suit le nombre de tâches dans une autre variable, ce qui explique probablement pourquoi ils protègent les deux variables avec un verrou.
Jeremy K
7

Vous pouvez utiliser RejectedExecutionHandler pour exécuter la tâche dans le thread actuel.

public static final ThreadPoolExecutor CURRENT_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 0, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        r.run();
    }
});

Vous n'en avez besoin que d'un seul.

Peter Lawrey
la source
Intelligent! Dans quelle mesure est-ce sûr (question honnête)? Existe-t-il un moyen de rejeter une tâche alors que vous ne voudriez pas l'exécuter dans le thread actuel? Les tâches sont-elles rejetées si ExecutorService s'arrête ou s'arrête?
Overhink
La taille maximale étant de 0, chaque tâche est rejetée. Cependant, le comportement rejeté consiste à s'exécuter dans le thread actuel. Il n'y aurait un problème que si la tâche n'est PAS rejetée.
Peter Lawrey
8
attention, il existe déjà une implémentation de cette politique, pas besoin de définir la vôtre java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy.
jtahlborn
7
Il n'est plus possible de créer un ThreadPoolExecutor avec une taille de pool maximale de 0. Je suppose qu'il serait possible de reproduire le comportement en utilisant une blockingQueue de taille 0, mais aucune implémentation par défaut ne semble le permettre.
Axelle Ziegler
qui ne se compilera pas à cause de {code} if (corePoolSize <0 || maximumPoolSize <= 0 || maximumPoolSize <corePoolSize || keepAliveTime <0) {code} dans java.util.ThreadPoolExecutor (au moins openJdk 7)
Bogdan
6

J'ai dû utiliser le même "CurrentThreadExecutorService" à des fins de test et, bien que toutes les solutions suggérées soient intéressantes (en particulier celle mentionnant la méthode Guava ), j'ai trouvé quelque chose de similaire à ce que Peter Lawrey a suggéré ici .

Comme mentionné par Axelle Ziegler ici , malheureusement, la solution de Peter ne fonctionnera pas réellement à cause de la vérification introduite dans ThreadPoolExecutorle maximumPoolSizeparamètre du constructeur (c'est maximumPoolSize-à- dire impossible <=0).

Afin de contourner cela, j'ai fait ce qui suit:

private static ExecutorService currentThreadExecutorService() {
    CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
    return new ThreadPoolExecutor(0, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), callerRunsPolicy) {
        @Override
        public void execute(Runnable command) {
            callerRunsPolicy.rejectedExecution(command, this);
        }
    };
}
fabriziocucci
la source