Différence entre les flux Java 8 et les observables RxJava

144

Les flux Java 8 sont-ils similaires aux observables RxJava?

Définition du flux Java 8:

Les classes du nouveau java.util.streampackage fournissent une API Stream pour prendre en charge les opérations de style fonctionnel sur les flux d'éléments.

rahulrv
la source
8
Pour info, il y a des propositions pour introduire plus de classes comme RxJava dans JDK 9. jsr166-concurrency.10961.n7.nabble.com
John Vint
@JohnVint Quel est le statut de cette proposition. Va-t-il réellement prendre son envol?
IgorGanapolsky
2
@IgorGanapolsky Oh oui, il semble que cela deviendra jdk9. cr.openjdk.java.net/~martin/webrevs/openjdk9/… . Il existe même un port pour RxJava vers Flow github.com/akarnokd/RxJavaUtilConcurrentFlow .
John Vint
Je sais que c'est une question très ancienne, mais j'ai récemment assisté à cette excellente conférence de Venkat Subramaniam qui a une vision perspicace du sujet et est mise à jour vers Java9: youtube.com/watch?v=kfSSKM9y_0E . Cela pourrait être intéressant pour les personnes qui explorent RxJava.
Pedro

Réponses:

152

TL; DR : Toutes les bibliothèques de traitement de séquence / flux offrent une API très similaire pour la construction de pipelines. Les différences résident dans l'API pour la gestion du multi-threading et la composition des pipelines.

RxJava est assez différent de Stream. De toutes les choses JDK, le plus proche de rx.Observable est peut-être java.util.stream.Collector Stream + CompletableFuture combo (qui a un coût de traiter une couche monade supplémentaire, c'est-à-dire d'avoir à gérer la conversion entreStream<CompletableFuture<T>> et CompletableFuture<Stream<T>>).

Il existe des différences significatives entre Observable et Stream:

  • Les flux sont basés sur l'extraction, les observables sont basés sur le push. Cela peut sembler trop abstrait, mais cela a des conséquences importantes qui sont très concrètes.
  • Stream ne peut être utilisé qu'une seule fois, Observable peut être abonné plusieurs fois
  • Stream#parallel()divise la séquence en partitions, Observable#subscribeOn()et Observable#observeOn()ne le fait pas; il est difficile d'émuler le Stream#parallel()comportement avec Observable, il y avait autrefois une .parallel()méthode mais cette méthode a causé tellement de confusion que le .parallel()support a été déplacé vers un référentiel séparé sur github, RxJavaParallel. Plus de détails sont dans une autre réponse .
  • Stream#parallel()ne permet pas de spécifier un pool de threads à utiliser, contrairement à la plupart des méthodes RxJava acceptant le Scheduler optionnel. Étant donné que toutes les instances de flux dans une machine virtuelle Java utilisent le même pool de fork-join, l'ajout .parallel()peut affecter accidentellement le comportement dans un autre module de votre programme
  • Les flux manquent d'opérations liées au temps comme Observable#interval(), Observable#window()et bien d'autres; ceci est principalement dû au fait que les flux sont basés sur l'extraction et que l'amont n'a aucun contrôle sur le moment d'émettre l'élément suivant en aval
  • Les flux offrent un ensemble restreint d'opérations par rapport à RxJava. Par exemple, les flux manquent d'opérations de coupure ( takeWhile(), takeUntil()); la solution de contournement Stream#anyMatch()est limitée: il s'agit d'une opération de terminal, vous ne pouvez donc pas l'utiliser plus d'une fois par flux
  • À partir de JDK 8, il n'y a pas d'opération Stream # zip, ce qui est parfois très utile
  • Les flux sont difficiles à construire par vous-même, Observable peut être construit de plusieurs manières EDIT: Comme indiqué dans les commentaires, il existe des moyens de construire Stream. Cependant, comme il n'y a pas de court-circuit non terminal, vous ne pouvez pas, par exemple, générer facilement un flux de lignes dans un fichier (JDK fournit des lignes Files # et BufferedReader # prêtes à l'emploi, et d'autres scénarios similaires peuvent être gérés en construisant Stream de Iterator).
  • Observable offre une fonction de gestion des ressources ( Observable#using()); vous pouvez envelopper le flux IO ou le mutex avec et être sûr que l'utilisateur n'oubliera pas de libérer la ressource - elle sera supprimée automatiquement à la résiliation de l'abonnement; Stream a une onClose(Runnable)méthode, mais vous devez l'appeler manuellement ou via try-with-resources. Par exemple. vous devez garder à l'esprit que Files # lines () doit être inclus dans le bloc try-with-resources.
  • Les observables sont synchronisés tout au long (je n'ai pas vérifié si la même chose est vraie pour les flux). Cela vous évite de vous demander si les opérations de base sont sûres pour les threads (la réponse est toujours `` oui '', sauf s'il y a un bogue), mais la surcharge liée à la concurrence sera là, peu importe si votre code en a besoin ou non.

Résumé: RxJava diffère considérablement des Streams. Les alternatives réelles à RxJava sont d'autres implémentations de ReactiveStreams , par exemple une partie pertinente d'Akka.

Mettre à jour . Il existe une astuce pour utiliser un pool de fork-join non par défaut pour Stream#parallel, voir Pool de threads personnalisé dans le flux parallèle Java 8

Mettre à jour . Tout ce qui précède est basé sur l'expérience avec RxJava 1.x. Maintenant que RxJava 2.x est ici , cette réponse est peut-être obsolète.

Kirill Gamazkov
la source
2
Pourquoi les flux sont-ils difficiles à construire? D'après cet article, cela semble facile: oracle.com/technetwork/articles/java/…
IgorGanapolsky
2
Il existe un certain nombre de classes qui ont une méthode «stream»: collections, flux d'entrée, fichiers de répertoire, etc. Le meilleur moyen que j'ai trouvé jusqu'à présent est de créer un Iterator, de l'envelopper avec Spliterator et enfin d'appeler StreamSupport # fromSpliterator. Trop de colle pour un cas simple IMHO. Il y a aussi Stream.iterate mais il produit un flux infini. Le seul moyen de couper le flux dans ce cas est Stream # anyMatch, mais c'est une opération de terminal, vous ne pouvez donc pas séparer le producteur de flux et le consommateur
Kirill Gamazkov
2
RxJava a Observable.fromCallable, Observable.create et ainsi de suite. Ou vous pouvez produire en toute sécurité Observable infini, puis dites `` .takeWhile (condition) '', et vous êtes d'accord pour envoyer cette séquence aux consommateurs
Kirill Gamazkov
1
Les flux ne sont pas difficiles à construire par vous-même. Vous pouvez simplement appeler Stream.generate()et transmettre votre propre Supplier<U>implémentation, une seule méthode simple à partir de laquelle vous fournissez l'élément suivant dans le flux. Il existe de nombreuses autres méthodes. Pour construire facilement une séquence Streamqui dépend des valeurs précédentes, vous pouvez utiliser la interate()méthode, chacun Collectiona une stream()méthode et Stream.of()construit un à Streampartir d'un varargs ou d'un tableau. Enfin, StreamSupportprend en charge la création de flux plus avancée à l'aide de séparateurs ou de types primitifs de flux.
jbx
"Les flux manquent d'opérations de coupure ( takeWhile(), takeUntil());" - JDK9 a ces derniers, je crois, dans takeWhile () et dropWhile ()
Abdul
50

Java 8 Stream et RxJava sont assez similaires. Ils ont des opérateurs qui se ressemblent (filter, map, flatMap ...) mais ne sont pas construits pour le même usage.

Vous pouvez effectuer des tâches asynchrones en utilisant RxJava.

Avec le flux Java 8, vous parcourez les éléments de votre collection.

Vous pouvez faire à peu près la même chose dans RxJava (traverser les éléments d'une collection) mais, comme RxJava se concentre sur la tâche simultanée, ..., il utilise la synchronisation, le verrouillage, ... Donc la même tâche utilisant RxJava peut être plus lente que avec le flux Java 8.

RxJava peut être comparé à CompletableFuture, mais cela peut être capable de calculer plus d'une valeur.

dwursteisen
la source
12
Il est intéressant de noter que votre déclaration sur la traversée de flux n'est vraie que pour un flux non parallèle. parallelStreamprend en charge la synchronisation similaire des traversées / cartes / filtrage simples, etc.
John Vint
2
Je ne pense pas que "la même tâche avec RxJava peut être plus lente qu'avec le flux Java 8". est vrai universellement, cela dépend fortement de la tâche à accomplir.
daschl
1
Je suis heureux que vous ayez dit que la même tâche en utilisant RxJava peut être plus lente qu'avec le flux Java 8 . C'est une distinction très importante dont de nombreux utilisateurs de RxJava ne sont pas conscients.
IgorGanapolsky
RxJava est synchrone par défaut. Avez-vous des points de repère pour étayer votre affirmation selon laquelle cela pourrait être plus lent?
Marcin Koziński
6
@ marcin-koziński, vous pouvez vérifier ce benchmark: twitter.com/akarnokd/status/752465265091309568
dwursteisen
37

Il existe quelques différences techniques et conceptuelles, par exemple, les flux Java 8 sont des séquences de valeurs synchrones à usage unique, basées sur l'extraction, tandis que les observables RxJava sont des séquences de valeurs ré-observables, basées sur le push-pull, potentiellement asynchrones. RxJava est destiné à Java 6+ et fonctionne également sur Android.

Akarnokd
la source
4
Le code typique impliquant RxJava fait un usage intensif des lambdas qui ne sont disponibles qu'à partir de Java 8. Vous pouvez donc utiliser Rx avec Java 6, mais le code sera bruyant
Kirill Gamazkov
1
Une distinction similaire est que les observables Rx peuvent rester en vie indéfiniment jusqu'à ce qu'ils soient désabonnés. Les flux Java 8 se terminent par des opérations par défaut.
IgorGanapolsky
2
@KirillGamazkov, vous pouvez utiliser retrolambda pour rendre votre code plus joli lorsque vous ciblez Java 6.
Marcin Koziński
Kotlin a l'air encore plus sexy que la rénovation
Kirill Gamazkov
30

Les flux Java 8 sont basés sur l'extraction. Vous itérez sur un flux Java 8 consommant chaque élément. Et cela pourrait être un flux sans fin.

RXJava Observableest par défaut basé sur le push. Vous vous abonnez à un Observable et vous serez averti lorsque le prochain élément arrivera ( onNext), ou lorsque le flux est terminé ( onCompleted), ou lorsqu'une erreur s'est produite ( onError). Parce qu'avec Observablevous recevez onNext, onCompleted, les onErrorévénements, vous pouvez faire quelques fonctions puissantes comme la combinaison de différentes Observables à un nouveau ( zip, merge,concat ). Vous pouvez également faire de la mise en cache, de la limitation, ... et il utilise plus ou moins la même API dans différentes langues (RxJava, RX en C #, RxJS, ...)

Par défaut, RxJava est à thread unique. À moins que vous ne commenciez à utiliser les planificateurs, tout se passera sur le même fil.

Bart De Neuter
la source
dans Stream vous avez forEach, c'est à peu près la même chose que onNext
paul
En fait, les flux sont généralement terminaux. «Les opérations qui ferment un pipeline de flux sont appelées des opérations de terminal. Elles produisent un résultat à partir d'un pipeline tel qu'un List, un Integer ou même void (tout type non-Stream).» ~ oracle.com/technetwork/articles/java/…
IgorGanapolsky
26

Les réponses existantes sont complètes et correctes, mais il manque un exemple clair pour les débutants. Permettez-moi de mettre du concret derrière des termes tels que «basé sur le push / pull» et «ré-observable». Remarque : je déteste le termeObservable (c'est un flux pour l'amour du ciel), je ferai donc simplement référence aux flux J8 vs RX.

Considérons une liste d'entiers,

digits = [1,2,3,4,5]

Un J8 Stream est un utilitaire pour modifier la collection. Par exemple, même les chiffres peuvent être extraits comme suit:

evens = digits.stream().filter(x -> x%2).collect(Collectors.toList())

Il s'agit essentiellement de la carte de Python , filtrer, réduire , un très bel ajout (et attendu depuis longtemps) à Java. Mais que se passerait-il si les chiffres n'étaient pas collectés à l'avance - et si les chiffres étaient diffusés pendant que l'application était en cours d'exécution - pourrions-nous filtrer les paires en temps réel.

Imaginez qu'un processus de thread distinct génère des nombres entiers à des moments aléatoires pendant que l'application est en cours d'exécution ( ---indique le temps)

digits = 12345---6------7--8--9-10--------11--12

Dans RX, evenpeut réagir à chaque nouveau chiffre et appliquer le filtre en temps réel

even = -2-4-----6---------8----10------------12

Il n'est pas nécessaire de stocker des listes d'entrée et de sortie. Si vous voulez une liste de sortie, pas de problème qui soit également diffusable. En fait, tout est un flux.

evens_stored = even.collect()  

C'est pourquoi des termes comme «sans état» et «fonctionnel» sont davantage associés à RX

Adam Hughes
la source
Mais 5 n'est même pas… Et cela ressemble à J8 Stream est synchrone, alors que Rx Stream est asynchrone?
Franklin Yu
1
@FranklinYu merci j'ai corrigé la 5 faute de frappe. Si pensez moins en termes de synchrone vs asyncrhouns, bien que cela puisse être correct, et plus en termes d'impératif vs fonctionnel. Dans J8, vous collectez d'abord tous vos éléments, puis appliquez le filtre en second. Dans RX, vous définissez la fonction de filtrage indépendamment des données, puis vous l'associez à une source paire (un flux en direct ou une collection java) ... c'est un modèle de programmation entièrement différent
Adam Hughes
Je suis très surpris par cela. Je suis presque sûr que les flux Java peuvent être constitués de flux de données. Qu'est-ce qui vous fait penser le contraire?
Vic Seedoubleyew
4

RxJava est également étroitement lié à l' initiative de flux réactifs et se considère comme une simple implémentation de l'API de flux réactifs (par exemple par rapport à l' implémentation de flux Akka ). La principale différence est que les flux réactifs sont conçus pour pouvoir gérer la contre-pression, mais si vous regardez la page des flux réactifs, vous aurez l'idée. Ils décrivent assez bien leurs objectifs et les flux sont également étroitement liés au manifeste réactif .

Les flux Java 8 sont à peu près l'implémentation d'une collection illimitée, assez similaire au Scala Stream ou au Clojure paresseux seq .

Niclas Meier
la source
3

Les flux Java 8 permettent de traiter efficacement de très grandes collections, tout en tirant parti des architectures multicœurs. En revanche, RxJava est monothread par défaut (sans Schedulers). Ainsi, RxJava ne profitera pas des machines multicœurs à moins que vous ne codiez vous-même cette logique.

IgorGanapolsky
la source
4
Stream est également monothread par défaut, sauf si vous appelez .parallel (). En outre, Rx donne plus de contrôle sur la concurrence.
Kirill Gamazkov
@KirillGamazkov Kotlin Coroutines Flow (basé sur Java8 Streams) prend désormais en charge la concurrence structurée: kotlinlang.org/docs/reference/coroutines/flow.html#flows
IgorGanapolsky
C'est vrai, mais je n'ai rien dit sur Flow et la concurrence structurée. Mes deux points étaient: 1) Stream et Rx sont à thread unique à moins que vous ne changiez explicitement cela; 2) Rx vous donne un contrôle précis sur quelle étape effectuer sur quel pool de threads, contrairement à Streams qui vous permet seulement de dire "faites-le en parallèle d'une manière ou d'une autre"
Kirill Gamazkov
Je ne comprends pas vraiment le point de la question "pour quoi avez-vous besoin d'un pool de threads". Comme vous l'avez dit, "pour permettre le traitement efficace de très grandes collections". Ou peut-être que je veux qu'une partie de la tâche liée aux E / S s'exécute sur un pool de threads séparé. Je ne pense pas avoir compris l'intention derrière votre question. Réessayer?
Kirill Gamazkov
1
Les méthodes statiques de la classe Schedulers permettent d'obtenir des pools de threads prédéfinis et d'en créer un à partir d'Executor. Voir reactivex.io/RxJava/2.x/javadoc/io/reactivex/schedulers/…
Kirill Gamazkov le