Pourquoi les sujets ne sont-ils pas recommandés dans les extensions réactives .NET?

111

Je suis actuellement en train de me familiariser avec le framework Reactive Extensions pour .NET et je suis en train de parcourir les différentes ressources d'introduction que j'ai trouvées (principalement http://www.introtorx.com )

Notre application implique un certain nombre d'interfaces matérielles qui détectent les trames réseau, ce seront mes IObservables, j'ai alors une variété de composants qui consommeront ces trames ou effectueront une certaine manière de transformer les données et de produire un nouveau type de trame. Il y aura également d'autres composants qui doivent afficher chaque nième image par exemple. Je suis convaincu que Rx va être utile pour notre application, mais j'ai du mal avec les détails d'implémentation de l'interface IObserver.

La plupart (sinon la totalité) des ressources que j'ai lues ont dit que je ne devrais pas implémenter l'interface IObservable moi-même mais utiliser l'une des fonctions ou classes fournies. D'après mes recherches, il semble que la création d'un Subject<IBaseFrame>me fournirait ce dont j'ai besoin, j'aurais mon thread unique qui lit les données de l'interface matérielle, puis appelle la fonction OnNext de mon Subject<IBaseFrame>instance. Les différents composants IObserver recevraient alors leurs notifications de ce sujet.

Ma confusion vient des conseils donnés en annexe de ce tutoriel où il est dit:

Évitez d'utiliser les types de sujet. Rx est en fait un paradigme de programmation fonctionnel. Utiliser des sujets signifie que nous gérons maintenant l'état, qui est potentiellement en train de muter. Il est très difficile de gérer à la fois l'état mutant et la programmation asynchrone. De plus, de nombreux opérateurs (méthodes d'extension) ont été soigneusement écrits pour garantir que la durée de vie correcte et cohérente des abonnements et des séquences est maintenue; lorsque vous introduisez des sujets, vous pouvez rompre cela. Les versions futures peuvent également voir une dégradation significative des performances si vous utilisez explicitement des sujets.

Mon application est assez critique pour les performances, je vais évidemment tester les performances de l'utilisation des modèles Rx avant d'entrer dans le code de production; cependant, je crains de faire quelque chose qui va à l'encontre de l'esprit du framework Rx en utilisant la classe Subject et qu'une future version du framework va nuire aux performances.

Existe-t-il une meilleure façon de faire ce que je veux? Le thread d'interrogation matériel va fonctionner en continu, qu'il y ait des observateurs ou non (le tampon matériel sauvegardera autrement), c'est donc une séquence très chaude. Je dois ensuite transmettre les trames reçues à plusieurs observateurs.

Tout avis serait grandement apprécié.

Anthony
la source
1
Cela m'a vraiment aidé à comprendre le sujet, je suis juste en train de comprendre comment l'utiliser dans mon application. Je sais qu'ils sont la bonne chose - j'ai un pipeline de composants qui sont très orientés push et je dois faire toutes sortes de filtrage et d'invocation sur le thread de l'interface utilisateur pour afficher dans une interface graphique ainsi que la mise en mémoire tampon de la dernière image reçue, etc. etc - Je dois juste m'assurer de bien le faire du premier coup!
Anthony

Réponses:

70

Ok, si nous ignorons mes manières dogmatiques et ignorons "les sujets sont bons / mauvais" tous ensemble. Examinons l'espace des problèmes.

Je parie que vous avez l'un des deux styles de système auxquels vous devez vous féliciter.

  1. Le système déclenche un événement ou un rappel lorsqu'un message arrive
  2. Vous devez interroger le système pour voir s'il y a un message à traiter

Pour l'option 1, facile, nous l'enveloppons simplement avec la méthode FromEvent appropriée et nous avons terminé. Au bar!

Pour l'option 2, nous devons maintenant examiner comment nous sondons cela et comment le faire efficacement. Aussi, lorsque nous obtenons la valeur, comment la publier?

J'imagine que vous voudriez un fil dédié pour le sondage. Vous ne voudriez pas qu'un autre codeur martèle le ThreadPool / TaskPool et vous laisse dans une situation de famine ThreadPool. Sinon, vous ne voulez pas les tracas de la commutation de contexte (je suppose). Supposons donc que nous ayons notre propre thread, nous aurons probablement une sorte de boucle While / Sleep dans laquelle nous nous asseyons pour interroger. Lorsque le chèque trouve des messages, nous les publions. Eh bien, tout cela semble parfait pour Observable.Create. Maintenant, nous ne pouvons probablement pas utiliser une boucle While car cela ne nous permettra pas de retourner un jetable pour permettre l'annulation. Heureusement, vous avez lu tout le livre, alors soyez avertis avec la planification récursive!

J'imagine que quelque chose comme ça pourrait fonctionner. #Pas testé

public class MessageListener
{
    private readonly IObservable<IMessage> _messages;
    private readonly IScheduler _scheduler;

    public MessageListener()
    {
        _scheduler = new EventLoopScheduler();

        var messages = ListenToMessages()
                                    .SubscribeOn(_scheduler)
                                    .Publish();

        _messages = messages;
        messages.Connect();
    }

    public IObservable<IMessage> Messages
    {
        get {return _messages;}
    }

    private IObservable<IMessage> ListenToMessages()
    {
        return Observable.Create<IMessage>(o=>
        {
                return _scheduler.Schedule(recurse=>
                {
                    try
                    {           
                        var messages = GetMessages();
                        foreach (var msg in messages)
                        {
                            o.OnNext(msg);
                        }   
                        recurse();
                    }
                    catch (Exception ex)
                    {
                        o.OnError(ex);
                    }                   
                });
        });
    }

    private IEnumerable<IMessage> GetMessages()
    {
         //Do some work here that gets messages from a queue, 
         // file system, database or other system that cant push 
         // new data at us.
         // 
         //This may return an empty result when no new data is found.
    }
}

La raison pour laquelle je n'aime vraiment pas les sujets, c'est que le développeur n'a généralement pas une conception claire du problème. Piratez un sujet, piquez-le ici là-bas et partout, puis laissez le pauvre développeur de soutien deviner que WTF se passait. Lorsque vous utilisez les méthodes Créer / Générer, etc., vous localisez les effets sur la séquence. Vous pouvez tout voir dans une seule méthode et vous savez que personne d'autre ne lance un effet secondaire désagréable. Si je vois un champ de sujet, je dois maintenant chercher tous les endroits dans une classe où il est utilisé. Si un MFer en expose un publiquement, alors tous les paris sont ouverts, qui sait comment cette séquence est utilisée! Async / Concurrency / Rx est difficile. Vous n'avez pas besoin de rendre les choses plus difficiles en permettant aux effets secondaires et à la programmation de causalité de vous faire encore plus tourner la tête.

Lee Campbell
la source
10
Je ne fais que lire cette réponse maintenant, mais je pensais que je devais souligner que je n'envisagerais jamais d'exposer l'interface Subject! Je l'utilise pour fournir l'implémentation IObservable <> dans une classe scellée (qui expose IObservable <>). Je peux certainement voir pourquoi exposer l'interface Subject <> serait une mauvaise chose ™
Anthony
hé, désolé d'être épais, mais je ne comprends pas vraiment votre code. que font et retournent ListenToMessages () et GetMessages ()?
user10479
1
Pour votre projet personnel @jeromerg, cela peut convenir. Cependant, d'après mon expérience, les développeurs ont du mal avec WPF, MVVM, la conception de l'interface graphique de test unitaire, puis le lancement de Rx peuvent rendre les choses plus compliquées. J'ai essayé le modèle BehaviourSubject-as-a-property. Cependant, j'ai trouvé qu'il était beaucoup plus adoptable pour les autres si nous utilisions les propriétés INPC standard, puis en utilisant une méthode d'extension simple pour convertir cela en IObservable. En outre, vous aurez besoin de liaisons WPF personnalisées pour travailler avec vos sujets de comportement. Maintenant, votre pauvre équipe doit apprendre WPF, MVVM, Rx et votre nouveau framework.
Lee Campbell
2
@LeeCampbell, pour le mettre en fonction de votre exemple de code, la manière normale serait que MessageListener soit construit par le système (vous enregistrez probablement le nom de la classe d'une manière ou d'une autre), et on vous dit que le système appellera alors OnCreate () et OnGoodbye (), et appellera message1 (), message2 () et message3 () lorsque les messages sont générés. Il semble que messageX [123] appellerait OnNext sur un sujet, mais y a-t-il un meilleur moyen?
James Moore
1
@JamesMoore car ces choses sont beaucoup plus faciles à expliquer avec des exemples concrets. Si vous connaissez une application Android Open Source qui utilise Rx et Subjects, alors je peux peut-être trouver le temps de voir si je peux fournir un meilleur moyen. Je comprends qu'il n'est pas très utile de se tenir sur un piédestal et de dire que les sujets sont mauvais. Mais je pense que des choses comme IntroToRx, RxCookbook et ReactiveTrader donnent tous différents niveaux d'exemple sur la façon d'utiliser Rx.
Lee Campbell
38

En général, vous devriez éviter d'utiliser Subject, mais pour ce que vous faites ici, je pense qu'ils fonctionnent très bien. J'ai posé une question similaire quand je suis tombé sur le message "éviter les sujets" dans les tutoriels Rx.

Pour citer Dave Sexton (de Rxx)

«Les sujets sont les composants avec état de Rx. Ils sont utiles lorsque vous devez créer une observable semblable à un événement en tant que champ ou variable locale.»

J'ai tendance à les utiliser comme point d'entrée dans Rx. Donc, si j'ai un code qui a besoin de dire «quelque chose s'est passé» (comme vous l'avez fait), j'utiliserais un Subjectand call OnNext. Ensuite, exposez-le IObservablepour que les autres puissent s'y abonner (vous pouvez l'utiliser AsObservable()sur votre sujet pour vous assurer que personne ne peut lancer un casting sur un sujet et gâcher les choses).

Vous pouvez également y parvenir avec un événement .NET et une utilisation FromEventPattern, mais si je ne fais que transformer l'événement en un événement de IObservabletoute façon, je ne vois pas l'avantage d'avoir un événement au lieu d'un Subject(ce qui pourrait signifier que je suis absent quelque chose ici)

Cependant, ce que vous devriez éviter assez fortement est de souscrire à un IObservableavec a Subject, c'est-à-dire ne pas passer a Subjectdans la IObservable.Subscribeméthode.

Wilka
la source
Pourquoi avez-vous besoin de l'état? Comme le montre ma réponse, si vous décomposez le problème en plusieurs parties, vous n'avez pas vraiment à gérer l'état du tout. Les sujets ne doivent pas être utilisés dans ce cas.
casperOne
8
@casperOne Vous n'avez pas besoin d'un état en dehors du Subject <T> ou de l'événement (qui ont tous deux des collections d'objets à appeler, des observateurs ou des gestionnaires d'événements). Je préfère simplement utiliser un sujet si la seule raison d'ajouter un événement est de l'envelopper avec FromEventPattern. Mis à part un changement dans les schémas d'exceptions, qui pourrait être important pour vous, je ne vois aucun avantage à éviter le sujet de cette manière. Encore une fois, je pourrais manquer quelque chose d'autre ici que l'événement est préférable au sujet. La mention de l'état faisait simplement partie de la citation, et il semblait préférable de la laisser dedans. Peut-être que c'est plus clair sans cette partie?
Wilka
@casperOne - mais vous ne devriez pas non plus créer un événement juste pour l'envelopper avec FromEventPattern. C'est évidemment une idée terrible.
James Moore
3
J'ai expliqué ma citation plus en détail dans ce billet de blog .
Dave Sexton
J'ai tendance à les utiliser comme point d'entrée dans Rx. Cela a frappé dans le mille pour moi. J'ai une situation où il y a une API qui, lorsqu'elle est invoquée, génère des événements que j'aimerais passer par un pipeline de traitement réactif. Le sujet était la réponse pour moi, puisque le FromEventPattern ne semble pas exister dans RxJava AFAICT.
scorpiodawg
31

Souvent, lorsque vous gérez un sujet, vous ne faites que réimplémenter des fonctionnalités déjà dans Rx, et probablement d'une manière pas aussi robuste, simple et extensible.

Lorsque vous essayez d'adapter un flux de données asynchrone dans Rx (ou de créer un flux de données asynchrone à partir d'un flux qui n'est pas actuellement asynchrone), les cas les plus courants sont généralement:

  • La source des données est un événement : comme le dit Lee, c'est le cas le plus simple: utilisez FromEvent et dirigez-vous vers le pub.

  • La source des données provient d'une opération synchrone et vous voulez des mises à jour interrogées (par exemple, un service Web ou un appel de base de données): Dans ce cas, vous pouvez utiliser l'approche suggérée par Lee, ou pour des cas simples, vous pouvez utiliser quelque chose comme Observable.Interval.Select(_ => <db fetch>). Vous souhaiterez peut-être utiliser DistinctUntilChanged () pour empêcher la publication de mises à jour lorsque rien n'a changé dans les données source.

  • La source des données est une sorte d'API asynchrone qui appelle votre rappel : dans ce cas, utilisez Observable.Create pour connecter votre rappel afin d'appeler OnNext / OnError / OnComplete sur l'observateur.

  • La source des données est un appel qui bloque jusqu'à ce que de nouvelles données soient disponibles (par exemple, certaines opérations de lecture de socket synchrone): Dans ce cas, vous pouvez utiliser Observable.Create pour envelopper le code impératif qui lit depuis le socket et publie sur l'Observer. lorsque les données sont lues. Cela peut être similaire à ce que vous faites avec le sujet.

Utiliser Observable.Create vs créer une classe qui gère un Subject équivaut assez à utiliser le mot-clé yield vs créer une classe entière qui implémente IEnumerator. Bien sûr, vous pouvez écrire un IEnumerator pour être aussi propre et aussi bon citoyen que le code de rendement, mais lequel est le mieux encapsulé et offre un design plus soigné? La même chose est vraie pour Observable.Create vs la gestion des sujets.

Observable.Create vous donne un modèle propre pour une configuration paresseuse et un démontage propre. Comment y parvenir avec une classe enveloppant un sujet? Vous avez besoin d'une sorte de méthode Start ... comment savoir quand l'appeler? Ou le démarrez-vous toujours, même si personne n'écoute? Et quand vous avez terminé, comment arrêtez-vous de lire à partir du socket / d'interroger la base de données, etc.? Vous devez avoir une sorte de méthode Stop, et vous devez toujours avoir accès non seulement à l'IObservable auquel vous êtes abonné, mais à la classe qui a créé le sujet en premier lieu.

Avec Observable.Create, tout est regroupé au même endroit. Le corps de Observable.Create n'est pas exécuté tant que quelqu'un ne s'est pas abonné, donc si personne ne s'abonne, vous n'utilisez jamais votre ressource. Et Observable.Create renvoie un jetable qui peut arrêter proprement votre ressource / vos rappels, etc. - cela est appelé lorsque l'observateur se désabonne. La durée de vie des ressources que vous utilisez pour générer l'observable est parfaitement liée à la durée de vie de l'observable lui-même.

Niall Connaughton
la source
1
Explication très claire de Observable.Create. Je vous remercie!
Evan Moran
1
J'ai encore des cas où j'utilise un sujet, où un objet courtier expose l'observable (disons que c'est juste une propriété modifiable). Différents composants appelleront le courtier pour indiquer quand cette propriété change (avec un appel de méthode), et cette méthode effectue un OnNext. Les consommateurs s'abonnent. Je pense que j'utiliserais un BehaviorSubject dans ce cas, est-ce approprié?
Frank Schwieterman
1
Ça dépend de la situation. Une bonne conception Rx a tendance à transformer le système vers une architecture asynchrone / réactive. Il peut être difficile d'intégrer proprement de petits composants de code réactif avec un système de conception impérative. La solution rapide consiste à utiliser des sujets pour transformer des actions impératives (appels de fonctions, ensembles de propriétés) en événements observables. Ensuite, vous vous retrouvez avec de petites poches de code réactif et pas de vrai "aha!" moment. Changer la conception pour modéliser le flux de données et y réagir donne généralement une meilleure conception, mais c'est un changement omniprésent et nécessite un changement de mentalité et l'adhésion de l'équipe.
Niall Connaughton le
1
Je dirais ici (comme Rx inexpérimenté) que: En utilisant des sujets, vous pouvez entrer dans le monde de Rx dans une application impérative développée et la transformer lentement. Aussi pour gagner des premières expériences ... et certainement plus tard changer votre code pour qu'il aurait dû être depuis le début (lol). Mais pour commencer, je pense qu'il pourrait être intéressant d'utiliser des sujets.
Robetto
9

Le texte de bloc cité explique à peu près pourquoi vous ne devriez pas utiliser Subject<T>, mais pour le dire plus simple, vous combinez les fonctions d'observateur et d'observable, tout en injectant une sorte d'état entre les deux (que vous encapsuliez ou étendiez).

C'est là que vous rencontrez des problèmes; ces responsabilités devraient être séparées et distinctes les unes des autres.

Cela dit, dans votre cas particulier , je vous recommande de diviser vos préoccupations en parties plus petites.

Tout d'abord, votre thread est chaud et surveille toujours le matériel pour les signaux à émettre des notifications. Comment feriez-vous cela normalement? Événements . Alors commençons par ça.

Définissons le EventArgsdéclenchement de votre événement.

// The event args that has the information.
public class BaseFrameEventArgs : EventArgs
{
    public BaseFrameEventArgs(IBaseFrame baseFrame)
    {
        // Validate parameters.
        if (baseFrame == null) throw new ArgumentNullException("IBaseFrame");

        // Set values.
        BaseFrame = baseFrame;
    }

    // Poor man's immutability.
    public IBaseFrame BaseFrame { get; private set; }
}

Maintenant, la classe qui déclenchera l'événement. Remarque, cela pourrait être une classe statique (puisque vous avez toujours un fil conducteur suivi de la mémoire tampon du matériel), ou quelque chose que vous appelez à la demande qui est abonnée à ce que . Vous devrez le modifier le cas échéant.

public class BaseFrameMonitor
{
    // You want to make this access thread safe
    public event EventHandler<BaseFrameEventArgs> HardwareEvent;

    public BaseFrameMonitor()
    {
        // Create/subscribe to your thread that
        // drains hardware signals.
    }
}

Alors maintenant, vous avez une classe qui expose un événement. Les observables fonctionnent bien avec les événements. À tel point qu'il existe un support de première classe pour la conversion de flux d'événements (pensez à un flux d'événements comme plusieurs déclenchements d'un événement) en IObservable<T>implémentations si vous suivez le modèle d'événement standard, via la méthode statiqueFromEventPattern sur la Observableclasse .

Avec la source de vos événements et la FromEventPatternméthode, nous pouvons créer IObservable<EventPattern<BaseFrameEventArgs>>facilement un (la EventPattern<TEventArgs>classe incarne ce que vous verriez dans un événement .NET, notamment, une instance dérivée de EventArgset un objet représentant l'expéditeur), comme ceci:

// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();

// Create the observable.  It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
    FromEventPattern<BaseFrameEventArgs>(
        h => source.HardwareEvent += h,
        h => source.HardwareEvent -= h);

Bien sûr, vous voulez un IObservable<IBaseFrame>, mais c'est facile, en utilisant la Selectméthode d'extension sur la Observableclasse pour créer une projection (comme vous le feriez dans LINQ, et nous pouvons envelopper tout cela dans une méthode facile à utiliser):

public IObservable<IBaseFrame> CreateHardwareObservable()
{
    // The event source.
    // Or you might not need this if your class is static and exposes
    // the event as a static event.
    var source = new BaseFrameMonitor();

    // Create the observable.  It's going to be hot
    // as the events are hot.
    IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
        FromEventPattern<BaseFrameEventArgs>(
            h => source.HardwareEvent += h,
            h => source.HardwareEvent -= h);

    // Return the observable, but projected.
    return observable.Select(i => i.EventArgs.BaseFrame);
}
casperOne
la source
7
Merci pour votre réponse @casperOne, c'était mon approche initiale mais cela me semblait "mal" d'ajouter un événement juste pour que je puisse l'envelopper avec Rx. J'utilise actuellement des délégués (et oui, je sais que c'est exactement ce qu'est un événement!) Pour s'adapter au code utilisé pour charger et enregistrer la configuration, cela doit être en mesure de reconstruire les pipelines de composants et le système de délégués m'a donné le plus la flexibilité. Rx me donne un mal de tête dans ce domaine maintenant, mais la puissance de tout le reste dans le cadre rend la résolution du problème de configuration très utile.
Anthony
@Anthony Si vous pouvez faire fonctionner son exemple de code, c'est parfait, mais comme je l'ai commenté, cela n'a aucun sens. Quant à se sentir "mal", je ne sais pas pourquoi subdiviser les choses en parties logiques semble "faux", mais vous n'avez pas donné suffisamment de détails dans votre message d'origine pour indiquer comment traduire au mieux cela IObservable<T>car aucune information sur la façon dont vous " re signalant actuellement avec cette information est donnée.
casperOne
@casperOne À votre avis, l'utilisation de sujets serait-elle appropriée pour un bus de messages / agrégateur d'événements?
kitsune
1
@kitsune Non, je ne vois pas pourquoi ils le feraient. Si vous pensez "optimisation", vous devez vous poser la question de savoir si c'est le problème ou non, avez-vous mesuré Rx comme étant la cause du problème?
casperOne
2
Je suis d'accord ici avec casperOne que le partage des préoccupations est une bonne idée. Je tiens à souligner que si vous utilisez le modèle Hardware to Event to Rx, vous perdez la sémantique d'erreur. Les connexions ou sessions perdues, etc. ne seront pas exposées au consommateur. Désormais, le consommateur ne peut pas décider s'il souhaite réessayer, se déconnecter, s'abonner à une autre séquence ou à autre chose.
Lee Campbell
0

Il est mauvais de généraliser que les sujets ne sont pas bons à utiliser pour une interface publique. S'il est certainement vrai que ce n'est pas à quoi devrait ressembler une approche de programmation réactive, c'est définitivement une bonne option d'amélioration / refactorisation pour votre code classique.

Si vous avez une propriété normale avec un accesseur d'ensemble public et que vous souhaitez notifier les modifications, rien ne s'oppose à son remplacement par un BehaviorSubject. INPC ou d'autres événements supplémentaires ne sont tout simplement pas aussi propres et cela m'épuise personnellement. À cette fin, vous pouvez et devez utiliser BehaviorSubjects comme propriétés publiques au lieu de propriétés normales et abandonner INPC ou d'autres événements.

De plus, l'interface Objet rend les utilisateurs de votre interface plus conscients de la fonctionnalité de vos propriétés et sont plus susceptibles de s'abonner au lieu d'obtenir simplement la valeur.

C'est le meilleur à utiliser si vous souhaitez que d'autres écoutent / s'abonnent aux modifications d'une propriété.

Felix Keil
la source