Observable vs fluide rxJava2

128

J'ai regardé le nouveau rx java 2 et je ne suis pas tout à fait sûr de comprendre l'idée de backpressureplus ...

Je sais que nous avons Observablequi n'a pas de backpressuresoutien et Flowablequi en a.

Donc, basé sur l'exemple, disons que j'ai flowableavec interval:

        Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

Cela va planter après environ 128 valeurs, et c'est assez évident que je consomme plus lentement que d'obtenir des articles.

Mais alors nous avons la même chose avec Observable

     Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

Cela ne plantera pas du tout, même si je tarde à le consommer, cela fonctionne toujours. Pour faire du Flowabletravail, disons que je mets un onBackpressureDropopérateur, le crash est parti mais toutes les valeurs ne sont pas non plus émises.

Donc, la question de base à laquelle je ne trouve pas de réponse actuellement dans ma tête est pourquoi devrais-je me soucier du backpressuremoment où je peux utiliser plain quand Observablemême recevoir toutes les valeurs sans gérer le buffer? Ou peut-être de l'autre côté, quels avantages backpressureme procurent la gestion et la gestion de la consommation?

user2141889
la source

Réponses:

123

Ce que la contre-pression se manifeste dans la pratique, ce sont des tampons limités, Flowable.observeOnun tampon de 128 éléments qui est drainé aussi vite que le courant aval peut le prendre. Vous pouvez augmenter cette taille de tampon individuellement pour gérer la source en rafale et toutes les pratiques de gestion de la contre-pression s'appliquent toujours à partir de 1.x. Observable.observeOna un tampon illimité qui continue de collecter les éléments et votre application peut manquer de mémoire.

Vous pouvez utiliser Observablepar exemple:

  • gestion des événements GUI
  • travailler avec des séquences courtes (moins de 1000 éléments au total)

Vous pouvez utiliser Flowablepar exemple:

  • sources froides et non chronométrées
  • générateur comme des sources
  • accesseurs réseau et base de données
Akarnokd
la source
Puisque cela a été soulevé dans une autre question - est-il correct que des types plus restreints aiment Maybe, Singleet Completablepuissent toujours être utilisés à la place de Flowablequand ils sont sémantiquement appropriés?
david.mihola
1
Oui, Maybe, Singleet Completablesont beaucoup trop petites pour avoir besoin du concept de contre - pression. Il n'y a aucune chance qu'un producteur puisse émettre des articles plus rapidement qu'ils ne peuvent être consommés, puisque 0 à 1 article sera jamais produit ou consommé.
AndrewF
Peut-être que je n'ai pas raison, mais pour moi, les exemples de Flowable et Observable devraient être échangés.
Yura Galavay
Je pense que dans la question, il manque la stratégie de contre-pression que nous devons fournir au Flowable, ce qui explique pourquoi l'exception de contre-pression manquante est levée, explique également pourquoi cette exception disparaît après avoir appliqué .onBackpressureDrop (). Et pour Observable, comme il n'a pas cette stratégie et ne peut pas en recevoir une, il échouera tout simplement plus tard en raison du MOO
Haomin
111

La contre-pression se produit lorsque votre observable (éditeur) crée plus d'événements que votre abonné ne peut gérer. Ainsi, vous pouvez obtenir des événements manquants pour les abonnés, ou vous pouvez obtenir une énorme file d'attente d'événements qui finissent par entraîner une perte de mémoire. Flowableprend en compte la contre-pression. Observablene fait pas. C'est tout.

cela me rappelle un entonnoir qui, lorsqu'il a trop de liquide, déborde. Flowable peut aider à ne pas y arriver:

avec une contre-pression énorme:

entrez la description de l'image ici

mais avec l'utilisation de fluide, il y a beaucoup moins de contre-pression:

entrez la description de l'image ici

Rxjava2 a quelques stratégies de contre-pression que vous pouvez utiliser en fonction de votre cas d'utilisation. par stratégie, je veux dire que Rxjava2 fournit un moyen de gérer les objets qui ne peuvent pas être traités en raison du débordement (contre-pression).

voici les stratégies. Je ne les passerai pas tous en revue, mais par exemple, si vous ne voulez pas vous soucier des objets qui sont débordés, vous pouvez utiliser une stratégie de dépôt comme celle-ci:

observable.toFlowable (BackpressureStrategy.DROP)

Autant que je sache, il devrait y avoir une limite de 128 éléments dans la file d'attente, après cela, il peut y avoir un débordement (contre-pression). Même si ce n'est pas 128, c'est proche de ce nombre. J'espère que cela aide quelqu'un.

si vous avez besoin de changer la taille de la mémoire tampon de 128, il semble que cela puisse être fait comme ceci (mais attention aux contraintes de mémoire:

myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.  

dans le développement de logiciels, une stratégie de contre-pression signifie généralement que vous dites à l'émetteur de ralentir un peu car le consommateur ne peut pas gérer la vitesse de vos événements d'émission.

j2emanue
la source
J'ai toujours pensé que la contre-pression était le nom d'une famille de mécanismes qui permettraient au consommateur d'informer le producteur de ralentir ...
kboom
Cela pourrait être le cas. Oui
j2emanue
Y a-t-il des inconvénients à utiliser un Flowable?
IgorGanapolsky
Ces images me mentent. L'abandon d'événements ne se terminera pas par «plus d'argent» en bas.
EpicPandaForce
1
@ j2emanue, vous confondez la taille du tampon pour les opérateurs et l'opérateur Flowable.buffer (int). Veuillez lire les javadocs cafefully et corrigez votre réponse en conséquence: reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html
tomek
15

Le fait que votre Flowableplantage après avoir émis 128 valeurs sans gestion de la contre-pression ne signifie pas qu'il plantera toujours après exactement 128 valeurs: parfois il plantera après 10, et parfois il ne plantera pas du tout. Je crois que c'est ce qui s'est passé lorsque vous avez essayé l'exemple avec Observable- il s'est avéré qu'il n'y avait pas de contre-pression, donc votre code fonctionnait normalement, la prochaine fois ce ne sera peut-être pas le cas. La différence dans RxJava 2 est qu'il n'y a plus de concept de contre-pression dans Observables, et aucun moyen de la gérer. Si vous concevez une séquence réactive qui nécessitera probablement une gestion explicite de la contre-pression, Flowablec'est votre meilleur choix.

Egor
la source
Oui, j'ai observé que parfois il cassait après moins de valeurs, parfois non. Mais encore une fois si, par exemple, je ne gère que intervalsans est backpressure-ce que je m'attendrais à un comportement ou à des problèmes étranges?
user2141889
Si vous êtes sûr qu'il n'y a aucun moyen que des problèmes de contre-pression puissent survenir dans une séquence Observable spécifique - alors je suppose que c'est bien d'ignorer la contre-pression.
Egor