Le IObserver <T> de .NET était-il destiné à s'abonner à plusieurs IObservables?

9

Il existe des interfaces IObservable et IObserver dans .NET (également ici et ici ). Fait intéressant, l'implémentation concrète de l'IObserver ne contient pas de référence directe à l'IObservable. Il ne sait pas à qui il est abonné. Il ne peut invoquer que l'abonné. "Veuillez retirer l'épingle pour vous désinscrire."

edit: le désabonné implémente le IDisposable. Je pense que ce schéma a été utilisé pour éviter le problème de l'auditeur périmé .

Cependant, deux choses ne sont pas tout à fait claires pour moi.

  1. La classe Unsubscriber interne fournit-elle le comportement d'abonnement et d'oubli? Qui (et quand exactement) fait appel IDisposable.Dispose()au désabonné? Le garbage collector (GC) n'est pas déterministe.
    [Avertissement: dans l'ensemble, j'ai passé plus de temps avec C et C ++ qu'avec C #.]
  2. Que se passe-t-il si je veux abonner un observateur K à un L1 observable et que l'observateur est déjà abonné à un autre L2 observable?

    K.Subscribe(L1);
    K.Subscribe(L2);
    K.Unsubscribe();
    L1.PublishObservation(1003);
    L2.PublishObservation(1004);
    

    Lorsque j'ai exécuté ce code de test contre l'exemple de MSDN, l'observateur est resté abonné à L1. Ce serait particulier dans le développement réel. Potentiellement, il existe 3 pistes pour améliorer cela:

    • Si l'observateur a déjà une instance de désabonnement (c'est-à-dire qu'elle est déjà abonnée), il se désabonne discrètement du fournisseur d'origine avant de s'abonner à une nouvelle. Cette approche masque le fait qu'il n'est plus abonné au fournisseur d'origine, ce qui peut devenir une surprise plus tard.
    • Si l'observateur possède déjà une instance de désabonnement, alors lève une exception. Un code d'appel qui se comporte bien doit désinscrire explicitement l'observateur.
    • Observer est abonné à plusieurs fournisseurs. C'est l'option la plus intrigante, mais peut-elle être mise en œuvre avec IObservable et IObserver? Voyons voir. Il est possible pour l'observateur de conserver une liste des objets non abonnés: un pour chaque source. Malheureusement, IObserver.OnComplete()ne fournit pas de référence au fournisseur qui l'a envoyé. Ainsi, l'implémentation IObserver avec plusieurs fournisseurs ne serait pas en mesure de déterminer lequel désinscrire.
  3. Le serveur IObserver de .NET était-il destiné à s'abonner à plusieurs serveurs IObservables?
    La définition classique du modèle d'observateur exige-t-elle qu'un seul observateur doit être en mesure de s'abonner à plusieurs fournisseurs? Ou est-elle facultative et dépendante de la mise en œuvre?

Nick Alexeev
la source

Réponses:

5

Les deux interfaces font en fait partie des extensions réactives (Rx pour faire court), vous devriez utiliser cette bibliothèque à peu près chaque fois que vous voulez les utiliser.

Les interfaces sont techniquement dans mscrolib, pas dans aucun des assemblys Rx. Je pense que c'est pour faciliter l'interopérabilité: de cette façon, des bibliothèques comme TPL Dataflow peuvent fournir des membres qui travaillent avec ces interfaces , sans référencer réellement Rx.

Si vous utilisez les Rx Subjectcomme implémentation de IObservable, vous Subscriberetournerez un IDisposablequi peut être utilisé pour vous désinscrire:

var observable = new Subject<int>();

var unsubscriber =
    observable.Subscribe(Observer.Create<int>(i => Console.WriteLine("1: {0}", i)));
observable.Subscribe(Observer.Create<int>(i => Console.WriteLine("2: {0}", i)));

unsubscriber.Dispose();

observable.OnNext(1003);
observable.OnNext(1004);
svick
la source
5

Juste pour clarifier certaines choses qui sont bien documentées dans les directives officielles de conception Rx et longuement sur mon site Web IntroToRx.com :

  • Vous ne comptez pas sur le GC pour nettoyer vos abonnements. Couvert en détail ici
  • Il n'y a pas de Unsubscribeméthode. Vous vous abonnez à une séquence observable et recevez un abonnement . Vous pouvez ensuite supprimer cet abonnement en indiquant que vous ne souhaitez plus que vos rappels soient invoqués.
  • Une séquence observable ne peut pas être effectuée plus d'une fois (voir la section 4 des directives de conception Rx).
  • Il existe de nombreuses façons de consommer plusieurs séquences observables. Il y a aussi une mine d'informations à ce sujet sur Reactivex.io et encore sur IntroToRx.

Pour être précis et répondre directement à la question d'origine, votre utilisation est à l'envers. Vous ne poussez pas de nombreuses séquences observables dans un seul observateur. Vous composez des séquences observables en une seule séquence observable. Vous vous abonnez ensuite à cette séquence unique.

Au lieu de

K.Subscribe(L1);
K.Subscribe(L2);
K.Unsubscribe();
L1.PublishObservation(1003);
L2.PublishObservation(1004);

Ce qui est juste un pseudo-code et ne fonctionnerait pas dans l'implémentation .NET de Rx, vous devriez faire ce qui suit:

var source1 = new Subject<int>(); //was L1
var source2 = new Subject<int>(); //was L2

var subscription = source1
    .Merge(source2)
    .Subscribe(value=>Console.WriteLine("OnNext({0})", value));


source1.OnNext(1003);
source2.OnNext(1004);

subscription.Dispose();

Maintenant, cela ne correspond pas exactement à la question initiale, mais je ne sais pas ce qui K.Unsubscribe()était censé faire (se désinscrire de tous, le dernier ou le premier abonnement?!)

Lee Campbell
la source
Puis-je simplement enfermer l'objet d'abonnement dans un bloc "à l'aide"?
Robert Oschler
1
Dans ce cas synchrone, vous pouvez, cependant Rx est censé être asynchrone. Dans le cas asynchrone, vous ne pouvez normalement pas utiliser le usingbloc. Le coût d'une déclaration d'abonnement doit être pratiquement nul, vous devez donc compenser le bloc using, vous abonner, laisser le bloc using (donc vous désinscrire), ce qui rend le code plutôt inutile
Lee Campbell
3

Tu as raison. L'exemple fonctionne mal pour plusieurs IObservables.

Je suppose que OnComplete () ne fournit pas de référence car ils ne veulent pas que IObservable doive le garder. Si j'écrivais que je prendrais probablement en charge plusieurs abonnements en demandant à Subscribe de prendre un identifiant comme deuxième paramètre, qui est renvoyé à l'appel OnComplete (). Vous pourriez donc dire

K.Subscribe(L1,"L1")
K.Subscribe(L2,"L2")
K.Unsubscribe("L1")

En l'état, il semble que le .NET IObserver ne convient pas à plusieurs observateurs. Mais je suppose que votre objet principal (LocationReporter dans l'exemple) pourrait avoir

public Dictionary<String,IObserver> Observers;

et cela vous permettrait de soutenir

K.Subscribe(L1,"L1")
K.Subscribe(L2,"L2")
K.Unsubscribe("L1")

ainsi que.

Je suppose que Microsoft pourrait faire valoir qu'il n'est donc pas nécessaire qu'ils prennent directement en charge plusieurs IObservables dans les interfaces.

psr
la source
Je pensais également que la mise en œuvre observable peut avoir une liste d'observateurs. Moi aussi, j'ai remarqué que cela IObserver.OnComplete()n'identifie pas de qui vient l'appel. Si l'observateur est abonné à plus d'un observable, il ne sait pas de qui se désabonner. Anticlimactique. Je me demande, .NET a-t-il une meilleure interface pour le modèle d'observateur?
Nick Alexeev
Si vous voulez avoir une référence à quelque chose, vous devez en fait utiliser une référence, pas une chaîne.
svick
Cette réponse m'a aidé avec un bug réel. J'utilisais Observable.Create()pour construire un observable, et en enchaînant plusieurs observables sources en utilisant Subscribe(). J'ai par inadvertance passé un observable terminé dans un chemin de code. Ceci a complété mon observable nouvellement créé, même si les autres sources n'étaient pas complètes. Il m'a fallu des siècles à travailler sur ce que je devais faire - interrupteur Observable.Empty()pour Observable.Never().
Olly
0

Je sais que c'est bien tard pour la fête, mais ...

Les interfaces I Observable<T>et neIObserver<T> font pas partie de Rx ... ce sont des types de base ... mais Rx en fait un usage intensif.

Vous êtes libre d'avoir autant (ou aussi peu) d'observateurs que vous le souhaitez. Si vous prévoyez plusieurs observateurs, il est de la responsabilité de l'observable d'acheminer les OnNext()appels vers les observateurs appropriés pour chaque événement observé. L'observable peut avoir besoin d'une liste ou d'un dictionnaire comme vous le suggérez.

Il y a de bons cas pour n'en autoriser qu'un - et de bons pour en autoriser plusieurs. Par exemple, dans une implémentation CQRS / ES, vous pouvez appliquer un seul gestionnaire de commandes par type de commande sur un bus de commandes, tandis que vous pouvez notifier plusieurs transformations côté lecture pour un type d' événement donné dans le magasin d'événements.

Comme indiqué dans d'autres réponses, il n'y a pas Unsubscribe. Jeter ce que l'on vous donne lorsque vous faites Subscribegénéralement le sale boulot. L'observateur, ou un de ses agents, est responsable de conserver le jeton jusqu'à ce qu'il ne souhaite plus recevoir de notifications supplémentaires . (question 1)

Donc, dans votre exemple:

K.Subscribe(L1);
K.Subscribe(L2);
K.Unsubscribe();
L1.PublishObservation(1003);
L2.PublishObservation(1004);

... ce serait plutôt:

using ( var l1Token = K.Subscribe( L1 ) )
{
  using ( var l2Token = K.Subscribe( L2 );
  {
    L1.PublishObservation( 1003 );
    L2.PublishObservation( 1004 );
  } //--> effectively unsubscribing to L2 here

  L2.PublishObservation( 1005 );
}

... où K entendrait 1003 et 1004 mais pas 1005.

Pour moi, cela semble toujours drôle parce que nominalement, les abonnements sont des choses qui durent longtemps ... souvent pour la durée du programme. Ils ne sont pas différents à cet égard des événements .Net normaux.

Dans de nombreux exemples que j'ai vus, le Disposejeton fonctionne pour supprimer l'observateur de la liste d'observateurs de l'observable. Je préfère que le jeton ne transporte pas autant de connaissances ... et j'ai donc généralisé mes jetons d'abonnement pour simplement appeler un lambda transmis (avec des informations d'identification capturées au moment de l'abonnement:

public class SubscriptionToken<T>: IDisposable
{
  private readonly Action unsubscribe;

  private SubscriptionToken( ) { }
  public SubscriptionToken( Action unsubscribe )
  {
    this.unsubscribe = unsubscribe;
  }

  public void Dispose( )
  {
    unsubscribe( );
  }
}

... et l'observable peut installer le comportement de désabonnement pendant l'abonnement:

IDisposable Subscribe<T>( IObserver<T> observer )
{
  var subscriberId = Guid.NewGuid( );
  subscribers.Add( subscriberId, observer );

  return new SubscriptionToken<T>
  (
    ( ) =>
    subscribers.Remove( subscriberId );
  );
}

Si votre observateur capture des événements de plusieurs observables, vous voudrez peut-être vous assurer qu'il existe une sorte d'informations de corrélation dans les événements eux-mêmes ... comme les événements .Net le font avec sender. C'est à vous de décider si cela compte ou non. Ce n'est pas intégré, comme vous l'avez raisonnablement raisonné. (question 3)

Argile
la source