"Fire and forget" python async / wait

115

Parfois, il y a une opération asynchrone non critique qui doit se produire, mais je ne veux pas attendre qu'elle se termine. Dans l'implémentation coroutine de Tornado, vous pouvez "déclencher et oublier" une fonction asynchrone en ôtant simplement le yieldmot-clé.

J'ai essayé de comprendre comment "tirer et oublier" avec la nouvelle syntaxe async/ awaitpubliée dans Python 3.5. Par exemple, un extrait de code simplifié:

async def async_foo():
    print("Do some stuff asynchronously here...")

def bar():
    async_foo()  # fire and forget "async_foo()"

bar()

Ce qui se passe cependant, c'est que cela bar()ne s'exécute jamais et à la place, nous obtenons un avertissement d'exécution:

RuntimeWarning: coroutine 'async_foo' was never awaited
  async_foo()  # fire and forget "async_foo()"
Mike N
la source
En relation? stackoverflow.com/q/32808893/1639625 En fait, je pense que c'est un doublon, mais je ne veux pas le dupliquer instantanément. Quelqu'un peut-il confirmer?
tobias_k
3
@tobias_k, je ne pense pas que ce soit en double. La réponse au lien est trop large pour être une réponse à cette question.
Mikhail Gerasimov
2
Est-ce que (1) votre processus "principal" continue de fonctionner indéfiniment? Ou (2) voulez-vous permettre à votre processus de mourir mais permettre aux tâches oubliées de continuer leur travail? Ou (3) préférez-vous que votre processus principal attende les tâches oubliées juste avant la fin?
Julien Palard

Réponses:

170

Actualiser:

Remplacez asyncio.ensure_futurepar asyncio.create_taskpartout si vous utilisez Python> = 3.7 C'est un moyen plus récent et plus agréable de générer une tâche .


asyncio.Tâche de "tirer et oublier"

Selon la documentation de python, asyncio.Taskil est possible de démarrer une coroutine à exécuter "en arrière-plan" . La tâche créée par asyncio.ensure_future function ne bloquera pas l'exécution (donc la fonction retournera immédiatement!). Cela ressemble à un moyen de «tirer et d'oublier» comme vous l'avez demandé.

import asyncio


async def async_foo():
    print("async_foo started")
    await asyncio.sleep(1)
    print("async_foo done")


async def main():
    asyncio.ensure_future(async_foo())  # fire and forget async_foo()

    # btw, you can also create tasks inside non-async funcs

    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Production:

Do some actions 1
async_foo started
Do some actions 2
async_foo done
Do some actions 3

Que faire si les tâches s'exécutent après la fin de la boucle d'événements?

Notez qu'asyncio s'attend à ce que la tâche soit terminée au moment où la boucle d'événements est terminée. Donc, si vous changez main()pour:

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')

Vous recevrez cet avertissement une fois le programme terminé:

Task was destroyed but it is pending!
task: <Task pending coro=<async_foo() running at [...]

Pour éviter cela, vous pouvez simplement attendre toutes les tâches en attente une fois la boucle d'événements terminée:

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # Let's also finish all running tasks:
    pending = asyncio.Task.all_tasks()
    loop.run_until_complete(asyncio.gather(*pending))

Tuez les tâches au lieu de les attendre

Parfois, vous ne voulez pas attendre que les tâches soient terminées (par exemple, certaines tâches peuvent être créées pour s'exécuter indéfiniment). Dans ce cas, vous pouvez simplement les annuler () au lieu de les attendre:

import asyncio
from contextlib import suppress


async def echo_forever():
    while True:
        print("echo")
        await asyncio.sleep(1)


async def main():
    asyncio.ensure_future(echo_forever())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # Let's also cancel all running tasks:
    pending = asyncio.Task.all_tasks()
    for task in pending:
        task.cancel()
        # Now we should await task to execute it's cancellation.
        # Cancelled task raises asyncio.CancelledError that we can suppress:
        with suppress(asyncio.CancelledError):
            loop.run_until_complete(task)

Production:

Do some actions 1
echo
Do some actions 2
echo
Do some actions 3
echo
Mikhail Gerasimov
la source
J'ai copié et passé le premier bloc et je l'ai simplement exécuté de mon côté et pour une raison quelconque, j'ai obtenu: ligne 4 async def async_foo (): ^ Comme s'il y avait une erreur de syntaxe avec la définition de fonction à la ligne 4: "async def async_foo ( ): "Est-ce que je manque quelque chose?
Gil Allen
3
@GilAllen cette syntaxe ne fonctionne que dans Python 3.5+. Python 3.4 nécessite une ancienne syntaxe (voir docs.python.org/3.4/library/asyncio-task.html ). Python 3.3 et les versions antérieures ne prennent pas du tout en charge asyncio.
Mikhail Gerasimov
Comment tueriez-vous les tâches d'un thread?… ̣J'ai un thread qui crée des tâches et je veux tuer toutes les tâches en attente lorsque le thread meurt dans sa stop()méthode.
Sardathrion - contre les abus SE
@Sardathrion Je ne sais pas si la tâche pointe quelque part sur le fil dans lequel elle a été créée, mais rien ne vous empêche de les suivre manuellement: par exemple, ajoutez simplement toutes les tâches créées dans le fil à une liste et le moment venu, annulez-les de manière expliquée au dessus.
Mikhail Gerasimov
2
Notez que "Task.all_tasks () est obsolète depuis Python 3.7, utilisez plutôt asyncio.all_tasks ()"
Alexis
12

Merci Sergey pour la réponse succincte. Voici la version décorée de la même chose.

import asyncio
import time

def fire_and_forget(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, *kwargs)

    return wrapped

@fire_and_forget
def foo():
    time.sleep(1)
    print("foo() completed")

print("Hello")
foo()
print("I didn't wait for foo()")

Produit

>>> Hello
>>> foo() started
>>> I didn't wait for foo()
>>> foo() completed

Remarque: Vérifiez mon autre réponse qui fait la même chose en utilisant des fils simples.

nehem
la source
J'ai connu un ralentissement substantiel après avoir utilisé cette approche en créant ~ 5 petites tâches d'incendie et d'oubli par seconde. Ne l'utilisez pas en production pour une tâche de longue durée. Cela va manger votre CPU et votre mémoire!
pir
10

Ce n'est pas une exécution entièrement asynchrone, mais peut-être que run_in_executor () vous convient.

def fire_and_forget(task, *args, **kwargs):
    loop = asyncio.get_event_loop()
    if callable(task):
        return loop.run_in_executor(None, task, *args, **kwargs)
    else:    
        raise TypeError('Task must be a callable')

def foo():
    #asynchronous stuff here


fire_and_forget(foo)
Sergey Gornostaev
la source
3
Belle réponse concise. Il est à noter que la executorvolonté par défaut d'appeler concurrent.futures.ThreadPoolExecutor.submit(). Je mentionne parce que la création de fils n'est pas gratuite; le feu et l'oubli 1000 fois par seconde mettra probablement une grosse pression sur la gestion des threads
Brad Solomon
Oui. Je n'ai pas tenu compte de votre avertissement et j'ai connu un ralentissement substantiel après avoir utilisé cette approche, créant ~ 5 petites tâches d'incendie et d'oubli par seconde. Ne l'utilisez pas en production pour une tâche de longue durée. Cela va manger votre CPU et votre mémoire!
pir
3

Pour une raison quelconque, si vous ne pouvez pas utiliser, asynciovoici l'implémentation utilisant des threads simples. Vérifiez mes autres réponses et la réponse de Sergey aussi.

import threading

def fire_and_forget(f):
    def wrapped():
        threading.Thread(target=f).start()

    return wrapped

@fire_and_forget
def foo():
    time.sleep(1)
    print("foo() completed")

print("Hello")
foo()
print("I didn't wait for foo()")
nehem
la source
Si nous n'avons besoin que de cette fonctionnalité fire_and_forget et rien d'autre d'Asyncio, serait-il toujours préférable d'utiliser asyncio? Quels sont les avantages?
pir