File d'attente de taille fixe qui extrait automatiquement les anciennes valeurs sur les nouvelles enques

121

J'utilise ConcurrentQueuepour une structure de données partagée dont le but est de contenir les N derniers objets qui lui sont passés (genre d'histoire).

Supposons que nous ayons un navigateur et que nous souhaitons avoir les 100 dernières URL consultées. Je veux une file d'attente qui supprime automatiquement (dequeue) la plus ancienne (première) entrée lors de l'insertion d'une nouvelle entrée (mise en file d'attente) lorsque la capacité est pleine (100 adresses dans l'historique).

Comment puis-je accomplir cela en utilisant System.Collections?

Xaqron
la source
Il n'était pas destiné spécifiquement à vous, mais à toute personne qui rencontre cette question et pourrait la trouver utile. btw, il parle aussi de C #. Avez-vous réussi à lire toutes les réponses (en 2 minutes) et à comprendre qu'il n'y a pas de code C # là-bas? Quoi qu'il en soit, je ne suis pas sûr moi-même, et donc c'est un commentaire ...
Vous pouvez simplement envelopper les méthodes dans un verrou. Étant donné qu'ils sont rapides, vous pouvez simplement verrouiller l'ensemble du tableau. C'est probablement une dupe cependant. La recherche d'implémentations de tampon circulaire avec du code C # peut vous trouver quelque chose. De toute façon bonne chance.

Réponses:

111

J'écrirais une classe wrapper qui, sur Enqueue, vérifierait le compte, puis Dequeue lorsque le nombre dépasse la limite.

 public class FixedSizedQueue<T>
 {
     ConcurrentQueue<T> q = new ConcurrentQueue<T>();
     private object lockObject = new object();

     public int Limit { get; set; }
     public void Enqueue(T obj)
     {
        q.Enqueue(obj);
        lock (lockObject)
        {
           T overflow;
           while (q.Count > Limit && q.TryDequeue(out overflow)) ;
        }
     }
 }
Richard Schneider
la source
4
qest privé pour l'objet, de sorte que le lockempêchera les autres threads d'accéder simultanément.
Richard Schneider
14
Ce n'est pas une bonne idée de verrouiller. L'objectif global des collections simultanées BCL est de fournir une concurrence d'accès sans verrouillage pour des raisons de performances. Le verrouillage de votre code compromet cet avantage. En fait, je ne vois pas de raison pour laquelle vous devez verrouiller le deq.
KFL
2
@KFL, besoin de verrouiller car Countet ce TryDequeuesont deux opérations indépendantes qui ne sont pas synchronisées par BCL Concurrent.
Richard Schneider
9
@RichardSchneider Si vous avez besoin de gérer vous-même les problèmes de concurrence, ce serait une bonne idée d'échanger l' ConcurrentQueue<T>objet contre un Queue<T>objet plus léger.
0b101010
6
Ne définissez pas votre propre file d'attente, utilisez simplement celle héritée. Si vous faites comme vous, vous ne pouvez en fait rien faire d'autre avec les valeurs de la file d'attente, toutes les autres fonctions, à l'exception de votre nouvelle, Enqueueappelleront toujours la file d'attente d'origine. En d'autres termes, bien que cette réponse soit marquée comme acceptée, elle est complètement et complètement brisée.
Gábor
104

J'opterais pour une légère variante ... étendre ConcurrentQueue afin de pouvoir utiliser les extensions Linq sur FixedSizeQueue

public class FixedSizedQueue<T> : ConcurrentQueue<T>
{
    private readonly object syncObject = new object();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public new void Enqueue(T obj)
    {
        base.Enqueue(obj);
        lock (syncObject)
        {
            while (base.Count > Size)
            {
                T outObj;
                base.TryDequeue(out outObj);
            }
        }
    }
}
Dave Lawrence
la source
1
que se passe-t-il lorsque quelqu'un connaît statiquement l'instance en tant que ConcurrentQueue <T>, il vient de contourner votre «nouveau» mot-clé.
mhand
6
@mhand Si «quelqu'un» voulait faire ça; ils auraient alors choisi d'utiliser un objet ConcurrentQueue <T> pour commencer ... Il s'agit d'une classe de stockage personnalisée. Personne ne cherche à ce que cela soit soumis au framework .NET. Vous avez cherché à créer un problème pour le plaisir.
Dave Lawrence
9
mon point est au lieu de sous-classer, peut-être que vous devriez simplement envelopper la file d'attente ... cela applique le comportement souhaité dans tous les cas. De plus, comme il s'agit d'une classe de stockage personnalisée, rendons-la complètement personnalisée, n'exposons que les opérations dont nous avons besoin, le sous-classement est le mauvais outil ici à mon humble avis.
mhand
3
@mhand Oui, je comprends ce que vous dites .. Je pourrais encapsuler une file d'attente et exposer l'énumérateur de la file d'attente afin d'utiliser les extensions Linq.
Dave Lawrence
1
Je suis d'accord avec @mhand, vous ne devriez pas hériter de ConcurrentQueue car la méthode Enqueue n'est pas virtuelle. Vous devez proxy la file d'attente et implémenter toute l'interface si vous le souhaitez.
Chris Marisic
29

Pour tous ceux qui le trouvent utile, voici un code de travail basé sur la réponse de Richard Schneider ci-dessus:

public class FixedSizedQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        queue.Enqueue(obj);

        while (queue.Count > Size)
        {
            T outObj;
            queue.TryDequeue(out outObj);
        }
    }
}
Tod Thomson
la source
1
Voter pour les raisons mentionnées (le verrouillage lors de l'utilisation d'un ConcurrentQueue est mauvais) en plus de ne pas implémenter l'une des interfaces requises pour que ce soit une vraie collection.
Josh
11

Pour ce que cela vaut, voici un tampon circulaire léger avec certaines méthodes marquées pour une utilisation sûre et non sécurisée.

public class CircularBuffer<T> : IEnumerable<T>
{
    readonly int size;
    readonly object locker;

    int count;
    int head;
    int rear;
    T[] values;

    public CircularBuffer(int max)
    {
        this.size = max;
        locker = new object();
        count = 0;
        head = 0;
        rear = 0;
        values = new T[size];
    }

    static int Incr(int index, int size)
    {
        return (index + 1) % size;
    }

    private void UnsafeEnsureQueueNotEmpty()
    {
        if (count == 0)
            throw new Exception("Empty queue");
    }

    public int Size { get { return size; } }
    public object SyncRoot { get { return locker; } }

    #region Count

    public int Count { get { return UnsafeCount; } }
    public int SafeCount { get { lock (locker) { return UnsafeCount; } } }
    public int UnsafeCount { get { return count; } }

    #endregion

    #region Enqueue

    public void Enqueue(T obj)
    {
        UnsafeEnqueue(obj);
    }

    public void SafeEnqueue(T obj)
    {
        lock (locker) { UnsafeEnqueue(obj); }
    }

    public void UnsafeEnqueue(T obj)
    {
        values[rear] = obj;

        if (Count == Size)
            head = Incr(head, Size);
        rear = Incr(rear, Size);
        count = Math.Min(count + 1, Size);
    }

    #endregion

    #region Dequeue

    public T Dequeue()
    {
        return UnsafeDequeue();
    }

    public T SafeDequeue()
    {
        lock (locker) { return UnsafeDequeue(); }
    }

    public T UnsafeDequeue()
    {
        UnsafeEnsureQueueNotEmpty();

        T res = values[head];
        values[head] = default(T);
        head = Incr(head, Size);
        count--;

        return res;
    }

    #endregion

    #region Peek

    public T Peek()
    {
        return UnsafePeek();
    }

    public T SafePeek()
    {
        lock (locker) { return UnsafePeek(); }
    }

    public T UnsafePeek()
    {
        UnsafeEnsureQueueNotEmpty();

        return values[head];
    }

    #endregion


    #region GetEnumerator

    public IEnumerator<T> GetEnumerator()
    {
        return UnsafeGetEnumerator();
    }

    public IEnumerator<T> SafeGetEnumerator()
    {
        lock (locker)
        {
            List<T> res = new List<T>(count);
            var enumerator = UnsafeGetEnumerator();
            while (enumerator.MoveNext())
                res.Add(enumerator.Current);
            return res.GetEnumerator();
        }
    }

    public IEnumerator<T> UnsafeGetEnumerator()
    {
        int index = head;
        for (int i = 0; i < count; i++)
        {
            yield return values[index];
            index = Incr(index, size);
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    #endregion
}

J'aime utiliser la Foo()/SafeFoo()/UnsafeFoo()convention:

  • Foo appel de méthodes UnsafeFoo par défaut.
  • UnsafeFoo Les méthodes modifient librement l'état sans verrou, elles ne devraient appeler que d'autres méthodes non sécurisées.
  • SafeFooLes méthodes appellent des UnsafeFoométhodes à l'intérieur d'un verrou.

C'est un peu verbeux, mais cela fait des erreurs évidentes, comme appeler des méthodes non sécurisées en dehors d'un verrou dans une méthode qui est censée être thread-safe, plus apparente.

Juliette
la source
5

Voici mon point de vue sur la file d'attente de taille fixe

Il utilise une file d'attente régulière, pour éviter la surcharge de synchronisation lorsque la Countpropriété est utilisée ConcurrentQueue. Il implémente également IReadOnlyCollectionafin que les méthodes LINQ puissent être utilisées. Le reste est très similaire aux autres réponses ici.

[Serializable]
[DebuggerDisplay("Count = {" + nameof(Count) + "}, Limit = {" + nameof(Limit) + "}")]
public class FixedSizedQueue<T> : IReadOnlyCollection<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _lock = new object();

    public int Count { get { lock (_lock) { return _queue.Count; } } }
    public int Limit { get; }

    public FixedSizedQueue(int limit)
    {
        if (limit < 1)
            throw new ArgumentOutOfRangeException(nameof(limit));

        Limit = limit;
    }

    public FixedSizedQueue(IEnumerable<T> collection)
    {
        if (collection is null || !collection.Any())
           throw new ArgumentException("Can not initialize the Queue with a null or empty collection", nameof(collection));

        _queue = new Queue<T>(collection);
        Limit = _queue.Count;
    }

    public void Enqueue(T obj)
    {
        lock (_lock)
        {
            _queue.Enqueue(obj);

            while (_queue.Count > Limit)
                _queue.Dequeue();
        }
    }

    public void Clear()
    {
        lock (_lock)
            _queue.Clear();
    }

    public IEnumerator<T> GetEnumerator()
    {
        lock (_lock)
            return new List<T>(_queue).GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
Ali Zahid
la source
3

Juste pour le plaisir, voici une autre implémentation qui, je crois, répond à la plupart des préoccupations des commentateurs. En particulier, la sécurité des threads est obtenue sans verrouillage et l'implémentation est masquée par la classe d'emballage.

public class FixedSizeQueue<T> : IReadOnlyCollection<T>
{
  private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
  private int _count;

  public int Limit { get; private set; }

  public FixedSizeQueue(int limit)
  {
    this.Limit = limit;
  }

  public void Enqueue(T obj)
  {
    _queue.Enqueue(obj);
    Interlocked.Increment(ref _count);

    // Calculate the number of items to be removed by this thread in a thread safe manner
    int currentCount;
    int finalCount;
    do
    {
      currentCount = _count;
      finalCount = Math.Min(currentCount, this.Limit);
    } while (currentCount != 
      Interlocked.CompareExchange(ref _count, finalCount, currentCount));

    T overflow;
    while (currentCount > finalCount && _queue.TryDequeue(out overflow))
      currentCount--;
  }

  public int Count
  {
    get { return _count; }
  }

  public IEnumerator<T> GetEnumerator()
  {
    return _queue.GetEnumerator();
  }

  System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
  {
    return _queue.GetEnumerator();
  }
}
erdomke
la source
1
Ceci est cassé s'il est utilisé simultanément - que se passe-t-il si un thread est préempté après l'appel _queue.Enqueue(obj)mais avant Interlocked.Increment(ref _count)et que l'autre thread appelle .Count? Ce serait un mauvais décompte. Je n'ai pas vérifié les autres problèmes.
KFL
3

Ma version est juste une sous-classe de Queuecelles normales ... rien de spécial mais voir tout le monde participer et cela va toujours avec le titre du sujet, je ferais aussi bien de le mettre ici. Il renvoie également les fichiers retirés de la file d'attente au cas où.

public sealed class SizedQueue<T> : Queue<T>
{
    public int FixedCapacity { get; }
    public SizedQueue(int fixedCapacity)
    {
        this.FixedCapacity = fixedCapacity;
    }

    /// <summary>
    /// If the total number of item exceed the capacity, the oldest ones automatically dequeues.
    /// </summary>
    /// <returns>The dequeued value, if any.</returns>
    public new T Enqueue(T item)
    {
        base.Enqueue(item);
        if (base.Count > FixedCapacity)
        {
            return base.Dequeue();
        }
        return default;
    }
}
5argon
la source
2

Ajoutons une autre réponse. Pourquoi cela par rapport aux autres?

1) Simplicité. Essayer de garantir la taille est bien, mais conduit à une complexité inutile qui peut présenter ses propres problèmes.

2) Implémente IReadOnlyCollection, ce qui signifie que vous pouvez utiliser Linq dessus et le transmettre à une variété de choses qui attendent IEnumerable.

3) Pas de verrouillage. La plupart des solutions ci-dessus utilisent des verrous, ce qui est incorrect sur une collection sans verrou.

4) Implémente le même ensemble de méthodes, propriétés et interfaces que ConcurrentQueue, y compris IProducerConsumerCollection, ce qui est important si vous souhaitez utiliser la collection avec BlockingCollection.

Cette implémentation pourrait potentiellement aboutir à plus d'entrées que prévu si TryDequeue échoue, mais la fréquence de ce qui se produit ne semble pas valoir un code spécialisé qui entravera inévitablement les performances et causera ses propres problèmes inattendus.

Si vous voulez absolument garantir une taille, implémenter une méthode Prune () ou similaire semble être la meilleure idée. Vous pouvez utiliser un verrou de lecture ReaderWriterLockSlim dans les autres méthodes (y compris TryDequeue) et prendre un verrou en écriture uniquement lors de l'élagage.

class ConcurrentFixedSizeQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>, ICollection {
    readonly ConcurrentQueue<T> m_concurrentQueue;
    readonly int m_maxSize;

    public int Count => m_concurrentQueue.Count;
    public bool IsEmpty => m_concurrentQueue.IsEmpty;

    public ConcurrentFixedSizeQueue (int maxSize) : this(Array.Empty<T>(), maxSize) { }

    public ConcurrentFixedSizeQueue (IEnumerable<T> initialCollection, int maxSize) {
        if (initialCollection == null) {
            throw new ArgumentNullException(nameof(initialCollection));
        }

        m_concurrentQueue = new ConcurrentQueue<T>(initialCollection);
        m_maxSize = maxSize;
    }

    public void Enqueue (T item) {
        m_concurrentQueue.Enqueue(item);

        if (m_concurrentQueue.Count > m_maxSize) {
            T result;
            m_concurrentQueue.TryDequeue(out result);
        }
    }

    public void TryPeek (out T result) => m_concurrentQueue.TryPeek(out result);
    public bool TryDequeue (out T result) => m_concurrentQueue.TryDequeue(out result);

    public void CopyTo (T[] array, int index) => m_concurrentQueue.CopyTo(array, index);
    public T[] ToArray () => m_concurrentQueue.ToArray();

    public IEnumerator<T> GetEnumerator () => m_concurrentQueue.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator () => GetEnumerator();

    // Explicit ICollection implementations.
    void ICollection.CopyTo (Array array, int index) => ((ICollection)m_concurrentQueue).CopyTo(array, index);
    object ICollection.SyncRoot => ((ICollection) m_concurrentQueue).SyncRoot;
    bool ICollection.IsSynchronized => ((ICollection) m_concurrentQueue).IsSynchronized;

    // Explicit IProducerConsumerCollection<T> implementations.
    bool IProducerConsumerCollection<T>.TryAdd (T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryAdd(item);
    bool IProducerConsumerCollection<T>.TryTake (out T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryTake(out item);

    public override int GetHashCode () => m_concurrentQueue.GetHashCode();
    public override bool Equals (object obj) => m_concurrentQueue.Equals(obj);
    public override string ToString () => m_concurrentQueue.ToString();
}
Josh
la source
2

Juste parce que personne ne l'a encore dit ... vous pouvez utiliser a LinkedList<T>et ajouter la sécurité des threads:

public class Buffer<T> : LinkedList<T>
{
    private int capacity;

    public Buffer(int capacity)
    {
        this.capacity = capacity;   
    }

    public void Enqueue(T item)
    {
        // todo: add synchronization mechanism
        if (Count == capacity) RemoveLast();
        AddFirst(item);
    }

    public T Dequeue()
    {
        // todo: add synchronization mechanism
        var last = Last.Value;
        RemoveLast();
        return last;
    }
}

Une chose à noter est que l'ordre d'énumération par défaut sera LIFO dans cet exemple. Mais cela peut être annulé si nécessaire.

Brandon
la source
1

Pour votre plaisir de codage, je vous soumets le ' ConcurrentDeck'

public class ConcurrentDeck<T>
{
   private readonly int _size;
   private readonly T[] _buffer;
   private int _position = 0;

   public ConcurrentDeck(int size)
   {
       _size = size;
       _buffer = new T[size];
   }

   public void Push(T item)
   {
       lock (this)
       {
           _buffer[_position] = item;
           _position++;
           if (_position == _size) _position = 0;
       }
   }

   public T[] ReadDeck()
   {
       lock (this)
       {
           return _buffer.Skip(_position).Union(_buffer.Take(_position)).ToArray();
       }
   }
}

Exemple d'utilisation:

void Main()
{
    var deck = new ConcurrentDeck<Tuple<string,DateTime>>(25);
    var handle = new ManualResetEventSlim();
    var task1 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task1",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(1).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    var task2 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task2",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(.5).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    var task3 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task3",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(.25).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(10));
    handle.Set();
    var outputtime = DateTime.Now;
    deck.ReadDeck().Select(d => new {Message = d.Item1, MilliDiff = (outputtime - d.Item2).TotalMilliseconds}).Dump(true);
}
Chris Hayes
la source
1
J'aime cette implémentation mais notez que si aucune n'a été ajoutée, elle renvoie la valeur par défaut (T)
Daniel Leach
Si vous utilisez lock de cette manière, vous devez utiliser ReaderWriterLockSlim pour hiérarchiser vos lecteurs.
Josh
1

Eh bien, cela dépend de l'utilisation, j'ai remarqué que certaines des solutions ci-dessus peuvent dépasser la taille lorsqu'elles sont utilisées dans un environnement multithread. Quoi qu'il en soit, mon cas d'utilisation était d'afficher les 5 derniers événements et il y a plusieurs threads qui écrivent des événements dans la file d'attente et un autre thread qui la lit et l'affiche dans un contrôle Winform. C'était donc ma solution.

EDIT: Puisque nous utilisons déjà le verrouillage dans notre implémentation, nous n'avons pas vraiment besoin de ConcurrentQueue, cela peut améliorer les performances.

class FixedSizedConcurrentQueue<T> 
{
    readonly Queue<T> queue = new Queue<T>();
    readonly object syncObject = new object();

    public int MaxSize { get; private set; }

    public FixedSizedConcurrentQueue(int maxSize)
    {
        MaxSize = maxSize;
    }

    public void Enqueue(T obj)
    {
        lock (syncObject)
        {
            queue.Enqueue(obj);
            while (queue.Count > MaxSize)
            {
                queue.Dequeue();
            }
        }
    }

    public T[] ToArray()
    {
        T[] result = null;
        lock (syncObject)
        {
            result = queue.ToArray();
        }

        return result;
    }

    public void Clear()
    {
        lock (syncObject)
        {
            queue.Clear();
        }
    }
}

EDIT: Nous n'avons pas vraiment besoin syncObjectdans l'exemple ci-dessus et nous pouvons plutôt utiliser un queueobjet puisque nous ne réinitialisons queueaucune fonction et qu'il est marqué comme de readonlytoute façon.

Mubashar
la source
0

La réponse acceptée va avoir des effets secondaires évitables.

Mécanismes de verrouillage à grain fin et sans verrouillage

Les liens ci-dessous sont des références que j'ai utilisées lorsque j'ai écrit mon exemple ci-dessous.

Bien que la documentation de Microsoft soit un peu trompeuse car ils utilisent un verrou, ils verrouillent cependant les classes de segment. Les classes de segment elles-mêmes utilisent Interlocked.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace Lib.Core
{
    // Sources: 
    // https://docs.microsoft.com/en-us/dotnet/standard/collections/thread-safe/
    // https://docs.microsoft.com/en-us/dotnet/api/system.threading.interlocked?view=netcore-3.1
    // https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueue.cs
    // https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueueSegment.cs

    /// <summary>
    /// Concurrent safe circular buffer that will used a fixed capacity specified and resuse slots as it goes.
    /// </summary>
    /// <typeparam name="TObject">The object that you want to go into the slots.</typeparam>
    public class ConcurrentCircularBuffer<TObject>
    {
        private readonly ConcurrentQueue<TObject> _queue;

        public int Capacity { get; private set; }

        public ConcurrentCircularBuffer(int capacity)
        {
            if(capacity <= 0)
            {
                throw new ArgumentException($"The capacity specified '{capacity}' is not valid.", nameof(capacity));
            }

            // Setup the queue to the initial capacity using List's underlying implementation.
            _queue = new ConcurrentQueue<TObject>(new List<TObject>(capacity));

            Capacity = capacity;
        }

        public void Enqueue(TObject @object)
        {
            // Enforce the capacity first so the head can be used instead of the entire segment (slow).
            while (_queue.Count + 1 > Capacity)
            {
                if (!_queue.TryDequeue(out _))
                {
                    // Handle error condition however you want to ie throw, return validation object, etc.
                    var ex = new Exception("Concurrent Dequeue operation failed.");
                    ex.Data.Add("EnqueueObject", @object);
                    throw ex;
                }
            }

            // Place the item into the queue
            _queue.Enqueue(@object);
        }

        public TObject Dequeue()
        {
            if(_queue.TryDequeue(out var result))
            {
                return result;
            }

            return default;
        }
    }
}
jjhayter
la source
0

Voici encore une autre implémentation qui utilise autant que possible le ConcurrentQueue sous-jacent tout en fournissant les mêmes interfaces disponibles via ConcurrentQueue.

/// <summary>
/// This is a FIFO concurrent queue that will remove the oldest added items when a given limit is reached.
/// </summary>
/// <typeparam name="TValue"></typeparam>
public class FixedSizedConcurrentQueue<TValue> : IProducerConsumerCollection<TValue>, IReadOnlyCollection<TValue>
{
    private readonly ConcurrentQueue<TValue> _queue;

    private readonly object _syncObject = new object();

    public int LimitSize { get; }

    public FixedSizedConcurrentQueue(int limit)
    {
        _queue = new ConcurrentQueue<TValue>();
        LimitSize = limit;
    }

    public FixedSizedConcurrentQueue(int limit, System.Collections.Generic.IEnumerable<TValue> collection)
    {
        _queue = new ConcurrentQueue<TValue>(collection);
        LimitSize = limit;

    }

    public int Count => _queue.Count;

    bool ICollection.IsSynchronized => ((ICollection) _queue).IsSynchronized;

    object ICollection.SyncRoot => ((ICollection)_queue).SyncRoot; 

    public bool IsEmpty => _queue.IsEmpty;

    // Not supported until .NET Standard 2.1
    //public void Clear() => _queue.Clear();

    public void CopyTo(TValue[] array, int index) => _queue.CopyTo(array, index);

    void ICollection.CopyTo(Array array, int index) => ((ICollection)_queue).CopyTo(array, index);

    public void Enqueue(TValue obj)
    {
        _queue.Enqueue(obj);
        lock( _syncObject )
        {
            while( _queue.Count > LimitSize ) {
                _queue.TryDequeue(out _);
            }
        }
    }

    public IEnumerator<TValue> GetEnumerator() => _queue.GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable<TValue>)this).GetEnumerator();

    public TValue[] ToArray() => _queue.ToArray();

    public bool TryAdd(TValue item)
    {
        Enqueue(item);
        return true;
    }

    bool IProducerConsumerCollection<TValue>.TryTake(out TValue item) => TryDequeue(out item);

    public bool TryDequeue(out TValue result) => _queue.TryDequeue(out result);

    public bool TryPeek(out TValue result) => _queue.TryPeek(out result);

}
Tod Cunningham
la source
-1

Voici ma version de la file d'attente:

public class FixedSizedQueue<T> {
  private object LOCK = new object();
  ConcurrentQueue<T> queue;

  public int MaxSize { get; set; }

  public FixedSizedQueue(int maxSize, IEnumerable<T> items = null) {
     this.MaxSize = maxSize;
     if (items == null) {
        queue = new ConcurrentQueue<T>();
     }
     else {
        queue = new ConcurrentQueue<T>(items);
        EnsureLimitConstraint();
     }
  }

  public void Enqueue(T obj) {
     queue.Enqueue(obj);
     EnsureLimitConstraint();
  }

  private void EnsureLimitConstraint() {
     if (queue.Count > MaxSize) {
        lock (LOCK) {
           T overflow;
           while (queue.Count > MaxSize) {
              queue.TryDequeue(out overflow);
           }
        }
     }
  }


  /// <summary>
  /// returns the current snapshot of the queue
  /// </summary>
  /// <returns></returns>
  public T[] GetSnapshot() {
     return queue.ToArray();
  }
}

Je trouve utile d'avoir un constructeur basé sur un IEnumerable et je trouve utile d'avoir un GetSnapshot pour avoir une liste sûre multithread (tableau dans ce cas) des éléments au moment de l'appel, qui ne monte pas erreurs si la collection sous-jacente change.

Le double contrôle de compte permet d'éviter le verrouillage dans certaines circonstances.

Pas important
la source
1
Voter pour le verrouillage de la file d'attente. Si vous voulez absolument verrouiller, un ReaderWriterLockSlim serait le meilleur (en supposant que vous vous attendiez à prendre un verrou en lecture plus souvent qu'un verrou en écriture). GetSnapshot n'est pas non plus nécessaire. Si vous implémentez IReadOnlyCollection <T> (ce que vous devriez pour la sémantique IEnumerable), ToList () remplira la même fonction.
Josh
Le ConcurrentQueue gère les verrous dans son implémentation, voir les liens dans ma réponse.
jjhayter le