Suppression de toutes les tâches en attente dans céleri / rabbitmq

199

Comment puis-je supprimer toutes les tâches en attente sans connaître le task_idpour chaque tâche?

nabizan
la source

Réponses:

296

De la documentation :

$ celery -A proj purge

ou

from proj.celery import app
app.control.purge()

(EDIT: mis à jour avec la méthode actuelle.)

Philip Southam
la source
56
Ou, à partir de Django, pour céleri 3.0+: manage.py celery purge( celeryctlest désormais obsolète et disparaîtra en 3.1).
Henrik Heimbuerger
3
J'ai trouvé cette réponse en cherchant comment faire cela avec un backend redis. La meilleure méthode que j'ai trouvée était celle redis-cli KEYS "celery*" | xargs redis-cli DELqui fonctionnait pour moi. Cela effacera toutes les tâches stockées sur le backend redis que vous utilisez.
Melignus
1
Comment puis-je faire cela dans céleri 3.0?
luistm
2
Pour moi, c'était tout simplement celery purge(à l'intérieur de l'environnement virtuel pertinent). Oups - il y a une réponse avec le même ci-dessous ..... stackoverflow.com/a/20404976/1213425
Erve1879
Pour Celery 4.0+ en combinaison avec Django, c'est à nouveau cette commande, où l'argument to -Aest l'application Django où celery.pyse trouve le.
gitaarik
120

Pour le céleri 3.0+:

$ celery purge

Pour purger une file d'attente spécifique:

$ celery -Q queue_name purge
ToonAlfrink
la source
9
Si vous obtenez des erreurs de connexion, assurez-vous de spécifier l'application, par exemple celery -A proj purge.
Kamil Sindi
25

Pour le céleri 2.x et 3.x:

Lors de l'utilisation d'un travailleur avec le paramètre -Q pour définir des files d'attente, par exemple

celery worker -Q queue1,queue2,queue3

alors celery purgene fonctionnera pas, car vous ne pouvez pas lui passer les paramètres de file d'attente. Il supprimera uniquement la file d'attente par défaut. La solution consiste à démarrer vos travailleurs avec un --purgeparamètre comme celui-ci:

celery worker -Q queue1,queue2,queue3 --purge

Cela exécutera cependant le travailleur.

Une autre option consiste à utiliser la sous-commande amqp de céleri

celery amqp queue.delete queue1
celery amqp queue.delete queue2
celery amqp queue.delete queue3
smido
la source
Oui, c'est pour les anciennes versions (2.x et peut-être 3.x) de céleri. Je ne peux pas modifier la réponse
smido
9

J'ai trouvé que celery purgecela ne fonctionne pas pour ma configuration de céleri plus complexe. J'utilise plusieurs files d'attente nommées à différentes fins:

$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ...  # Output sorted, whitespaced for readability
celery                                          0   2
[email protected]                      0   1
[email protected]                      0   1
apns                                            0   1
[email protected]                        0   1
analytics                                       1   1
[email protected]                   0   1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0      0   1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1      0   1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54   0   1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866   0   1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99   0   1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e   0   1

La première colonne est le nom de la file d'attente, la seconde est le nombre de messages en attente dans la file d'attente et la troisième est le nombre d'écouteurs pour cette file d'attente. Les files d'attente sont:

  • céleri - File d'attente pour les tâches de céleri standard et idempotentes
  • apns - File d'attente pour les tâches d'Apple Push Notification Service, pas tout à fait aussi idempotentes
  • analytics - File d'attente pour les analyses nocturnes de longue durée
  • * .pidbox - File d'attente pour les commandes de travail, telles que l'arrêt et la réinitialisation, une par travailleur (2 travailleurs céleri, un travailleur apns, un travailleur analytique)
  • bcast. * - Files d'attente de diffusion, pour envoyer des messages à tous les employés qui écoutent une file d'attente (plutôt que le premier à la récupérer)
  • celeryev. * - Files d'attente d'événements de céleri, pour rapporter l'analyse des tâches

La tâche d'analyse est une tâche de force brute qui fonctionnait très bien sur de petits ensembles de données, mais prend désormais plus de 24 heures à traiter. De temps en temps, quelque chose va mal et il restera bloqué en attente sur la base de données. Il doit être réécrit, mais jusque-là, quand il est bloqué, je tue la tâche, vide la file d'attente et réessaye. Je détecte le "blocage" en regardant le nombre de messages pour la file d'attente d'analyse, qui devrait être 0 (analyse terminée) ou 1 (en attente de la fin de l'analyse de la nuit dernière). 2 ou plus est mauvais et je reçois un e-mail.

celery purge propose d'effacer les tâches de l'une des files d'attente de diffusion, et je ne vois pas d'option pour choisir une autre file d'attente nommée.

Voici mon processus:

$ sudo /etc/init.d/celeryd stop  # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics  # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop  # Confim dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers  # Confirm messages is 0
$ sudo /etc/init.d/celeryd start
jwhitlock
la source
Pas une réponse cependant, n'est-ce pas? Très instructif cependant!
amn
4
celeryctl purgen'a pas fonctionné avec les files d'attente nommées. python manage.py celery amqp queue.purge <queue_name>fait. Je pense que le contexte est utile pour ceux qui ont des configurations complexes, afin qu'ils puissent comprendre ce qu'ils doivent faire en cas d' celeryctl purgeéchec pour eux.
jwhitlock
Je ne trouve pas manage.pydans mon Celery 3.1.17, le fichier a-t-il été supprimé ou juste une nouvelle fessée? J'ai cependant trouvé ce qui ressemble à l'interface correspondante ( queue.purge) */bin/amqp.py. Mais après avoir essayé de corréler le contenu du fichier avec la documentation, je dois malheureusement admettre que Celery est malheureusement sans papiers et aussi un travail très compliqué, du moins à en juger par son code source.
amn
manage.pyest le script de gestion de Django, et manage.py celeryexécute le céleri après avoir chargé la configuration à partir des paramètres de Django. Je n'ai pas utilisé le céleri en dehors de Django, mais la celerycommande incluse peut être celle que vous recherchez: celery.readthedocs.org/en/latest/userguide/monitoring.html
jwhitlock
5

Dans Céleri 3+

http://docs.celryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks

CLI

Purger la file d'attente nommée:

 celery -A proj amqp queue.purge <queue name>

Purger la file d'attente configurée

celery -A proj purge

J'ai supprimé les messages, mais il reste des messages dans la file d'attente? Réponse: Les tâches sont acquittées (supprimées de la file d'attente) dès qu'elles sont réellement exécutées. Une fois que le travailleur a reçu une tâche, cela prendra un certain temps jusqu'à ce qu'elle soit réellement exécutée, surtout s'il y a déjà beaucoup de tâches en attente d'exécution. Les messages qui ne sont pas acquittés sont conservés par le travailleur jusqu'à ce qu'il ferme la connexion au courtier (serveur AMQP). Lorsque cette connexion est fermée (par exemple parce que le travailleur a été arrêté), les tâches seront renvoyées par le courtier au prochain travailleur disponible (ou au même travailleur lorsqu'il a été redémarré), afin de purger correctement la file d'attente des tâches en attente que vous devez arrêter tous les travailleurs, puis purger les tâches à l'aide de celery.control.purge ().

Donc, pour purger toute la file d'attente, les travailleurs doivent être arrêtés.

oneklc
la source
5

Si vous souhaitez supprimer toutes les tâches en attente ainsi que les tâches actives et réservées pour arrêter complètement Celery, voici ce qui a fonctionné pour moi:

from proj.celery import app
from celery.task.control import inspect, revoke

# remove pending tasks
app.control.purge()

# remove active tasks
i = inspect()
jobs = i.active()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)

# remove reserved tasks
jobs = i.reserved()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)
kahlo
la source
2

1. Pour purger correctement la file d'attente des tâches en attente, vous devez arrêter tous les travailleurs ( http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are- encore-messages-laissés dans la file d'attente ):

$ sudo rabbitmqctl stop

ou (dans le cas où RabbitMQ / courtier de messages est géré par le superviseur):

$ sudo supervisorctl stop all

2. ... puis purgez les tâches d'une file d'attente spécifique:

$ cd <source_dir>
$ celery amqp queue.purge <queue name>

3. Démarrez RabbitMQ:

$ sudo rabbitmqctl start

ou (dans le cas où RabbitMQ est géré par le superviseur):

$ sudo supervisorctl start all
Ukr
la source
2

commande celery 4+ celery purge pour purger toutes les files d'attente de tâches configurées

celery -A *APPNAME* purge

par programme:

from proj.celery import app
app.control.purge()

toutes les tâches en attente seront purgées. Référence: celerydoc

Roshan Bagdiya
la source