from celery.app.control importInspect# Inspect all nodes.
i =Inspect()# Show the items that have an ETA or are scheduled for later processing
i.scheduled()# Show tasks that are currently active.
i.active()# Show tasks that have been claimed by workers
i.reserved()
J'ai essayé ça, mais c'est vraiment lent (comme 1 seconde). Je l'utilise de manière synchrone dans une application tornado pour suivre les progrès, donc cela doit être rapide.
JulienFr
41
Cela ne renverra pas une liste des tâches de la file d'attente qui n'ont pas encore été traitées.
Ed J
9
Utilisez i.reserved()pour obtenir une liste des tâches en file d'attente.
Banana
4
Quelqu'un a-t-il constaté que i.reserved () n'a pas une liste précise des tâches actives? J'ai des tâches en cours d'exécution qui n'apparaissent pas dans la liste. Je suis sur django-celery == 3.1.10
Seperman
6
Lorsque vous spécifiez le travailleur que je devais utiliser une liste comme argument: inspect(['celery@Flatty']). Amélioration énorme de la vitesse inspect().
Adversus du
42
si vous utilisez rabbitMQ, utilisez ceci dans le terminal:
sudo rabbitmqctl list_queues
il imprimera la liste des files d'attente avec le nombre de tâches en attente. par exemple:
le nombre dans la colonne de droite correspond au nombre de tâches dans la file d'attente. ci-dessus, la file d'attente de céleri a 166 tâches en attente.
Je suis familier avec cela lorsque j'ai des privilèges sudo, mais je veux qu'un utilisateur système non privilégié puisse vérifier - des suggestions?
sage
De plus, vous pouvez le faire passer grep -e "^celery\s" | cut -f2pour l'extraire 166si vous souhaitez traiter ce nombre plus tard, par exemple pour les statistiques.
jamesc
22
Si vous n'utilisez pas de tâches prioritaires, c'est en fait assez simple si vous utilisez Redis. Pour obtenir le nombre de tâches:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
Mais, les tâches prioritaires utilisent une clé différente dans redis , donc l'image complète est légèrement plus compliquée. L'image complète est que vous devez interroger redis pour chaque priorité de tâche. En python (et du projet Flower), cela ressemble à:
PRIORITY_SEP ='\x06\x16'
DEFAULT_PRIORITY_STEPS =[0,3,6,9]def make_queue_name_for_pri(queue, pri):"""Make a queue name for redis
Celery uses PRIORITY_SEP to separate different priorities of tasks into
different queues in Redis. Each queue-priority combination becomes a key in
redis with names like:
- batch1\x06\x163 <-- P3 queue named batch1
There's more information about this in Github, but it doesn't look like it
will change any time soon:
- https://github.com/celery/kombu/issues/422
In that ticket the code below, from the Flower project, is referenced:
- https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135
:param queue: The name of the queue to make a name for.
:param pri: The priority to make a name with.
:return: A name for the queue-priority pair.
"""if pri notin DEFAULT_PRIORITY_STEPS:raiseValueError('Priority not in priority steps')return'{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri)if pri else(queue,'','')))def get_queue_length(queue_name='celery'):"""Get the number of tasks in a celery queue.
:param queue_name: The name of the queue you want to inspect.
:return: the number of items in the queue.
"""
priority_names =[make_queue_name_for_pri(queue_name, pri)for pri in
DEFAULT_PRIORITY_STEPS]
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],)return sum([r.llen(x)for x in priority_names])
Si vous souhaitez obtenir une tâche réelle, vous pouvez utiliser quelque chose comme:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0-1
De là, vous devrez désérialiser la liste renvoyée. Dans mon cas, j'ai pu accomplir cela avec quelque chose comme:
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],)
l = r.lrange('celery',0,-1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))
Sachez simplement que la désérialisation peut prendre un moment et que vous devrez ajuster les commandes ci-dessus pour travailler avec différentes priorités.
J'ai mis à jour ce qui précède pour gérer les tâches prioritaires. Le progrès!
mlissner
1
Juste pour préciser les choses, le DATABASE_NUMBERutilisé par défaut est 0, et le QUEUE_NAMEest celery, donc redis-cli -n 0 llen celeryretournera le nombre de messages en file d'attente.
Vineet Bansal
Pour mon céleri, le nom de la file d'attente est '{{{0}}}{1}{2}'au lieu de '{0}{1}{2}'. A part ça, cela fonctionne parfaitement!
zupo
12
Pour récupérer des tâches depuis le backend, utilisez ceci
Si vous utilisez Celery + Django, le moyen le plus simple d'inspecter les tâches à l'aide de commandes directement depuis votre terminal dans votre environnement virtuel ou en utilisant un chemin complet vers céleri:
Si vous avez un projet de définition, vous pouvez utilisercelery -A my_proj inspect reserved
sashaboulouds
6
Une solution de copier-coller pour Redis avec sérialisation json:
def get_celery_queue_items(queue_name):import base64
import json
# Get a configured instance of a celery app:from yourproject.celery import app as celery_app
with celery_app.pool.acquire(block=True)as conn:
tasks = conn.default_channel.client.lrange(queue_name,0,-1)
decoded_tasks =[]for task in tasks:
j = json.loads(task)
body = json.loads(base64.b64decode(j['body']))
decoded_tasks.append(body)return decoded_tasks
Cela fonctionne avec Django. N'oubliez pas de changer yourproject.celery.
Si vous utilisez le sérialiseur pickle, vous pouvez modifier la body =ligne en body = pickle.loads(base64.b64decode(j['body'])).
Jim Hunziker
4
Le module d'inspection du céleri semble ne connaître les tâches que du point de vue des travailleurs. Si vous souhaitez afficher les messages qui sont dans la file d'attente (qui n'ont pas encore été extraits par les travailleurs), je suggère d'utiliser pyrabbit , qui peut s'interfacer avec l'api http rabbitmq pour récupérer toutes sortes d'informations de la file d'attente.
Je pense que le seul moyen d'obtenir les tâches en attente est de conserver une liste des tâches que vous avez commencées et de laisser la tâche se retirer de la liste lorsqu'elle a commencé.
Si ce que vous voulez inclut la tâche en cours de traitement, mais que vous n'êtes pas encore terminé, vous pouvez conserver une liste de vos tâches et vérifier leurs états:
from tasks import add
result = add.delay(4,4)
result.ready()# True if finished
Vous pouvez également laisser Celery stocker les résultats avec CELERY_RESULT_BACKEND et vérifier lesquelles de vos tâches ne s'y trouvent pas.
@daveoncode Je ne pense pas que ce soit suffisamment d'informations pour que je puisse répondre utilement. Vous pouvez ouvrir votre propre question. Je ne pense pas que ce serait un double de celui-ci si vous spécifiez que vous souhaitez récupérer les informations en python. Je reviendrais sur stackoverflow.com/a/19465670/9843399 , sur lequel j'ai basé ma réponse, et je m'assurerais que cela fonctionne en premier.
Caleb Syring le
@CalebSyring C'est la première approche qui me montre vraiment les tâches en file d'attente. Très agréable. Le seul problème pour moi est que l'ajout de liste ne semble pas fonctionner. Des idées sur la façon dont je peux faire écrire la fonction de rappel dans la liste?
Varlor le
@Varlor Je suis désolé, quelqu'un a modifié ma réponse. Vous pouvez rechercher dans l'historique des modifications la réponse originale, qui fonctionnera probablement pour vous. Je travaille pour résoudre ce problème. (EDIT: je viens d'entrer et j'ai rejeté la modification, qui contenait une erreur python évidente. Faites-moi savoir si cela a résolu votre problème ou non.)
Caleb Syring
@CalebSyring J'ai maintenant utilisé votre code dans une classe, avoir la liste comme attribut de classe fonctionne!
Varlor
2
Autant que je sache, Celery ne donne pas d'API pour examiner les tâches en attente dans la file d'attente. Ceci est spécifique au courtier. Si vous utilisez Redis comme courtier pour un exemple, alors l'examen des tâches en attente dans la celeryfile d'attente (par défaut) est aussi simple que:
se connecter à la base de données du courtier
lister les éléments de la celeryliste (commande LRANGE pour un exemple)
Gardez à l'esprit qu'il s'agit de tâches EN ATTENTE d'être sélectionnées par les travailleurs disponibles. Votre cluster peut avoir des tâches en cours d'exécution - celles-ci ne figureront pas dans cette liste car elles ont déjà été sélectionnées.
J'en suis venu à la conclusion que le meilleur moyen d'obtenir le nombre de travaux sur une file d'attente est d'utiliser rabbitmqctlcomme cela a été suggéré à plusieurs reprises ici. Pour permettre à tout utilisateur choisi d'exécuter la commande avec, sudoj'ai suivi les instructions ici (j'ai ignoré la modification de la partie du profil car cela ne me dérange pas de taper sudo avant la commande.)
J'ai également attrapé les jamesc grepet l' cutextrait de code et je l'ai enveloppé dans des appels de sous-processus.
from subprocess importPopen, PIPE
p1 =Popen(["sudo","rabbitmqctl","list_queues","-p","[name of your virtula host"], stdout=PIPE)
p2 =Popen(["grep","-e","^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 =Popen(["cut","-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()print("number of jobs on queue: %i"% int(p3.communicate()[0]))
from celery.task.control import inspect
def key_in_list(k, l):return bool([Truefor i in l if k in i.values()])def check_task(task_id):
task_value_dict = inspect().active().values()for task_list in task_value_dict:if self.key_in_list(task_id, task_list):returnTruereturnFalse
Si vous contrôlez le code des tâches, vous pouvez contourner le problème en laissant une tâche déclencher une nouvelle tentative triviale la première fois qu'elle s'exécute, puis en vérifiant inspect().reserved(). La nouvelle tentative enregistre la tâche avec le moteur de résultat, et céleri peut le voir. La tâche doit accepter selfou contextcomme premier paramètre afin que nous puissions accéder au nombre de tentatives.
Cette solution est indépendante du courtier, c'est-à-dire. vous n'avez pas à vous soucier de savoir si vous utilisez RabbitMQ ou Redis pour stocker les tâches.
EDIT: après les tests, j'ai trouvé que ce n'était qu'une solution partielle. La taille de réservé est limitée au paramètre de prélecture pour le worker.
Réponses:
MODIFIER: voir les autres réponses pour obtenir une liste des tâches dans la file d'attente.
Vous devriez regarder ici: Guide du céleri - Inspection des ouvriers
Fondamentalement ceci:
Selon ce que vous voulez
la source
i.reserved()
pour obtenir une liste des tâches en file d'attente.inspect(['celery@Flatty'])
. Amélioration énorme de la vitesseinspect()
.si vous utilisez rabbitMQ, utilisez ceci dans le terminal:
il imprimera la liste des files d'attente avec le nombre de tâches en attente. par exemple:
le nombre dans la colonne de droite correspond au nombre de tâches dans la file d'attente. ci-dessus, la file d'attente de céleri a 166 tâches en attente.
la source
grep -e "^celery\s" | cut -f2
pour l'extraire166
si vous souhaitez traiter ce nombre plus tard, par exemple pour les statistiques.Si vous n'utilisez pas de tâches prioritaires, c'est en fait assez simple si vous utilisez Redis. Pour obtenir le nombre de tâches:
Mais, les tâches prioritaires utilisent une clé différente dans redis , donc l'image complète est légèrement plus compliquée. L'image complète est que vous devez interroger redis pour chaque priorité de tâche. En python (et du projet Flower), cela ressemble à:
Si vous souhaitez obtenir une tâche réelle, vous pouvez utiliser quelque chose comme:
De là, vous devrez désérialiser la liste renvoyée. Dans mon cas, j'ai pu accomplir cela avec quelque chose comme:
Sachez simplement que la désérialisation peut prendre un moment et que vous devrez ajuster les commandes ci-dessus pour travailler avec différentes priorités.
la source
DATABASE_NUMBER
utilisé par défaut est0
, et leQUEUE_NAME
estcelery
, doncredis-cli -n 0 llen celery
retournera le nombre de messages en file d'attente.'{{{0}}}{1}{2}'
au lieu de'{0}{1}{2}'
. A part ça, cela fonctionne parfaitement!Pour récupérer des tâches depuis le backend, utilisez ceci
la source
Si vous utilisez Celery + Django, le moyen le plus simple d'inspecter les tâches à l'aide de commandes directement depuis votre terminal dans votre environnement virtuel ou en utilisant un chemin complet vers céleri:
Doc : http://docs.celleryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers
De plus, si vous utilisez Celery + RabbitMQ, vous pouvez inspecter la liste des files d'attente à l'aide de la commande suivante:
Plus d'infos : https://linux.die.net/man/1/rabbitmqctl
la source
celery -A my_proj inspect reserved
Une solution de copier-coller pour Redis avec sérialisation json:
Cela fonctionne avec Django. N'oubliez pas de changer
yourproject.celery
.la source
body =
ligne enbody = pickle.loads(base64.b64decode(j['body']))
.Le module d'inspection du céleri semble ne connaître les tâches que du point de vue des travailleurs. Si vous souhaitez afficher les messages qui sont dans la file d'attente (qui n'ont pas encore été extraits par les travailleurs), je suggère d'utiliser pyrabbit , qui peut s'interfacer avec l'api http rabbitmq pour récupérer toutes sortes d'informations de la file d'attente.
Un exemple peut être trouvé ici: Récupérer la longueur de la file d'attente avec Celery (RabbitMQ, Django)
la source
Je pense que le seul moyen d'obtenir les tâches en attente est de conserver une liste des tâches que vous avez commencées et de laisser la tâche se retirer de la liste lorsqu'elle a commencé.
Avec rabbitmqctl et list_queues, vous pouvez avoir un aperçu du nombre de tâches en attente, mais pas des tâches elles-mêmes: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
Si ce que vous voulez inclut la tâche en cours de traitement, mais que vous n'êtes pas encore terminé, vous pouvez conserver une liste de vos tâches et vérifier leurs états:
Vous pouvez également laisser Celery stocker les résultats avec CELERY_RESULT_BACKEND et vérifier lesquelles de vos tâches ne s'y trouvent pas.
la source
Cela a fonctionné pour moi dans mon application:
active_jobs
sera une liste de chaînes correspondant aux tâches de la file d'attente.N'oubliez pas d'échanger CELERY_APP_INSTANCE avec le vôtre.
Merci à @ashish de m'avoir pointé dans la bonne direction avec sa réponse ici: https://stackoverflow.com/a/19465670/9843399
la source
jobs
est toujours zéro ... une idée?Autant que je sache, Celery ne donne pas d'API pour examiner les tâches en attente dans la file d'attente. Ceci est spécifique au courtier. Si vous utilisez Redis comme courtier pour un exemple, alors l'examen des tâches en attente dans la
celery
file d'attente (par défaut) est aussi simple que:celery
liste (commande LRANGE pour un exemple)Gardez à l'esprit qu'il s'agit de tâches EN ATTENTE d'être sélectionnées par les travailleurs disponibles. Votre cluster peut avoir des tâches en cours d'exécution - celles-ci ne figureront pas dans cette liste car elles ont déjà été sélectionnées.
la source
J'en suis venu à la conclusion que le meilleur moyen d'obtenir le nombre de travaux sur une file d'attente est d'utiliser
rabbitmqctl
comme cela a été suggéré à plusieurs reprises ici. Pour permettre à tout utilisateur choisi d'exécuter la commande avec,sudo
j'ai suivi les instructions ici (j'ai ignoré la modification de la partie du profil car cela ne me dérange pas de taper sudo avant la commande.)J'ai également attrapé les jamesc
grep
et l'cut
extrait de code et je l'ai enveloppé dans des appels de sous-processus.la source
la source
Si vous contrôlez le code des tâches, vous pouvez contourner le problème en laissant une tâche déclencher une nouvelle tentative triviale la première fois qu'elle s'exécute, puis en vérifiant
inspect().reserved()
. La nouvelle tentative enregistre la tâche avec le moteur de résultat, et céleri peut le voir. La tâche doit accepterself
oucontext
comme premier paramètre afin que nous puissions accéder au nombre de tentatives.Cette solution est indépendante du courtier, c'est-à-dire. vous n'avez pas à vous soucier de savoir si vous utilisez RabbitMQ ou Redis pour stocker les tâches.
EDIT: après les tests, j'ai trouvé que ce n'était qu'une solution partielle. La taille de réservé est limitée au paramètre de prélecture pour le worker.
la source
Avec
subprocess.run
:Attention à changer
my_proj
avecyour_proj
la source