Annuler une tâche déjà en cours d'exécution avec Celery?

91

J'ai lu le document et recherché mais je n'arrive pas à trouver une réponse claire:

Pouvez-vous annuler une tâche déjà en cours d'exécution? (comme dans la tâche a commencé, prend un certain temps, et à mi-chemin, elle doit être annulée)

J'ai trouvé cela dans le doc sur Celery FAQ

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

Mais je ne sais pas si cela annulera les tâches en file d'attente ou si cela tuera un processus en cours sur un travailleur. Merci pour toute lumière que vous pouvez jeter!

dcoffey3296
la source

Réponses:

179

révoke annule l'exécution de la tâche. Si une tâche est révoquée, les travailleurs ignorent la tâche et ne l'exécutent pas. Si vous n'utilisez pas de révocation persistante, votre tâche peut être exécutée après le redémarrage du travailleur.

http://docs.celleryproject.org/en/latest/userguide/workers.html#worker-persistent-revokes

revoke a une option de terminaison qui est False par défaut. Si vous devez arrêter la tâche en cours d'exécution, vous devez définir terminate sur True .

>>> from celery.task.control import revoke
>>> revoke(task_id, terminate=True)

http://docs.celleryproject.org/en/latest/userguide/workers.html#revoke-revoking-tasks

mher
la source
3
C'est exactement l'explication que je cherchais, merci!
dcoffey3296
1
Cela fonctionne-t-il dans un environnement distribué? Je veux dire si j'ai des travailleurs sur plusieurs machines qui exécutent des tâches. Le céleri garde-t-il une trace de la machine sur laquelle la tâche s'exécute?
ksrini
1
Cela fait. La communication avec les travailleurs se fait via le courtier.
Mher
4
result.revoke (terminate = True) devrait faire la même chose que revoke (task_id, terminate = True)
CamHart
9
De plus, l'utilisation de l'option de terminaison est "un dernier recours pour les administrateurs", selon les récents documents Celery. Vous courez le risque de mettre fin à une autre tâche qui a récemment démarré sur ce travailleur.
kouk
38

Dans Celery 3.1, l' API de révocation des tâches est modifiée.

Selon la FAQ sur le céleri , vous devez utiliser result.revoke:

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

ou si vous n'avez que l'identifiant de la tâche:

>>> from proj.celery import app
>>> app.control.revoke(task_id)
Rockallite
la source
25

La réponse de @ 0x00mh est correcte, mais des documents récents sur le céleri disent que l'utilisation de l' terminateoption est " un dernier recours pour les administrateurs " car vous pouvez accidentellement mettre fin à une autre tâche qui a commencé à s'exécuter entre-temps. Une meilleure solution est peut-être la combinaison terminate=Trueavec signal='SIGUSR1'(ce qui provoque le déclenchement de l'exception SoftTimeLimitExceeded dans la tâche).

kouk
la source
2
Cette solution a très bien fonctionné pour moi. Quand SoftTimeLimitExceededest déclenché dans ma tâche, ma logique de nettoyage personnalisée (implémentée via try/ except/ finally) est appelée. C'est beaucoup mieux, à mon avis, que ce que AbortableTaskpropose ( docs.celleryproject.org/en/latest/reference / ... ). Avec ce dernier, vous avez besoin d'un backend de résultats de base de données et vous devez vérifier manuellement et à plusieurs reprises l'état d'une tâche en cours pour voir si elle a été abandonnée.
David Schneider
2
Comment est-ce mieux, pour autant que je sache si une autre tâche est reprise par le processus, elle va être arrêtée de toute façon, une exception différente sera lancée.
marxin
Si j'utilise worker_prefetch_multiplier = 1depuis que j'ai juste quelques tâches de longue durée, la terminaison devrait être bien - puisqu'aucune autre tâche ne sera effectuée par la fin - ai-je obtenu cela correctement? @spicyramen
maffe
1

Consultez les options suivantes pour les tâches: time_limit , soft_time_limit (ou vous pouvez le définir pour les travailleurs). Si vous souhaitez contrôler non seulement l'heure d'exécution, consultez l' argument expires de la méthode apply_async.

Simplylizz
la source