Cas d'utilisation des planificateurs RxJava

253

Dans RxJava, vous avez le choix entre 5 planificateurs différents :

  1. immediate () : crée et renvoie un planificateur qui exécute le travail immédiatement sur le thread actuel.

  2. trampoline () : crée et renvoie un planificateur qui met en file d'attente le travail sur le thread en cours à exécuter une fois le travail en cours terminé.

  3. newThread () : crée et renvoie un planificateur qui crée un nouveau thread pour chaque unité de travail.

  4. computation () : crée et renvoie un planificateur destiné au travail de calcul. Cela peut être utilisé pour les boucles d'événements, le traitement des rappels et d'autres travaux de calcul. N'effectuez pas de travail lié aux E / S sur ce planificateur. Utilisez des planificateurs. io () à la place.

  5. io () : crée et renvoie un planificateur destiné au travail lié aux E / S. L'implémentation est soutenue par un pool de threads Executor qui augmentera selon les besoins. Cela peut être utilisé pour effectuer des E / S de blocage de manière asynchrone. N'effectuez pas de travail de calcul sur ce planificateur. Utilisez des planificateurs. calcul () à la place.

Des questions:

Les 3 premiers ordonnanceurs sont assez explicites; cependant, je suis un peu confus au sujet du calcul et de io .

  1. Qu'est-ce que le "travail lié aux entrées-sorties"? Est-il utilisé pour traiter les flux ( java.io) et les fichiers ( java.nio.files)? Est-il utilisé pour les requêtes de base de données? Est-il utilisé pour télécharger des fichiers ou accéder aux API REST?
  2. En quoi computation () est-elle différente de newThread () ? Est-ce que tout les appels computation () sont sur un seul thread (en arrière-plan) au lieu d'un nouveau thread (en arrière-plan) à chaque fois?
  3. Pourquoi est-il mauvais d'appeler computation () lors d'un travail IO?
  4. Pourquoi est-il mauvais d'appeler io () lors d'un travail de calcul?
bcorso
la source

Réponses:

332

Grandes questions, je pense que la documentation pourrait faire plus de détails.

  1. io()est soutenu par un pool de threads illimité et est le genre de chose que vous utiliseriez pour des tâches à forte intensité de calcul, c'est-à-dire des choses qui ne mettent pas beaucoup de charge sur le processeur. L'interaction avec le système de fichiers, l'interaction avec des bases de données ou des services sur un hôte différent en sont de bons exemples.
  2. computation()est soutenu par un pool de threads limité avec une taille égale au nombre de processeurs disponibles. Si vous avez essayé de planifier un travail intensif en CPU en parallèle sur plus de processeurs disponibles (par exemple, en utilisantnewThread() ), alors vous êtes prêt pour la surcharge de création de threads et la surcharge de changement de contexte car les threads rivalisent pour un processeur et c'est potentiellement un gros coup de performances.
  3. Il vaut mieux partir computation() pour un travail intensif que si vous n'obtiendrez pas une bonne utilisation du processeur.
  4. Il est mauvais d'appeler io()pour un travail de calcul pour la raison discutée en 2. io()n'est pas borné et si vous planifiez un millier de tâches de calcul io()en parallèle, chacune de ces mille tâches aura chacune son propre thread et sera en concurrence pour le CPU entraînant des coûts de changement de contexte.
Dave Moten
la source
5
Grâce à la familiarité avec la source RxJava. Cela a été une source de confusion pour moi pendant longtemps et je pense que la documentation doit être renforcée à cet égard.
Dave Moten
2
@IgorGanapolsky Je suppose que c'est quelque chose que vous voudriez rarement faire. La création d'un nouveau fil pour chaque unité de travail est rarement propice à l'efficacité car les fils sont coûteux à construire et à démonter. Vous voulez généralement réutiliser les threads que font computation () et d'autres ordonnanceurs. Le seul moment où newThread () pourrait avoir une utilisation légitime (du moins je peux y penser) est de lancer des tâches isolées, peu fréquentes et de longue durée. Même alors, je pourrais utiliser io () pour ce scénario.
2015
4
Pourriez-vous montrer un exemple où trampoline () serait utile? Je comprends le concept mais je ne peux pas imaginer un scénario que je l'utiliserais dans la pratique. C'est le seul planificateur qui reste un mystère pour moi
tmn
32
Pour les appels réseau, utilisez Schedulers.io () et si vous devez limiter le nombre d'appels réseau simultanés, utilisez Scheduler.from (Executors.newFixedThreadPool (n)).
Dave Moten
4
Vous pourriez penser que mettre timeoutpar défaut sur computation()vous bloquerait un thread mais ce n'est pas le cas. Sous les couvertures computation()utilise une ScheduledExecutorServicesorte de temps ne bloquent mesures ont été retardées pas. Étant donné que ce fait computation()est une bonne idée, car si c'était sur un autre thread, nous serions soumis à des frais de changement de thread.
Dave Moten
3

Le point le plus important est que Schedulers.io et Schedulers.computation sont soutenus par des pools de threads illimités, contrairement aux autres mentionnés dans la question. Cette caractéristique n'est partagée par Schedulers.from (exécuteur) que dans le cas où l' exécuteur est créé avec newCachedThreadPool (illimité avec un pool de threads à récupération automatique).

Comme abondamment expliqué dans les réponses précédentes et plusieurs articles sur le Web, Schedulers.io et Schedulers.computation doivent être utilisés avec soin car ils sont optimisés pour le type de travail en leur nom. Mais, à mon avis, leur rôle le plus important est de fournir une réelle concurrence aux flux réactifs .

Contrairement à la croyance des nouveaux arrivants, les flux réactifs ne sont pas intrinsèquement concurrents mais intrinsèquement asynchrones et séquentiels. Pour cette même raison, Schedulers.io ne doit être utilisé que lorsque l'opération d'E / S est bloquante (par exemple: en utilisant une commande de blocage telle que Apache IOUtils FileUtils.readFileAsString (...) ) gèlerait ainsi le thread appelant jusqu'à ce que l'opération soit terminé.

L'utilisation d'une méthode asynchrone telle que Java AsynchronousFileChannel (...) ne bloquerait pas le thread appelant pendant l'opération, il est donc inutile d'utiliser un thread séparé. En fait, Schedulers.io threads ne conviennent pas vraiment aux opérations asynchrones car ils n'exécutent pas de boucle d'événement et le rappel ne serait jamais ... appelé.

La même logique s'applique pour l'accès à la base de données ou les appels d'API distants. N'utilisez pas Schedulers.io si vous pouvez utiliser une API asynchrone ou réactive pour passer l'appel.

Retour à la concurrence. Il se peut que vous n'ayez pas accès à une API asynchrone ou réactive pour effectuer des opérations d'E / S de manière asynchrone ou simultanée, donc votre seule alternative est de répartir plusieurs appels sur un thread séparé. Hélas, les flux réactifs sont séquentiels à leurs extrémités, mais la bonne nouvelle est que l' opérateur flatMap () peut introduire la concurrence dans leur cœur .

La concurrence doit être créée dans la construction de flux, généralement à l'aide de l' opérateur flatMap () . Cet opérateur puissant peut être configuré pour fournir en interne un contexte multithread à votre fonction intégrée flatMap () <T, R>. Ce contexte est fourni par un planificateur multithread tel que Scheduler.io ou Scheduler.computation .

Trouvez plus de détails dans les articles sur RxJava2 Schedulers et Concurrency où vous trouverez un exemple de code et des explications détaillées sur la façon d'utiliser les Schedulers séquentiellement et simultanément.

J'espère que cela t'aides,

Softjake

softjake
la source
2

Ce billet de blog fournit une excellente réponse

De l'article de blog:

Schedulers.io () est soutenu par un pool de threads illimité. Il est utilisé pour les travaux de type E / S non gourmands en ressources processeur, notamment l'interaction avec le système de fichiers, l'exécution d'appels réseau, les interactions de base de données, etc. Ce pool de threads est destiné à être utilisé pour effectuer des E / S de blocage de manière asynchrone.

Schedulers.computation () est soutenu par un pool de threads limité dont la taille peut aller jusqu'au nombre de processeurs disponibles. Il est utilisé pour les travaux de calcul ou gourmands en ressources processeur, tels que le redimensionnement d'images, le traitement de grands ensembles de données, etc. temps des processeurs.

Schedulers.newThread () crée un nouveau thread pour chaque unité de travail planifiée. Ce planificateur est coûteux car un nouveau thread est généré à chaque fois et aucune réutilisation ne se produit.

Schedulers.from (Executor executor) crée et renvoie un planificateur personnalisé soutenu par l'exécuteur spécifié. Pour limiter le nombre de threads simultanés dans le pool de threads, utilisez Scheduler.from (Executors.newFixedThreadPool (n)). Cela garantit que si une tâche est planifiée lorsque tous les threads sont occupés, elle sera mise en file d'attente. Les threads du pool existeront jusqu'à ce qu'il soit explicitement arrêté.

Le thread principal ou AndroidSchedulers.mainThread () est fourni par la bibliothèque d'extension RxAndroid à RxJava. Le thread principal (également appelé thread d'interface utilisateur) est l'endroit où l'interaction utilisateur se produit. Des précautions doivent être prises pour ne pas surcharger ce fil pour éviter une interface utilisateur non réactive janky ou, pire, Application Not Responding ”(ANR).

Schedulers.single () est nouveau dans RxJava 2. Ce planificateur est soutenu par un seul thread exécutant les tâches séquentiellement dans l'ordre demandé.

Schedulers.trampoline () exécute les tâches de manière FIFO (First In, First Out) par l'un des threads de travail participants. Il est souvent utilisé lors de l'implémentation de la récursivité pour éviter d'augmenter la pile d'appels.

Joe
la source