Après avoir fait quelques (plus ou moins) « bas niveau » async socket
il y a quelques années de programmation (dans un modèle asynchrone basé sur des événements (EAP) la mode) et récemment en mouvement « up » à un TcpListener
(modèle de programmation asynchrone (APM) ) puis en essayant de passer à async/await
(Modèle asynchrone basé sur les tâches (TAP) ), je l'ai à peu près avec avoir à me soucier de toute cette `` plomberie de bas niveau ''. Je pensais donc; pourquoi ne pas essayerRX
( Reactive Extensions ) car il pourrait s'adapter mieux à mon domaine de problème.
Beaucoup de code que j'écris doit être utilisé par de nombreux clients qui se connectent via Tcp à mon application, puis démarrent une communication bidirectionnelle (asynchrone). Le client ou le serveur peut à tout moment décider qu'un message doit être envoyé et le faire, ce n'est donc pas votre request/response
configuration classique mais plutôt une "ligne" bidirectionnelle en temps réel ouverte aux deux parties pour envoyer ce qu'elles veulent , quand ils veulent. (Si quelqu'un a un nom décent pour décrire cela, je serais heureux de l'entendre!).
Le «protocole» diffère selon l'application (et n'est pas vraiment pertinent pour ma question). J'ai cependant une première question:
- Étant donné qu'un seul "serveur" est en cours d'exécution, mais qu'il doit garder une trace de plusieurs (généralement des milliers) de connexions (par exemple des clients) ayant chacune (faute d'une meilleure description) leur propre "machine d'état" pour garder une trace de leur interne États, etc., quelle approche préféreriez-vous? EAP / TAP / APM? Le RX serait-il même considéré comme une option? Sinon, pourquoi?
Donc, je dois travailler Async car a) ce n'est pas un protocole de demande / réponse, donc je ne peux pas avoir de thread / client dans un appel de blocage "en attente de message" ou d'appel de blocage "d'envoi de message" (cependant, si l'envoi est blocage pour ce client seulement, je pouvais vivre avec) et b) je dois gérer de nombreuses connexions simultanées. Je ne vois aucun moyen de faire cela (de manière fiable) en utilisant des appels de blocage.
La plupart de mes applications sont liées à VoiP; que ce soit des messages SIP provenant de clients SIP ou des messages PBX (liés) provenant d'applications comme FreeSwitch / OpenSIPS, etc. La plupart des protocoles sont basés sur du texte (ASCII).
Donc, après avoir mis en œuvre de nombreuses permutations différentes des techniques susmentionnées, je voudrais simplifier mon travail en créant un objet que je peux simplement instancier, lui dire sur quoi IPEndpoint
écouter et le faire me dire chaque fois que quelque chose d'intéressant se passe (ce que je fais habituellement utiliser des événements pour, donc certains PAE sont généralement mélangés avec les deux autres techniques). La classe ne devrait pas prendre la peine d'essayer de «comprendre» le protocole; il doit simplement gérer les chaînes entrantes / sortantes. Et donc, ayant l'œil sur RX en espérant que cela (à la fin) simplifierait le travail, j'ai créé un nouveau "violon" à partir de zéro:
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Linq;
using System.Text;
class Program
{
static void Main(string[] args)
{
var f = new FiddleServer(new IPEndPoint(IPAddress.Any, 8084));
f.Start();
Console.ReadKey();
f.Stop();
Console.ReadKey();
}
}
public class FiddleServer
{
private TcpListener _listener;
private ConcurrentDictionary<ulong, FiddleClient> _clients;
private ulong _currentid = 0;
public IPEndPoint LocalEP { get; private set; }
public FiddleServer(IPEndPoint localEP)
{
this.LocalEP = localEP;
_clients = new ConcurrentDictionary<ulong, FiddleClient>();
}
public void Start()
{
_listener = new TcpListener(this.LocalEP);
_listener.Start();
Observable.While(() => true, Observable.FromAsync(_listener.AcceptTcpClientAsync)).Subscribe(
//OnNext
tcpclient =>
{
//Create new FSClient with unique ID
var fsclient = new FiddleClient(_currentid++, tcpclient);
//Keep track of clients
_clients.TryAdd(fsclient.ClientId, fsclient);
//Initialize connection
fsclient.Send("connect\n\n");
Console.WriteLine("Client {0} accepted", fsclient.ClientId);
},
//OnError
ex =>
{
},
//OnComplete
() =>
{
Console.WriteLine("Client connection initialized");
//Accept new connections
_listener.AcceptTcpClientAsync();
}
);
Console.WriteLine("Started");
}
public void Stop()
{
_listener.Stop();
Console.WriteLine("Stopped");
}
public void Send(ulong clientid, string rawmessage)
{
FiddleClient fsclient;
if (_clients.TryGetValue(clientid, out fsclient))
{
fsclient.Send(rawmessage);
}
}
}
public class FiddleClient
{
private TcpClient _tcpclient;
public ulong ClientId { get; private set; }
public FiddleClient(ulong id, TcpClient tcpclient)
{
this.ClientId = id;
_tcpclient = tcpclient;
}
public void Send(string rawmessage)
{
Console.WriteLine("Sending {0}", rawmessage);
var data = Encoding.ASCII.GetBytes(rawmessage);
_tcpclient.GetStream().WriteAsync(data, 0, data.Length); //Write vs WriteAsync?
}
}
Je suis conscient que, dans ce "violon", il y a un petit détail spécifique à l'implémentation; dans ce cas, je travaille avec FreeSwitch ESL donc le "connect\n\n"
dans le violon devrait, lors du refactoring à une approche plus générique, être supprimé.
Je suis également conscient que je dois refactoriser les méthodes anonymes en méthodes d'instance privées sur la classe Server; Je ne suis simplement pas sûr de la convention (par exemple " OnSomething
" par exemple) à utiliser pour leurs noms de méthode?
Ceci est ma base / point de départ / fondation (qui a besoin de quelques ajustements). J'ai quelques questions à ce sujet:
- Voir la question "1" ci-dessus
- Suis-je sur la bonne voie? Ou mes décisions de "conception" sont-elles injustes?
- Concurrentiel: cela pourrait-il faire face à des milliers de clients (analyser / gérer les messages réels à part)
- Sur les exceptions: je ne sais pas comment obtenir des exceptions déclenchées au sein des clients "jusqu'au serveur" ("RX -wise"); quel serait un bon moyen?
- Je peux maintenant obtenir n'importe quel client connecté à partir de ma classe de serveur (en l'utilisant
ClientId
), en supposant que j'expose les clients d'une manière ou d'une autre et que j'appelle des méthodes directement sur eux. Je peux également appeler des méthodes via la classe Server (par exemple, laSend(clientId, rawmessage)
méthode (alors que cette dernière approche serait une méthode «pratique» pour obtenir rapidement un message de l'autre côté). - Je ne sais pas trop où (et comment) aller d'ici:
- a) J'ai besoin de gérer les messages entrants; comment pourrais-je mettre cela en place? Je peux obtenir le flux de cours, mais où puis -je gérer la récupération des octets reçus? Je pense que j'ai besoin d'une sorte de "ObservableStream" quelque chose à quoi je peux m'abonner? Dois-je mettre cela dans le
FiddleClient
ouFiddleServer
? - b) En supposant que je veux éviter d' utiliser l' événement jusqu'à ce que ces
FiddleClient
/FiddleServer
classes sont mises en œuvre plus spécifiquement pour adapter leur protocole d'application spécifique de manipulation etc. , en utilisant plus spécifiquesFooClient
/FooServer
cours: comment pourrais - je aller d'obtenir les données dans les sous - jacents « Fiddle' classes à leur homologues plus spécifiques?
- a) J'ai besoin de gérer les messages entrants; comment pourrais-je mettre cela en place? Je peux obtenir le flux de cours, mais où puis -je gérer la récupération des octets reçus? Je pense que j'ai besoin d'une sorte de "ObservableStream" quelque chose à quoi je peux m'abonner? Dois-je mettre cela dans le
Articles / liens que j'ai déjà lus / parcourus / utilisés comme référence:
- Consommer un socket à l'aide de l'extension réactive
- Création et abonnement à des séquences simples observables
- Comment ajouter ou associer un contexte à chaque connexion / session ObservableTcpListener
- Programmation asynchrone avec le cadre réactif et la bibliothèque parallèle de tâches - parties 1 , 2 et 3 .
- Utilisation des extensions réactives (Rx) pour la programmation de socket pratique?
- Un serveur Web piloté .NET Rx et un serveur Web piloté .NET Rx, Take 2
- Quelques tutoriels Rx
Réponses:
Après avoir lu cette déclaration, j'ai immédiatement pensé aux "acteurs". Les acteurs sont très similaires aux objets, sauf qu'ils n'ont qu'une seule entrée où vous lui passez le message (au lieu d'appeler directement les méthodes de l'objet) et ils fonctionnent de manière asynchrone. Dans un exemple très simplifié ... vous créeriez l'acteur et lui enverriez un message avec l'IPEndpoint et l'adresse de l'acteur auquel envoyer le résultat. Il s'éteint et fonctionne en arrière-plan. Vous n'en entendez parler que lorsque "quelque chose d'intéressant" se produit. Vous pouvez instancier autant d'acteurs que nécessaire pour gérer la charge.
Je ne connais aucune bibliothèque d'acteurs dans .Net bien que je sache qu'il y en a. Je connais la bibliothèque TPL Dataflow (il y aura une section qui la couvrira dans mon livre http://DataflowBook.com ) et il devrait être facile d'implémenter un modèle d'acteur simple avec cette bibliothèque.
la source