J'ai besoin d'une file d'attente dans laquelle plusieurs threads peuvent mettre des éléments et à partir de laquelle plusieurs threads peuvent lire.
Python a au moins deux classes de files d'attente, Queue.Queue et collections.deque, la première utilisant apparemment la seconde en interne. Les deux prétendent être thread-safe dans la documentation.
Cependant, la documentation de la file d'attente indique également:
collections.deque est une implémentation alternative de files d'attente illimitées avec des opérations atomiques rapides append () et popleft () qui ne nécessitent pas de verrouillage.
Ce que je suppose que je ne comprends pas tout à fait: cela signifie-t-il que deque n'est pas entièrement thread-safe après tout?
Si c'est le cas, je ne comprends peut-être pas entièrement la différence entre les deux classes. Je peux voir que Queue ajoute une fonctionnalité de blocage. D'autre part, il perd certaines fonctionnalités deque comme la prise en charge de l'opérateur interne.
Accéder directement à l'objet deque interne est
x dans la file d'attente (). deque
thread-safe?
Aussi, pourquoi Queue utilise-t-il un mutex pour ses opérations alors que deque est déjà thread-safe?
la source
RuntimeError: deque mutated during iteration
est ce que vous pourriez obtenir est d'utiliser un partagedeque
entre plusieurs threads et pas de verrouillage ...deque
itération en cours, même dans le même thread. La seule raison pour laquelle vous ne pouvez pas obtenir cette erreurQueue
est qu'elleQueue
ne prend pas en charge l'itération.Réponses:
Queue.Queue
etcollections.deque
servent à des fins différentes. Queue.Queue est destiné à permettre à différents threads de communiquer à l'aide de messages / données en file d'attente, alors qu'ilcollections.deque
est simplement conçu comme une structure de données. C'est pourquoiQueue.Queue
a des méthodes telles queput_nowait()
,get_nowait()
etjoin()
, alorscollections.deque
ne fonctionne pas.Queue.Queue
n'est pas destiné à être utilisé comme une collection, c'est pourquoi il n'a pas les goûts de l'in
opérateur.Cela se résume à ceci: si vous avez plusieurs threads et que vous voulez qu'ils puissent communiquer sans avoir besoin de verrous, vous recherchez
Queue.Queue
; si vous voulez juste une file d'attente ou une file d'attente double comme structure de données, utilisezcollections.deque
.Enfin, accéder et manipuler le deque interne d'un
Queue.Queue
est jouer avec le feu - vous ne voulez vraiment pas faire cela.la source
Queue.Queue
, il utilisedeque
sous le capot.collections.deque
est une collection, tandis queQueue.Queue
c'est un mécanisme de communication. La surchargeQueue.Queue
est de le rendre threadsafe. Utiliserdeque
pour communiquer entre les threads ne mènera qu'à des courses douloureuses. Chaque foisdeque
qu'il est threadsafe, c'est un heureux accident de la façon dont l'interpréteur est implémenté, et non quelque chose sur lequel on peut se fier. C'est pourquoiQueue.Queue
existe en premier lieu.deque is threadsafe by accident due to the existence of GIL
; il est vrai quedeque
cela s'appuie sur GIL pour garantir la sécurité des threads - mais ce n'est pas le casby accident
. La documentation officielle de python indique clairement que les méthodesdeque
pop*
/append*
sont thread-safe. Ainsi, toute implémentation python valide doit fournir la même garantie (les implémentations sans GIL devront trouver comment faire cela sans GIL). Vous pouvez compter sur ces garanties en toute sécurité.deque
pour la communication. Si vous vous enroulezpop
dans untry/except
, vous vous retrouverez avec une boucle occupée consommant une énorme quantité de CPU en attendant de nouvelles données. Cela semble être une approche horriblement inefficace par rapport aux appels de blocage proposés parQueue
, qui garantissent que le thread en attente de données passera en veille et ne perdra pas de temps CPU.Queue.Queue
, car il est écrit à l'aide decollections.deque
: hg.python.org/cpython/file/2.7/Lib/Queue.py - il utilise des variables de condition pour permettre efficacement l'accès à l'deque
enveloppe. au-dessus des limites de filetage de manière sûre et efficace L'explication de la façon dont vous utiliseriez undeque
pour la communication est juste là dans la source.Si tout ce que vous recherchez est un moyen sûr pour les threads de transférer des objets entre les threads , les deux fonctionneraient (à la fois pour FIFO et LIFO). Pour FIFO:
Queue.put()
etQueue.get()
sont thread-safedeque.append()
etdeque.popleft()
sont thread-safeRemarque:
deque
peuvent ne pas être thread-safe, je ne suis pas sûr.deque
ne bloque paspop()
oupopleft()
donc vous ne pouvez pas baser votre flux de thread consommateur sur le blocage jusqu'à ce qu'un nouvel élément arrive.Cependant, il semble que deque présente un avantage d'efficacité significatif . Voici quelques résultats de référence en quelques secondes en utilisant CPython 2.7.3 pour insérer et supprimer 100 000 éléments
Voici le code de référence:
la source
deque
peuvent ne pas être thread-safe". D'où vient ça?Pour plus d'informations, il existe un ticket Python référencé pour deque thread-safety ( https://bugs.python.org/issue15329 ). Titre "clarifier quelles méthodes deque sont thread-safe"
Bottom line ici: https://bugs.python.org/issue15329#msg199368
Quoi qu'il en soit, si vous n'êtes pas sûr à 100% et que vous préférez la fiabilité à la performance, mettez simplement un verrou similaire;)
la source
Toutes les méthodes à un seul élément sur
deque
sont atomiques et thread-safe. Toutes les autres méthodes sont également thread-safe. Des choses commelen(dq)
,dq[4]
donnent des valeurs correctes momentanées. Mais pensez par exemple àdq.extend(mylist)
ceci: vous n'obtenez pas la garantie que tous les éléments dansmylist
sont classés dans une ligne lorsque d'autres threads ajoutent également des éléments du même côté - mais ce n'est généralement pas une exigence dans la communication inter-thread et pour la tâche en question.Donc, a
deque
est ~ 20x plus rapide queQueue
(qui utilise adeque
sous le capot) et à moins que vous n'ayez pas besoin de l'API de synchronisation "confortable" (blocage / timeout), de l'maxsize
obéissance stricte ou du "Override these methods (_put, _get, ..) ) pour implémenter le comportement de sous-classification d' autres organisations de files d'attente , ou lorsque vous vous occupez de telles choses vous-même, alors un baredeque
est une bonne et efficace affaire pour la communication inter-thread à grande vitesse.En fait, l'utilisation intensive d'un mutex supplémentaire et d'
._get()
appels de méthode supplémentaires, etc.Queue.py
est due à des contraintes de compatibilité ascendante, à une sur-conception passée et au manque de soin pour fournir une solution efficace à cet important problème de goulot d'étranglement de vitesse dans la communication inter-thread. Une liste était utilisée dans les anciennes versions de Python - mais même list.append () /. Pop (0) était et est atomique et threadsafe ...la source
En ajoutant
notify_all()
à chacun d'euxdeque
append
, despopleft
résultats bien piresdeque
que l'amélioration de 20x obtenue par ledeque
comportement par défaut :@Jonathan modifie un peu son code et j'obtiens le benchmark en utilisant cPython 3.6.2 et j'ajoute une condition dans la boucle deque pour simuler le comportement de Queue.
Et il semble que les performances soient limitées par cette fonction
condition.notify_all()
la source
deque
est thread-safe. «opérations qui ne nécessitent pas de verrouillage» signifie que vous n'avez pas à faire le verrouillage vous-même, ledeque
s'en charge.En regardant la
Queue
source, le deque interne est appeléself.queue
et utilise un mutex pour les accesseurs et les mutations, ilQueue().queue
n'est donc pas sûr à utiliser pour les threads.Si vous recherchez un opérateur "in", alors une deque ou une file d'attente n'est peut-être pas la structure de données la plus appropriée pour votre problème.
la source
(il semble que je n'ai pas la réputation de commenter ...) Vous devez faire attention aux méthodes de deque que vous utilisez à partir de différents threads.
deque.get () semble être threadsafe, mais j'ai trouvé que faire
peut échouer si un autre thread ajoute des éléments en même temps. J'ai eu une RuntimeException qui se plaignait de "deque muté pendant l'itération".
Vérifiez collectionsmodule.c pour voir quelles opérations sont affectées par cela
la source
>>> di = {1:None} >>> for x in di: del di[x]
while
boucle.