Comment vérifier si une tâche est en cours dans le céleri (en particulier, j'utilise céleri-django)?
J'ai lu la documentation et j'ai cherché sur Google, mais je ne vois pas un appel comme:
my_example_task.state() == RUNNING
Mon cas d'utilisation est que j'ai un service externe (java) pour le transcodage. Lorsque j'envoie un document à transcoder, je souhaite vérifier si la tâche qui exécute ce service est en cours d'exécution, et sinon, la (re) démarrer.
J'utilise les versions stables actuelles - 2.4, je crois.
x
?async_result
. Dans votre cas d'utilisation, vous avez déjà l'instance, vous êtes prêt à partir. Mais que se passe-t-il si vous ne disposez que de l'ID de tâche et que vous devez instancier uneasync_result
instance pour pouvoir appelerasync_result.get()
? Il s'agit d'une instance de laAsyncResult
classe, mais vous ne pouvez pas utiliser la classe brutecelery.result.AsyncResult
, vous devez récupérer la classe à partir de la fonction encapsulée parapp.task()
. Dans votre cas, vous le feriezasync_result = run_instance.AsyncResult('task-id')
but you cannot use the raw class celery.result.AsyncResult, you need to get the class from the function wrapped by app.task().
- Je pense que c'est ainsi qu'il était censé être utilisé. Lisez le code: github.com/celery/celery/blob/…La création d'un
AsyncResult
objet à partir de l'ID de tâche est la méthode recommandée dans la FAQ pour obtenir l'état de la tâche lorsque la seule chose dont vous disposez est l'ID de la tâche.Cependant, à partir de Celery 3.x, il y a des mises en garde importantes qui pourraient mordre les gens s'ils ne font pas attention à eux. Cela dépend vraiment du scénario d'utilisation spécifique.
Par défaut, Celery n'enregistre pas un état "en cours d'exécution".
Pour que Celery enregistre qu'une tâche est en cours d'exécution, vous devez définir
task_track_started
surTrue
. Voici une tâche simple qui teste ceci:Quand
task_track_started
estFalse
, qui est la valeur par défaut, le show d'état estPENDING
même si la tâche a commencé. Si vous définisseztask_track_started
surTrue
, l'état seraSTARTED
.L'État
PENDING
signifie «je ne sais pas».Un
AsyncResult
avec l'étatPENDING
ne veut rien dire de plus que le céleri ne connaît pas l'état de la tâche. Cela peut être dû à un certain nombre de raisons.D'une part,
AsyncResult
peut être construit avec des identifiants de tâches non valides. Ces «tâches» seront considérées comme en attente par Celery:Ok, donc personne ne va fournir des identifiants manifestement invalides
AsyncResult
. Assez juste, mais cela a aussi pour effet deAsyncResult
considérer également une tâche qui s'est déroulée avec succès mais que Céleri a oublié comme étantPENDING
. Encore une fois, dans certains scénarios de cas d'utilisation, cela peut être un problème. Une partie du problème dépend de la façon dont Celery est configuré pour conserver les résultats des tâches, car cela dépend de la disponibilité des «tombstones» dans le backend des résultats. ("Tombstones" est le terme utilisé dans la documentation Celery pour les blocs de données qui enregistrent la fin de la tâche.) L'utilisationAsyncResult
ne fonctionnera pas du tout sitask_ignore_result
c'est le casTrue
. Un problème plus épineux est que Celery expire les pierres tombales par défaut. leresult_expires
le réglage par défaut est défini sur 24 heures. Donc, si vous lancez une tâche et enregistrez l'identifiant dans le stockage à long terme, et plus encore 24 heures plus tard, vous créez unAsyncResult
avec elle, le statut seraPENDING
.Toutes les «vraies tâches» commencent dans l'
PENDING
état. Ainsi, entreprendrePENDING
une tâche pourrait signifier que la tâche a été demandée mais n'a jamais progressé plus loin que cela (pour une raison quelconque). Ou cela pourrait signifier que la tâche s'est exécutée mais que Celery a oublié son état.Aie!
AsyncResult
ne fonctionnera pas pour moi. Que puis-je faire d'autre?Je préfère garder une trace des objectifs que de suivre les tâches elles-mêmes . Je garde certaines informations sur les tâches, mais elles sont vraiment secondaires au suivi des objectifs. Les buts sont stockés dans un stockage indépendant du céleri. Lorsqu'une demande doit effectuer un calcul dépend de l'atteinte d'un objectif, elle vérifie si l'objectif a déjà été atteint, si oui, elle utilise cet objectif mis en cache, sinon elle démarre la tâche qui affectera l'objectif, et envoie à le client qui a fait la demande HTTP une réponse qui indique qu'il doit attendre un résultat.
Les noms de variables et les hyperliens ci-dessus concernent Celery 4.x. Dans 3.x les variables et les liens hypertextes correspondants sont:
CELERY_TRACK_STARTED
,CELERY_IGNORE_RESULT
,CELERY_TASK_RESULT_EXPIRES
.la source
Chaque
Task
objet a une.request
propriété, qui le contientAsyncRequest
objet. En conséquence, la ligne suivante donne l'état d'une tâchetask
:la source
Vous pouvez également créer des états personnalisés et mettre à jour son exécution des tâches de valeur. Cet exemple est tiré de la documentation:
http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states
la source
Ancienne question mais j'ai récemment rencontré ce problème.
Si vous essayez d'obtenir le task_id, vous pouvez le faire comme ceci:
Vous savez maintenant exactement ce qu'est le task_id et pouvez maintenant l'utiliser pour obtenir le AsyncResult:
la source
apply_async
. L'objet renvoyé parapply_async
est unAsyncResult
objet, qui possède l'ID de la tâche générée par Celery.task_id
nécessitent de générer vous -même un identifiant de tâche. Dans votre commentaire, vous avez imaginé une raison qui va au-delà de "comment puis-je vérifier l'état de la tâche" et "Si vous essayez d'obtenir le task_id ..." Super si vous avez ce besoin mais ce n'est pas le cas ici. (D'ailleurs, utiliseruuid()
pour générer un identifiant de tâche ne fait absolument rien au-delà de ce que fait Celery par défaut.)Utilisez simplement cette API de la FAQ sur le céleri
Cela fonctionne très bien.
la source
Réponse de 2020:
la source
Essayer:
task.AsyncResult(task.request.id).state
cela fournira le statut de la tâche céleri. Si la tâche Celery est déjà en état d' échec , elle lancera une exception:
raised unexpected: KeyError('exc_type',)
la source
pour des tâches simples, nous pouvons utiliser http://flower.readthedocs.io/en/latest/screenshots.html et http://policystat.github.io/jobtastic/ pour effectuer la surveillance.
et pour les tâches compliquées, disons une tâche qui traite de beaucoup d'autres modules. Nous vous recommandons d'enregistrer manuellement la progression et le message sur l'unité de tâche spécifique.
la source
J'ai trouvé des informations utiles dans le
Guide des travailleurs du projet céleri inspecteurs-travailleurs
Pour mon cas, je vérifie si Celery fonctionne.
Vous pouvez jouer avec inspect pour obtenir vos besoins.
la source
vi my_celery_apps / app1.py
vi tâches / task1.py
la source
En dehors de l'approche programmatique ci-dessus L'utilisation du statut de la tâche de fleur peut être facilement vue.
Surveillance en temps réel à l'aide des événements Celery. Flower est un outil Web permettant de surveiller et d'administrer les grappes de céleri.
Document officiel: Fleur - Outil de surveillance du céleri
Installation:
Usage:
la source
la source