Est-il possible de spécifier un pool de threads personnalisé pour le flux parallèle Java 8 ? Je ne peux pas le trouver nulle part.
Imaginez que j'ai une application serveur et que j'aimerais utiliser des flux parallèles. Mais l'application est grande et multithread donc je veux la compartimenter. Je ne veux pas d'une tâche d'exécution lente dans un module des tâches de blocage d'application d'un autre module.
Si je ne peux pas utiliser différents pools de threads pour différents modules, cela signifie que je ne peux pas utiliser en toute sécurité des flux parallèles dans la plupart des situations du monde réel.
Essayez l'exemple suivant. Certaines tâches gourmandes en CPU sont exécutées dans des threads séparés. Les tâches exploitent des flux parallèles. La première tâche est interrompue, chaque étape dure donc 1 seconde (simulée par le sommeil du thread). Le problème est que d'autres threads se bloquent et attendent que la tâche interrompue se termine. C'est un exemple artificiel, mais imaginez une application de servlet et quelqu'un soumettant une tâche de longue durée au pool de jointure de fourches partagé.
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
Réponses:
Il y a en fait une astuce pour exécuter une opération parallèle dans un pool de jointure spécifique. Si vous l'exécutez en tant que tâche dans un pool de jointures de fourches, il y reste et n'utilise pas le plus commun.
L'astuce est basée sur ForkJoinTask.fork qui spécifie: "Arrange pour exécuter de manière asynchrone cette tâche dans le pool dans lequel la tâche en cours s'exécute, le cas échéant, ou en utilisant ForkJoinPool.commonPool () sinon dansForkJoinPool ()"
la source
ForkJoinPool
ou est-ce un détail d'implémentation? Un lien vers la documentation serait bien.ForkJoinPool
instance devrait êtreshutdown()
quand elle n'est plus nécessaire pour éviter une fuite de thread. (exemple)Les flux parallèles utilisent la valeur par défaut
ForkJoinPool.commonPool
qui, par défaut, a un thread de moins car vous avez des processeurs , comme retourné parRuntime.getRuntime().availableProcessors()
(Cela signifie que les flux parallèles utilisent tous vos processeurs car ils utilisent également le thread principal):Cela signifie également que si vous avez des flux parallèles imbriqués ou plusieurs flux parallèles démarrés simultanément, ils partageront tous le même pool. Avantage: vous n'utiliserez jamais plus que la valeur par défaut (nombre de processeurs disponibles). Inconvénient: vous ne pouvez pas obtenir "tous les processeurs" affectés à chaque flux parallèle que vous lancez (si vous en avez plusieurs). (Apparemment, vous pouvez utiliser un ManagedBlocker pour contourner cela.)
Pour modifier la façon dont les flux parallèles sont exécutés, vous pouvez soit
yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();
ouSystem.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")
pour un parallélisme cible de 20 threads. Cependant, cela ne fonctionne plus après le correctif rétroporté https://bugs.openjdk.java.net/browse/JDK-8190974 .Exemple de ce dernier sur ma machine qui dispose de 8 processeurs. Si j'exécute le programme suivant:
La sortie est:
Vous pouvez donc voir que le flux parallèle traite 8 éléments à la fois, c'est-à-dire qu'il utilise 8 threads. Cependant, si je décommente la ligne commentée, la sortie est:
Cette fois, le flux parallèle a utilisé 20 threads et les 20 éléments du flux ont été traités simultanément.
la source
commonPool
a en fait un de moins queavailableProcessors
, ce qui entraîne un parallélisme total égal àavailableProcessors
parce que le thread appelant compte comme un.ForkJoinTask
. Pour imiterparallel()
get()
est nécessaire:stream.parallel().forEach(soSomething)).get();
ForkJoinPool.submit(() -> stream.forEach(...))
mes actions Stream seront exécutées avec le donnéForkJoinPool
. Je m'attendrais à ce que l'ensemble Stream-Action soit exécuté dans le ForJoinPool comme une action, mais en utilisant toujours en interne le ForkJoinPool par défaut / commun. Où avez-vous vu que ForkJoinPool.submit () ferait ce que vous dites?Alternativement à l'astuce de déclenchement du calcul parallèle dans votre propre forkJoinPool, vous pouvez également passer ce pool à la méthode CompletableFuture.supplyAsync comme dans:
la source
La solution d'origine (définition de la propriété de parallélisme commun ForkJoinPool) ne fonctionne plus. En regardant les liens dans la réponse d'origine, une mise à jour qui rompt cela a été renvoyée vers Java 8. Comme mentionné dans les threads liés, cette solution n'était pas garantie de fonctionner indéfiniment. Sur cette base, la solution est la solution forkjoinpool.submit with .get discutée dans la réponse acceptée. Je pense que le backport corrige également le manque de fiabilité de cette solution.
la source
ForkJoinPool.commonPool().getParallelism()
en mode débogage.unreported exception InterruptedException; must be caught or declared to be thrown
même avec toutes lescatch
exceptions dans la boucle.Nous pouvons changer le parallélisme par défaut en utilisant la propriété suivante:
qui peut configurer pour utiliser plus de parallélisme.
la source
Pour mesurer le nombre réel de threads utilisés, vous pouvez vérifier
Thread.activeCount()
:Cela peut produire sur un processeur à 4 cœurs une sortie comme:
Sans
.parallel()
cela donne:la source
Jusqu'à présent, j'ai utilisé les solutions décrites dans les réponses à cette question. Maintenant, j'ai trouvé une petite bibliothèque appelée Parallel Stream Support pour cela:
Mais comme @PabloMatiasGomez l'a souligné dans les commentaires, il existe des inconvénients concernant le mécanisme de fractionnement des flux parallèles qui dépend fortement de la taille du pool commun. Voir Le flux parallèle à partir d'un HashSet ne s'exécute pas en parallèle .
J'utilise cette solution uniquement pour avoir des pools distincts pour différents types de travail, mais je ne peux pas définir la taille du pool commun à 1 même si je ne l'utilise pas.
la source
Remarque: Il semble y avoir un correctif implémenté dans JDK 10 qui garantit que le pool de threads personnalisé utilise le nombre attendu de threads.
L'exécution de flux parallèle dans un ForkJoinPool personnalisé doit obéir au parallélisme https://bugs.openjdk.java.net/browse/JDK-8190974
la source
J'ai essayé le ForkJoinPool personnalisé comme suit pour ajuster la taille de la piscine:
Voici la sortie indiquant que le pool utilise plus de threads que la valeur par défaut 4 .
Mais en fait, il y a un bizarre , quand j'ai essayé d'obtenir le même résultat en utilisant
ThreadPoolExecutor
comme suit:mais j'ai échoué.
Il ne démarrera le parallelStream que dans un nouveau thread, puis tout le reste sera le même, ce qui prouve à nouveau que
parallelStream
va utiliser ForkJoinPool pour démarrer ses threads enfants.la source
Allez chercher AbacusUtil . Le numéro de thread peut être spécifié pour le flux parallèle. Voici l exemple de code:
Divulgation : Je suis le développeur d'AbacusUtil.
la source
Si vous ne voulez pas vous fier aux hacks d'implémentation, il existe toujours un moyen d'y parvenir en implémentant des collecteurs personnalisés qui combineront
map
etcollect
sémantiques ... et vous ne serez pas limité à ForkJoinPool:Heureusement, c'est déjà fait ici et disponible sur Maven Central: http://github.com/pivovarit/parallel-collectors
Avertissement: je l'ai écrit et j'en assume la responsabilité.
la source
Si cela ne vous dérange pas d'utiliser une bibliothèque tierce, avec cyclops-react vous pouvez mélanger des flux séquentiels et parallèles dans le même pipeline et fournir des ForkJoinPools personnalisés. Par exemple
Ou si nous souhaitions continuer le traitement dans un flux séquentiel
[Divulgation Je suis le développeur principal de cyclops-react]
la source
Si vous n'avez pas besoin d'un ThreadPool personnalisé mais que vous souhaitez plutôt limiter le nombre de tâches simultanées, vous pouvez utiliser:
(La question en double demandant cela est verrouillée, alors veuillez me porter ici)
la source
vous pouvez essayer d'implémenter ce ForkJoinWorkerThreadFactory et l'injecter à la classe Fork-Join.
vous pouvez utiliser ce constructeur de pool Fork-Join pour ce faire.
notes: - 1. si vous utilisez ceci, tenez compte du fait qu'en fonction de votre implémentation de nouveaux threads, la planification à partir de la JVM sera affectée, ce qui planifie généralement les threads de jointure en fourche sur différents cœurs (traités comme un thread de calcul). 2. La planification des tâches par fork-join aux threads ne sera pas affectée. 3. Je n'ai pas vraiment compris comment le flux parallèle sélectionne les threads à partir de la jointure en fourche (impossible de trouver la documentation appropriée), alors essayez d'utiliser une autre usine threadNaming afin de vous assurer que si les threads du flux parallèle sont sélectionnés de customThreadFactory que vous fournissez. 4. commonThreadPool n'utilisera pas cette customThreadFactory.
la source