Utilisation de SignalR avec le basculement du bus de messages Redis à l'aide de ConnectionUtils.Connect () de BookSleeve

112

J'essaie de créer un scénario de basculement de bus de messages Redis avec une application SignalR.

Au début, nous avons essayé un simple basculement de l'équilibreur de charge matériel, qui surveillait simplement deux serveurs Redis. L'application SignalR a indiqué le point de terminaison HLB singulier. J'ai ensuite échoué sur un serveur, mais je n'ai pas réussi à faire passer les messages sur le deuxième serveur Redis sans recycler le pool d'applications SignalR. Ceci est probablement dû au fait qu'il doit émettre les commandes de configuration vers le nouveau bus de messages Redis.

À partir de SignalR RC1, Microsoft.AspNet.SignalR.Redis.RedisMessageBusutilise Booksleeve RedisConnection()pour se connecter à un seul Redis pour pub / sub.

J'ai créé une nouvelle classe, RedisMessageBusCluster()qui utilise Booksleeve ConnectionUtils.Connect()pour se connecter à un dans un cluster de serveurs Redis.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BookSleeve;
using Microsoft.AspNet.SignalR.Infrastructure;

namespace Microsoft.AspNet.SignalR.Redis
{
    /// <summary>
    /// WIP:  Getting scaleout for Redis working
    /// </summary>
    public class RedisMessageBusCluster : ScaleoutMessageBus
    {
        private readonly int _db;
        private readonly string[] _keys;
        private RedisConnection _connection;
        private RedisSubscriberConnection _channel;
        private Task _connectTask;

        private readonly TaskQueue _publishQueue = new TaskQueue();

        public RedisMessageBusCluster(string serverList, int db, IEnumerable<string> keys, IDependencyResolver resolver)
            : base(resolver)
        {
            _db = db;
            _keys = keys.ToArray();

            // uses a list of connections
            _connection = ConnectionUtils.Connect(serverList);

            //_connection = new RedisConnection(host: server, port: port, password: password);

            _connection.Closed += OnConnectionClosed;
            _connection.Error += OnConnectionError;


            // Start the connection - TODO:  can remove this Open as the connection is already opened, but there's the _connectTask is used later on
            _connectTask = _connection.Open().Then(() =>
            {
                // Create a subscription channel in redis
                _channel = _connection.GetOpenSubscriberChannel();

                // Subscribe to the registered connections
                _channel.Subscribe(_keys, OnMessage);

                // Dirty hack but it seems like subscribe returns before the actual
                // subscription is properly setup in some cases
                while (_channel.SubscriptionCount == 0)
                {
                    Thread.Sleep(500);
                }
            });
        }


        protected override Task Send(Message[] messages)
        {
            return _connectTask.Then(msgs =>
            {
                var taskCompletionSource = new TaskCompletionSource<object>();

                // Group messages by source (connection id)
                var messagesBySource = msgs.GroupBy(m => m.Source);

                SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource);

                return taskCompletionSource.Task;
            },
            messages);
        }

        private void SendImpl(IEnumerator<IGrouping<string, Message>> enumerator, TaskCompletionSource<object> taskCompletionSource)
        {
            if (!enumerator.MoveNext())
            {
                taskCompletionSource.TrySetResult(null);
            }
            else
            {
                IGrouping<string, Message> group = enumerator.Current;

                // Get the channel index we're going to use for this message
                int index = Math.Abs(group.Key.GetHashCode()) % _keys.Length;

                string key = _keys[index];

                // Increment the channel number
                _connection.Strings.Increment(_db, key)
                                   .Then((id, k) =>
                                   {
                                       var message = new RedisMessage(id, group.ToArray());

                                       return _connection.Publish(k, message.GetBytes());
                                   }, key)
                                   .Then((enumer, tcs) => SendImpl(enumer, tcs), enumerator, taskCompletionSource)
                                   .ContinueWithNotComplete(taskCompletionSource);
            }
        }

        private void OnConnectionClosed(object sender, EventArgs e)
        {
            // Should we auto reconnect?
            if (true)
            {
                ;
            }
        }

        private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e)
        {
            // How do we bubble errors?
            if (true)
            {
                ;
            }
        }

        private void OnMessage(string key, byte[] data)
        {
            // The key is the stream id (channel)
            var message = RedisMessage.Deserialize(data);

            _publishQueue.Enqueue(() => OnReceived(key, (ulong)message.Id, message.Messages));
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (_channel != null)
                {
                    _channel.Unsubscribe(_keys);
                    _channel.Close(abort: true);
                }

                if (_connection != null)
                {
                    _connection.Close(abort: true);
                }                
            }

            base.Dispose(disposing);
        }
    }
}

Booksleeve a son propre mécanisme pour déterminer un maître, et basculera automatiquement vers un autre serveur, et je le teste maintenant avec SignalR.Chat .

Dans web.config, j'ai défini la liste des serveurs disponibles:

<add key="redis.serverList" value="dbcache1.local:6379,dbcache2.local:6379"/>

Puis dans Application_Start():

        // Redis cluster server list
        string redisServerlist = ConfigurationManager.AppSettings["redis.serverList"];

        List<string> eventKeys = new List<string>();
        eventKeys.Add("SignalR.Redis.FailoverTest");
        GlobalHost.DependencyResolver.UseRedisCluster(redisServerlist, eventKeys);

J'ai ajouté deux méthodes supplémentaires pour Microsoft.AspNet.SignalR.Redis.DependencyResolverExtensions:

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, IEnumerable<string> eventKeys)
{
    return UseRedisCluster(resolver, serverList, db: 0, eventKeys: eventKeys);
}

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, int db, IEnumerable<string> eventKeys)
{
    var bus = new Lazy<RedisMessageBusCluster>(() => new RedisMessageBusCluster(serverList, db, eventKeys, resolver));
    resolver.Register(typeof(IMessageBus), () => bus.Value);

    return resolver;
}

Maintenant, le problème est que lorsque plusieurs points d'arrêt sont activés, jusqu'à ce qu'un nom d'utilisateur ait été ajouté, puis désactivez tous les points d'arrêt, l'application fonctionne comme prévu. Cependant, avec les points d'arrêt désactivés depuis le début, il semble y avoir une condition de concurrence qui peut échouer pendant le processus de connexion.

Ainsi, dans RedisMessageCluster():

    // Start the connection
    _connectTask = _connection.Open().Then(() =>
    {
        // Create a subscription channel in redis
        _channel = _connection.GetOpenSubscriberChannel();

        // Subscribe to the registered connections
        _channel.Subscribe(_keys, OnMessage);

        // Dirty hack but it seems like subscribe returns before the actual
        // subscription is properly setup in some cases
        while (_channel.SubscriptionCount == 0)
        {
            Thread.Sleep(500);
        }
    });

J'ai essayé d'ajouter à la fois un Task.Wait, et même unSleep() (non montré ci-dessus) - qui attendaient / etc, mais obtenaient toujours des erreurs.

L'erreur récurrente semble être dans Booksleeve.MessageQueue.cs~ ln 71:

A first chance exception of type 'System.InvalidOperationException' occurred in BookSleeve.dll
iisexpress.exe Error: 0 : SignalR exception thrown by Task: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: The queue is closed
   at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
   at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
   at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
   at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
   at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821
   --- End of inner exception stack trace ---
---> (Inner Exception #0) System.InvalidOperationException: The queue is closed
   at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
   at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
   at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
   at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
   at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821<---



public void Enqueue(RedisMessage item, bool highPri)
{
    lock (stdPriority)
    {
        if (closed)
        {
            throw new InvalidOperationException("The queue is closed");
        }

Où une exception de file d'attente fermée est levée.

Je prévois un autre problème: comme la connexion Redis est établie, Application_Start()il peut y avoir des problèmes de "reconnexion" à un autre serveur. Cependant, je pense que cela est valable lors de l'utilisation du singulier RedisConnection(), où il n'y a qu'une seule connexion à choisir. Cependant, avec l'intorduction de ConnectionUtils.Connect()j'aimerais entendre parler @dfowlerou les autres gars de SignalR comment ce scénario est géré dans SignalR.

ElHaix
la source
Je vais jeter un oeil, mais: la première chose qui se produit est que vous n'avez pas besoin d'appeler Openpuisque la connexion que vous avez devrait déjà être ouverte. Je ne pourrai pas regarder tout de suite, car je me prépare pour un vol
Marc Gravell
Je crois qu'il y a deux problèmes ici. 1) comment Booksleeve gère un basculement; 2) Comment SignalR utilise les curseurs pour suivre les clients. Lorsqu'un nouveau bus de messages est initialisé, tous les curseurs de mb1 ne sortent pas sur mb2. Par conséquent, lors de la réinitialisation du pool d'applications SignalR, il commencera à fonctionner - pas avant, ce qui n'est évidemment pas une option viable.
ElHaix
2
Lien décrivant comment SignalR utilise les curseurs: stackoverflow.com/questions/13054592/…
ElHaix
Essayez d'utiliser la dernière version du bus de messages Redis. Il prend en charge le passage d'une fabrique de connexions et gère les tentatives de connexion lorsque le serveur tombe en panne.
davidfowl
Avez-vous un lien vers les notes de publication? Merci.
ElHaix

Réponses:

13

L'équipe SignalR a maintenant implémenté la prise en charge d'une fabrique de connexions personnalisées avec StackExchange.Redis , le successeur de BookSleeve, qui prend en charge les connexions Redis redondantes via ConnectionMultiplexer.

Le problème initial rencontré était que malgré la création de mes propres méthodes d'extension dans BookSleeve pour accepter une collection de serveurs, le basculement n'était pas possible.

Désormais, avec l'évolution de BookSleeve vers StackExchange.Redis, nous pouvons désormais configurer la collection de serveurs / ports dès l' Connectinitialisation.

La nouvelle implémentation est beaucoup plus simple que la route que je UseRedisClustersuivais , en créant une méthode, et le pluming back-end prend désormais en charge le vrai basculement:

var conn = ConnectionMultiplexer.Connect("redisServer1:6380,redisServer2:6380,redisServer3:6380,allowAdmin=true");

StackExchange.Redis permet également une configuration manuelle supplémentaire comme indiqué dans la Automatic and Manual Configurationsection de la documentation:

ConfigurationOptions config = new ConfigurationOptions
{
    EndPoints =
    {
        { "redis0", 6379 },
        { "redis1", 6380 }
    },
    CommandMap = CommandMap.Create(new HashSet<string>
    { // EXCLUDE a few commands
        "INFO", "CONFIG", "CLUSTER",
        "PING", "ECHO", "CLIENT"
    }, available: false),
    KeepAlive = 180,
    DefaultVersion = new Version(2, 8, 8),
    Password = "changeme"
};

En substance, la possibilité d'initialiser notre environnement évolutif SignalR avec un ensemble de serveurs résout désormais le problème initial.

ElHaix
la source
Dois-je récompenser votre réponse avec une prime de 500 rep? ;)
nicael
Eh bien, si vous pensez que c'est maintenant la réponse :)
ElHaix
@ElHaix puisque vous avez posé la question, vous êtes probablement le plus qualifié pour dire si votre réponse est concluante ou si c'est juste une pièce du puzzle - je suggère d'ajouter une phrase pour indiquer si et éventuellement comment cela a résolu votre problème
Lars Höppner
Alors? Récompensez-vous? Ou je peux attendre que cela attire plus d'attention.
nicael
Est-ce que je manque quelque chose ou est-ce uniquement dans une branche de fonctionnalité, pas dans le package nuget main (2.1)? De plus, il semble que dans la branche bug-stackexchange ( github.com/SignalR/SignalR/tree/bug-stackexchange/src/… ), il n'y a pas encore de moyen dans la classe RedisScaleoutConfiguration de fournir votre propre multiplexeur.
Steve