Comment vérifier l'état des tâches dans Celery?

92

Comment vérifier si une tâche est en cours dans le céleri (en particulier, j'utilise céleri-django)?

J'ai lu la documentation et j'ai cherché sur Google, mais je ne vois pas un appel comme:

my_example_task.state() == RUNNING

Mon cas d'utilisation est que j'ai un service externe (java) pour le transcodage. Lorsque j'envoie un document à transcoder, je souhaite vérifier si la tâche qui exécute ce service est en cours d'exécution, et sinon, la (re) démarrer.

J'utilise les versions stables actuelles - 2.4, je crois.

Marcin
la source

Réponses:

97

Renvoyez le task_id (qui est donné par .delay ()) et demandez ensuite à l'instance de céleri l'état:

x = method.delay(1,2)
print x.task_id

Lorsque vous le demandez, obtenez un nouveau AsyncResult en utilisant ce task_id:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
Gregor
la source
10
Merci, mais que faire si je n'y ai pas accès x?
Marcin
4
Où mettez-vous vos travaux en file d'attente dans le céleri? Là, vous devez retourner le task_id pour suivre le travail à l'avenir.
Gregor
Contrairement à @ Marcin, cette réponse n'utilise pas la méthode statique Task.AsyncResult () comme fabrique de AsyncResult, qui réutilise utilement la configuration du backend, sinon une erreur est déclenchée lors de la tentative d'obtention du résultat.
ArnauOrriols
2
@Chris La controverse avec le code @gregor est dans l'instanciation de async_result. Dans votre cas d'utilisation, vous avez déjà l'instance, vous êtes prêt à partir. Mais que se passe-t-il si vous ne disposez que de l'ID de tâche et que vous devez instancier une async_resultinstance pour pouvoir appeler async_result.get()? Il s'agit d'une instance de la AsyncResultclasse, mais vous ne pouvez pas utiliser la classe brute celery.result.AsyncResult, vous devez récupérer la classe à partir de la fonction encapsulée par app.task(). Dans votre cas, vous le feriezasync_result = run_instance.AsyncResult('task-id')
ArnauOrriols
1
but you cannot use the raw class celery.result.AsyncResult, you need to get the class from the function wrapped by app.task(). - Je pense que c'est ainsi qu'il était censé être utilisé. Lisez le code: github.com/celery/celery/blob/…
nevelis
70

La création d'un AsyncResultobjet à partir de l'ID de tâche est la méthode recommandée dans la FAQ pour obtenir l'état de la tâche lorsque la seule chose dont vous disposez est l'ID de la tâche.

Cependant, à partir de Celery 3.x, il y a des mises en garde importantes qui pourraient mordre les gens s'ils ne font pas attention à eux. Cela dépend vraiment du scénario d'utilisation spécifique.

Par défaut, Celery n'enregistre pas un état "en cours d'exécution".

Pour que Celery enregistre qu'une tâche est en cours d'exécution, vous devez définir task_track_startedsur True. Voici une tâche simple qui teste ceci:

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state

Quand task_track_startedest False, qui est la valeur par défaut, le show d'état est PENDINGmême si la tâche a commencé. Si vous définissez task_track_startedsur True, l'état sera STARTED.

L'État PENDINGsignifie «je ne sais pas».

Un AsyncResultavec l'étatPENDING ne veut rien dire de plus que le céleri ne connaît pas l'état de la tâche. Cela peut être dû à un certain nombre de raisons.

D'une part, AsyncResultpeut être construit avec des identifiants de tâches non valides. Ces «tâches» seront considérées comme en attente par Celery:

>>> task.AsyncResult("invalid").status
'PENDING'

Ok, donc personne ne va fournir des identifiants manifestement invalides AsyncResult. Assez juste, mais cela a aussi pour effet de AsyncResultconsidérer également une tâche qui s'est déroulée avec succès mais que Céleri a oublié comme étant PENDING. Encore une fois, dans certains scénarios de cas d'utilisation, cela peut être un problème. Une partie du problème dépend de la façon dont Celery est configuré pour conserver les résultats des tâches, car cela dépend de la disponibilité des «tombstones» dans le backend des résultats. ("Tombstones" est le terme utilisé dans la documentation Celery pour les blocs de données qui enregistrent la fin de la tâche.) L'utilisation AsyncResultne fonctionnera pas du tout si task_ignore_resultc'est le cas True. Un problème plus épineux est que Celery expire les pierres tombales par défaut. leresult_expiresle réglage par défaut est défini sur 24 heures. Donc, si vous lancez une tâche et enregistrez l'identifiant dans le stockage à long terme, et plus encore 24 heures plus tard, vous créez un AsyncResultavec elle, le statut sera PENDING.

Toutes les «vraies tâches» commencent dans l' PENDINGétat. Ainsi, entreprendre PENDINGune tâche pourrait signifier que la tâche a été demandée mais n'a jamais progressé plus loin que cela (pour une raison quelconque). Ou cela pourrait signifier que la tâche s'est exécutée mais que Celery a oublié son état.

Aie! AsyncResultne fonctionnera pas pour moi. Que puis-je faire d'autre?

Je préfère garder une trace des objectifs que de suivre les tâches elles-mêmes . Je garde certaines informations sur les tâches, mais elles sont vraiment secondaires au suivi des objectifs. Les buts sont stockés dans un stockage indépendant du céleri. Lorsqu'une demande doit effectuer un calcul dépend de l'atteinte d'un objectif, elle vérifie si l'objectif a déjà été atteint, si oui, elle utilise cet objectif mis en cache, sinon elle démarre la tâche qui affectera l'objectif, et envoie à le client qui a fait la demande HTTP une réponse qui indique qu'il doit attendre un résultat.


Les noms de variables et les hyperliens ci-dessus concernent Celery 4.x. Dans 3.x les variables et les liens hypertextes correspondants sont: CELERY_TRACK_STARTED, CELERY_IGNORE_RESULT, CELERY_TASK_RESULT_EXPIRES.

Louis
la source
Donc, si je veux vérifier le résultat plus tard (peut-être même dans un autre processus), je suis mieux avec ma propre implémentation? Stockage manuel du résultat dans la base de données?
Franklin Yu
Oui, je séparerais le suivi du «but» du suivi des «tâches». J'ai écrit "effectuer un calcul qui dépend d'un objectif". Habituellement, le «but» est aussi un calcul. Par exemple, si je veux montrer l'article X à un utilisateur, je dois le convertir de XML en HTML, mais avant cela, je dois avoir résolu toutes les références bibliographiques. (X est comme un article de journal.) Je vérifie si l'objectif «article X avec toutes les références bibliographiques résolues» existe et je l'utilise plutôt que d'essayer de vérifier l'état de la tâche d'une tâche Celery qui aurait calculé l'objectif que je souhaite.
Louis
Et l'information «article X avec toutes les références bibliographiques résolues» est stockée dans une mémoire cache et stockée dans une base de données eXist-db.
Louis
61

Chaque Taskobjet a une .requestpropriété, qui le contient AsyncRequestobjet. En conséquence, la ligne suivante donne l'état d'une tâche task:

task.AsyncResult(task.request.id).state
Marcin
la source
2
Existe-t-il un moyen de stocker le pourcentage d'avancement d'une tâche?
patrick
4
Lorsque je fais cela, j'obtiens un AsyncResult PENDING de manière permanente, même si j'attends assez longtemps que la tâche se termine. Y a-t-il un moyen de faire cela voir les changements d'état? Je crois que mon backend est configuré et j'ai essayé de définir CELERY_TRACK_STARTED = True en vain.
dstromberg
1
@dstromberg Malheureusement, cela faisait 4 ans que c'était un problème pour moi, donc je ne peux pas vous aider. Vous devez presque certainement configurer le céleri pour suivre l'état.
Marcin
16

Vous pouvez également créer des états personnalisés et mettre à jour son exécution des tâches de valeur. Cet exemple est tiré de la documentation:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states

msangel
la source
11

Ancienne question mais j'ai récemment rencontré ce problème.

Si vous essayez d'obtenir le task_id, vous pouvez le faire comme ceci:

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

Vous savez maintenant exactement ce qu'est le task_id et pouvez maintenant l'utiliser pour obtenir le AsyncResult:

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4
Cesar Rios
la source
3
Il n'est absolument pas nécessaire de créer votre propre ID de tâche et de le transmettre à apply_async. L'objet renvoyé par apply_async est un AsyncResultobjet, qui possède l'ID de la tâche générée par Celery.
Louis
1
Corrigez-moi si je me trompe, mais n'est-il pas parfois utile de générer un UUID basé sur certaines entrées, de sorte que tous les appels ayant les mêmes entrées aient le même UUID? IOW, il est peut-être parfois utile de spécifier votre task_id.
dstromberg
1
@dstromberg La question posée par l'OP est "comment puis-je vérifier l'état des tâches" et la réponse ici dit "Si vous essayez d'obtenir le task_id ...". Ni la vérification du statut de la tâche, ni l’obtention ne task_idnécessitent de générer vous -même un identifiant de tâche. Dans votre commentaire, vous avez imaginé une raison qui va au-delà de "comment puis-je vérifier l'état de la tâche" et "Si vous essayez d'obtenir le task_id ..." Super si vous avez ce besoin mais ce n'est pas le cas ici. (D'ailleurs, utiliser uuid()pour générer un identifiant de tâche ne fait absolument rien au-delà de ce que fait Celery par défaut.)
Louis
Je conviens que l'OP n'a pas spécifiquement demandé comment obtenir des ID de tâches prévisibles, mais la réponse à la question du OP est actuellement "suivre l'ID de tâche et faire x". Il me semble que le suivi de l'ID de tâche n'est pas pratique dans une grande variété de situations, de sorte que la réponse peut ne pas être réellement satisfaisante. Cette réponse m'aide à résoudre mon cas d'utilisation (si je peux surmonter d'autres limitations notées) pour la même raison que @dstromberg souligne - qu'elle soit motivée ou non pour cette raison.
claytond
7

Utilisez simplement cette API de la FAQ sur le céleri

result = app.AsyncResult(task_id)

Cela fonctionne très bien.

David Ding
la source
1

Réponse de 2020:

#### tasks.py
@celery.task()
def mytask(arg1):
    print(arg1)

#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
    process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
    state = process.state
    return f"Thanks for your patience, your job {process.task_id} \
             is being processed. Status {state}"
Adrian Garcia Moreno
la source
0

Essayer:

task.AsyncResult(task.request.id).state

cela fournira le statut de la tâche céleri. Si la tâche Celery est déjà en état d' échec , elle lancera une exception:

raised unexpected: KeyError('exc_type',)

gogasca
la source
0

J'ai trouvé des informations utiles dans le

Guide des travailleurs du projet céleri inspecteurs-travailleurs

Pour mon cas, je vérifie si Celery fonctionne.

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

Vous pouvez jouer avec inspect pour obtenir vos besoins.

zerocog
la source
0
  • D'abord , dans votre application céleri :

vi my_celery_apps / app1.py

app = Celery(worker_name)
  • et ensuite, passez au fichier de tâches , importez l'application à partir de votre module d'application céleri.

vi tâches / task1.py

from my_celery_apps.app1 import app

app.AsyncResult(taskid)

try:
   if task.state.lower() != "success":
        return
except:
    """ do something """

Vous ZhengChuan
la source
-1

En dehors de l'approche programmatique ci-dessus L'utilisation du statut de la tâche de fleur peut être facilement vue.

Surveillance en temps réel à l'aide des événements Celery. Flower est un outil Web permettant de surveiller et d'administrer les grappes de céleri.

  1. Progression et historique des tâches
  2. Possibilité d'afficher les détails de la tâche (arguments, heure de début, exécution, etc.)
  3. Graphiques et statistiques

Document officiel: Fleur - Outil de surveillance du céleri

Installation:

$ pip install flower

Usage:

http://localhost:5555
Roshan Bagdiya
la source
-1
res = method.delay()
    
print(f"id={res.id}, state={res.state}, status={res.status} ")

print(res.get())
Saurabh I
la source
2
Veuillez ne pas publier uniquement le code comme réponse, mais également expliquer ce que fait votre code et comment il résout le problème de la question. Les réponses avec une explication sont généralement plus utiles et de meilleure qualité, et sont plus susceptibles d'attirer des votes positifs.
Mark Rotteveel le