Comment effectuez-vous un test unitaire d'une tâche Celery?

Réponses:

61

Il est possible de tester des tâches de manière synchrone en utilisant n'importe quelle bibliothèque unittest disponible. Je fais normalement 2 sessions de test différentes lorsque je travaille avec des tâches de céleri. Le premier (comme je le suggère ci-dessous) est complètement synchrone et devrait être celui qui garantit que l'algorithme fait ce qu'il doit faire. La deuxième session utilise tout le système (y compris le courtier) et s'assure que je n'ai pas de problèmes de sérialisation ou de tout autre problème de distribution ou de communication.

Alors:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

Et votre test:

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)

J'espère que cela pourra aider!

FlaPer87
la source
1
Cela fonctionne sauf pour les tâches qui utilisent une HttpDispatchTask - docs.celleryproject.org/en/latest/userguide/remote-tasks.html où je dois définir celery.conf.CELERY_ALWAYS_EAGER = True mais même en définissant également celery.conf.CELERY_IMPORTS = ('celery.task.http') le test échoue avec NotRegistered: celery.task.http.HttpDispatchTask
davidmytton
Bizarre, êtes-vous sûr de ne pas avoir de problèmes d'importation? Ce test fonctionne (notez que je simule la réponse pour qu'elle renvoie ce que le céleri attend). De plus, les modules définis dans CELERY_IMPORTS seront importés lors de l' initialisation des workers , afin d'éviter cela, je vous suggère d'appeler celery.loader.import_default_modules().
FlaPer87
Je vous suggère également de jeter un oeil ici . Il se moque de la requête http. Je ne sais pas si cela aide, je suppose que vous voulez tester un service qui est opérationnel, n'est-ce pas?
FlaPer87
52

J'utilise ceci:

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
    ...

Documents: http://docs.celleryproject.org/en/3.1/configuration.html#celery-always-eager

CELERY_ALWAYS_EAGER vous permet d'exécuter votre tâche de manière synchrone, et vous n'avez pas besoin d'un serveur de céleri.

guettli
la source
1
Je pense que c'est dépassé - je reçois ImportError: No module named celeryconfig.
Daenyth
7
Je crois que ci-dessus suppose que le module celeryconfig.pyexiste dans son package. Voir docs.celleryproject.org/en/latest/getting-started/… .
Kamil Sindi
1
Je sais que c'est vieux, mais pouvez-vous fournir un exemple complet pour savoir comment lancer des tâches à addpartir de la question d'OP au sein d'une TestCaseclasse?
Kruupös
@ MaxChrétien désolé, je ne peux pas donner un exemple complet, puisque je n'utilise plus de céleri. Vous pouvez modifier ma question si vous avez suffisamment de points de réputation. Si vous n'en avez pas assez, faites-moi savoir ce que je dois copier + coller dans cette réponse.
guettli
1
@ miken32 merci. Comme la réponse la plus récente aborde d'une manière ou d'une autre le problème avec lequel je voulais aider, je viens de laisser un commentaire selon lequel la documentation officielle pour 4.0 décourage l'utilisation de CELERY_TASK_ALWAYS_EAGERpour les tests unitaires.
krassowski
33

Cela dépend exactement de ce que vous voulez tester.

  • Testez directement le code de la tâche. N'appelez pas "task.delay (...)", appelez simplement "task (...)" à partir de vos tests unitaires.
  • Utilisez CELERY_ALWAYS_EAGER . Cela entraînera l'appel immédiat de vos tâches au moment où vous dites "task.delay (...)", vous pouvez donc tester le chemin complet (mais pas tout comportement asynchrone).
slacy
la source
24

Test de l'unité

import unittest

from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

  def setUp(self):
      celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

Appareils py.test

# conftest.py
from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...

Addendum: rendre le respect de send_task impatient

from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task
Kamil Sindi
la source
22

Pour ceux sur Celery 4 c'est:

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)

Étant donné que les noms des paramètres ont été modifiés et doivent être mis à jour si vous choisissez de mettre à niveau, consultez

https://docs.celleryproject.org/en/latest/history/whatsnew-4.0.html?highlight=what%20is%20new#lowercase-setting-names

okrutny
la source
11
Selon la documentation officielle , l'utilisation de "task_always_eager" (anciennement "CELERY_ALWAYS_EAGER") ne convient pas pour les tests unitaires. Au lieu de cela, ils proposent d'autres excellents moyens de tester l'unité de votre application Celery.
krassowski
4
J'ajouterai simplement que la raison pour laquelle vous ne voulez pas de tâches fastidieuses dans vos tests unitaires est que vous ne testez pas, par exemple, la sérialisation des paramètres qui se produira une fois que vous utiliserez le code en production.
damd
15

Depuis Celery 3.0 , une façon de définir CELERY_ALWAYS_EAGERdans Django est:

from django.test import TestCase, override_settings

from .foo import foo_celery_task

class MyTest(TestCase):

    @override_settings(CELERY_ALWAYS_EAGER=True)
    def test_foo(self):
        self.assertTrue(foo_celery_task.delay())
Aaron Lelevier
la source
7

Depuis Celery v4.0 , les appareils py.test sont fournis pour démarrer un ouvrier céleri juste pour le test et sont arrêtés une fois terminé:

def test_myfunc_is_executed(celery_session_worker):
    # celery_session_worker: <Worker: [email protected] (running)>
    assert myfunc.delay().wait(3)

Parmi les autres appareils décrits sur http://docs.celleryproject.org/en/latest/userguide/testing.html#py-test , vous pouvez modifier les options par défaut du céleri en redéfinissant l' celery_configappareil de cette façon:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'accept_content': ['json', 'pickle'],
        'result_serializer': 'pickle',
    }

Par défaut, le test worker utilise un courtier en mémoire et un backend de résultats. Pas besoin d'utiliser un Redis ou RabbitMQ local si vous ne testez pas des fonctionnalités spécifiques.

alanjds
la source
Cher électeur, voudriez-vous expliquer pourquoi est-ce une mauvaise réponse? Sincèrement merci.
alanjds
2
Cela n'a pas fonctionné pour moi, la suite de tests se bloque juste. Pourriez-vous fournir un peu plus de contexte? (Je n'ai pas encore voté cependant;)).
duality_
Dans mon cas, j'ai dû définir explicitement le luminaire celey_config pour utiliser le courtier de mémoire et le backend cache + mémoire
sanzoghenzo
5

référence utilisant pytest.

def test_add(celery_worker):
    mytask.delay()

si vous utilisez flask, définissez la configuration de l'application

    CELERY_BROKER_URL = 'memory://'
    CELERY_RESULT_BACKEND = 'cache+memory://'

et en conftest.py

@pytest.fixture
def app():
    yield app   # Your actual Flask application

@pytest.fixture
def celery_app(app):
    from celery.contrib.testing import tasks   # need it
    yield celery_app    # Your actual Flask-Celery application
Yoge
la source
2

Dans mon cas (et je suppose que beaucoup d'autres), tout ce que je voulais, c'était tester la logique interne d'une tâche en utilisant pytest.

TL, DR; a fini par se moquer de tout ( OPTION 2 )


Exemple de cas d'utilisation :

proj/tasks.py

@shared_task(bind=True)
def add_task(self, a, b):
    return a+b;

tests/test_tasks.py

from proj import add_task

def test_add():
    assert add_task(1, 2) == 3, '1 + 2 should equal 3'

mais, puisque le shared_taskdécorateur fait beaucoup de logique interne au céleri, ce n'est pas vraiment un test unitaire.

Donc, pour moi, il y avait 2 options:

OPTION 1: Logique interne séparée

proj/tasks_logic.py

def internal_add(a, b):
    return a + b;

proj/tasks.py

from .tasks_logic import internal_add

@shared_task(bind=True)
def add_task(self, a, b):
    return internal_add(a, b);

Cela semble très étrange, et à part le rendre moins lisible, cela nécessite d'extraire et de transmettre manuellement les attributs qui font partie de la requête, par exemple task_idau cas où vous en auriez besoin, ce qui rend la logique moins pure.

OPTION 2: se
moque de se moquer des internes de céleri

tests/__init__.py

# noinspection PyUnresolvedReferences
from celery import shared_task

from mock import patch


def mock_signature(**kwargs):
    return {}


def mocked_shared_task(*decorator_args, **decorator_kwargs):
    def mocked_shared_decorator(func):
        func.signature = func.si = func.s = mock_signature
        return func

    return mocked_shared_decorator

patch('celery.shared_task', mocked_shared_task).start()

ce qui me permet ensuite de me moquer de l'objet de la requête (encore une fois, au cas où vous auriez besoin d'éléments de la requête, comme l'id ou le compteur de tentatives.

tests/test_tasks.py

from proj import add_task

class MockedRequest:
    def __init__(self, id=None):
        self.id = id or 1


class MockedTask:
    def __init__(self, id=None):
        self.request = MockedRequest(id=id)


def test_add():
    mocked_task = MockedTask(id=3)
    assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'

Cette solution est beaucoup plus manuelle, mais elle me donne le contrôle dont j'ai besoin pour effectuer un test unitaire , sans me répéter, et sans perdre la lunette céleri.

Daniel Dubovski
la source