RxJS: Comment mettre à jour «manuellement» un observable?

139

Je pense que je dois mal comprendre quelque chose de fondamental, car dans mon esprit, cela devrait être le cas le plus élémentaire pour un observable, mais pour la vie de mon, je ne peux pas comprendre comment le faire à partir de la documentation.

Fondamentalement, je veux pouvoir faire ceci:

// create a dummy observable, which I would update manually
var eventObservable = rx.Observable.create(function(observer){});
var observer = eventObservable.subscribe(
   function(x){
     console.log('next: ' + x);
   }
...
var my_function = function(){
  eventObservable.push('foo'); 
  //'push' adds an event to the datastream, the observer gets it and prints 
  // next: foo
}

Mais je n'ai pas pu trouver une méthode comme push. J'utilise ceci pour un gestionnaire de clics, et je sais qu'ils l'ont fait Observable.fromEventpour cela, mais j'essaie de l'utiliser avec React et je préfère pouvoir simplement mettre à jour le flux de données dans un rappel, au lieu d'utiliser un complètement différent système de gestion d'événements. Donc, fondamentalement, je veux ceci:

$( "#target" ).click(function(e) {
  eventObservable.push(e.target.text()); 
});

Le plus proche que j'ai obtenu était l'utilisation observer.onNext('foo'), mais cela ne semblait pas fonctionner et cela a été appelé l'observateur, ce qui ne semble pas correct. L'observateur devrait être la chose qui réagit au flux de données, pas le changer, non?

Est-ce que je ne comprends tout simplement pas la relation observateur / observable?

LiamD
la source
Jetez un œil à ceci pour clarifier votre idée (L'introduction à la programmation réactive vous manque): gist.github.com/staltz/868e7e9bc2a7b8c1f754 . Ici aussi, il y a un tas de ressources à partir desquelles vous pouvez améliorer votre compréhension: github.com/Reactive-Extensions/RxJS#resources
user3743222
J'avais vérifié le premier, semble être une ressource solide. La deuxième est une excellente liste, sur laquelle j'ai trouvé aaronstacy.com/writings/reactive-programming-and-mvc qui m'a aidé à découvrir Rx.Subject, qui résout mon problème. Donc merci! Une fois que j'aurai écrit un peu plus d'application, je publierai ma solution, je veux juste la tester un peu.
LiamD
Hehe, merci beaucoup d'avoir posé cette question, j'étais sur le point de poser la même question avec le même exemple de code en tête :-)
arturh

Réponses:

154

Dans RX, Observer et Observable sont des entités distinctes. Un observateur souscrit à un observable. Un observable émet des éléments à ses observateurs en appelant les méthodes des observateurs. Si vous devez appeler les méthodes d'observateur en dehors de la portée deObservable.create() vous pouvez utiliser un objet, qui est un proxy qui agit en même temps comme observateur et observable.

Vous pouvez faire comme ceci:

var eventStream = new Rx.Subject();

var subscription = eventStream.subscribe(
   function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

var my_function = function() {
  eventStream.next('foo'); 
}

Vous pouvez trouver plus d'informations sur les sujets ici:

Luisgabriel
la source
1
C'est en fait exactement ce que j'ai fini par faire! J'ai continué à travailler dessus pour voir si je pouvais trouver une meilleure façon de faire ce que j'avais à faire, mais c'est définitivement une solution viable. Je l'ai vu en premier dans cet article: aaronstacy.com/writings/reactive-programming-and-mvc .
LiamD
34

Je crois Observable.create()que ne prend pas un observateur comme paramètre de rappel, mais un émetteur. Donc, si vous souhaitez ajouter une nouvelle valeur à votre Observable, essayez plutôt ceci:

var emitter;
var observable = Rx.Observable.create(e => emitter = e);
var observer = {
  next: function(next) {
    console.log(next);
  },
  error: function(error) {
    console.log(error);
  },
  complete: function() {
    console.log("done");
  }
}
observable.subscribe(observer);
emitter.next('foo');
emitter.next('bar');
emitter.next('baz');
emitter.complete();

//console output
//"foo"
//"bar"
//"baz"
//"done"

Oui Subject facilite les choses, en fournissant Observable et Observer dans le même objet, mais ce n'est pas exactement la même chose, car Subject vous permet d'abonner plusieurs observateurs au même observable lorsqu'un observable n'envoie des données qu'au dernier observateur abonné, alors utilisez-le consciemment . Voici un JsBin si vous voulez le bricoler.

theFreedomBanane
la source
la possibilité d'écraser la propriété de l'émetteur est-elle documentée quelque part dans les manuels RxJS?
Tomas
Dans ce cas emitter, seules les next()nouvelles valeurs pour l'observateur qui a souscrit le dernier. Une meilleure approche serait de collecter tous les emitters dans un tableau et de les parcourir tous et nextla valeur sur chacun d'eux
Eric Gopak