Comment fonctionne réellement asyncio?

120

Cette question est motivée par mon autre question: comment attendre en cdef?

Il y a des tonnes d'articles et de billets de blog sur le Web asyncio, mais ils sont tous très superficiels. Je n'ai trouvé aucune information sur la manière dont asyncioest réellement implémentée et sur ce qui rend les E / S asynchrones. J'essayais de lire le code source, mais ce sont des milliers de lignes de code C qui n'est pas de la plus haute qualité, dont beaucoup traitent des objets auxiliaires, mais surtout, il est difficile de se connecter entre la syntaxe Python et le code C qu'elle traduirait dans.

La propre documentation d'Asycnio est encore moins utile. Il n'y a aucune information sur son fonctionnement, seulement quelques directives sur la façon de l'utiliser, qui sont aussi parfois trompeuses / très mal rédigées.

Je connais l'implémentation des coroutines par Go et j'espérais en quelque sorte que Python fasse la même chose. Si tel était le cas, le code que je suis venu dans le post lié ci-dessus aurait fonctionné. Puisque ce n'est pas le cas, j'essaie maintenant de comprendre pourquoi. Ma meilleure estimation à ce jour est la suivante, veuillez me corriger là où je me trompe:

  1. Les définitions de procédure du formulaire async def foo(): ...sont en fait interprétées comme des méthodes d'héritage de classe coroutine.
  2. Peut-être, async defest en fait divisé en plusieurs méthodes par des awaitinstructions, où l'objet, sur lequel ces méthodes sont appelées, est capable de suivre la progression de l'exécution jusqu'à présent.
  3. Si ce qui précède est vrai, alors, essentiellement, l'exécution d'une coroutine se résume à appeler des méthodes d'objet coroutine par un gestionnaire global (boucle?).
  4. Le gestionnaire global est en quelque sorte (comment?) Conscient du moment où les opérations d'E / S sont effectuées par du code Python (uniquement?) Et est capable de choisir l'une des méthodes de coroutine en attente à exécuter après que la méthode d'exécution actuelle a abandonné le contrôle (appuyez sur l' awaitinstruction ).

En d'autres termes, voici ma tentative de "désuétiser" une asynciosyntaxe en quelque chose de plus compréhensible:

async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__():
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

Si ma supposition est correcte: alors j'ai un problème. Comment les E / S se produisent-elles réellement dans ce scénario? Dans un fil séparé? L'interprète entier est-il suspendu et les E / S se produisent en dehors de l'interprète? Qu'entend-on exactement par E / S? Si ma procédure python a appelé procédure C open(), et qu'elle a à son tour envoyé une interruption au noyau, lui abandonnant le contrôle, comment l'interpréteur Python sait-il à ce sujet et est-il capable de continuer à exécuter un autre code, tandis que le code du noyau effectue les E / S réelles et jusqu'à ce que il réveille la procédure Python qui a envoyé l'interruption à l'origine? Comment l'interpréteur Python peut-il en principe être conscient de ce qui se passe?

wvxvw
la source
2
La plupart de la logique est gérée par l'implémentation de la boucle d'événements. Regardez comment le CPython BaseEventLoopest implémenté: github.com/python/cpython/blob/…
Blender
@Blender ok, je pense que j'ai enfin trouvé ce que je voulais, mais maintenant je ne comprends pas la raison pour laquelle le code a été écrit tel quel. Pourquoi _run_once, qui est en fait la seule fonction utile de tout ce module est-il rendu "privé"? La mise en œuvre est horrible, mais c'est moins un problème. Pourquoi la seule fonction que vous voudriez appeler sur une boucle d'événement est marquée comme "ne m'appelle pas"?
wvxvw
C'est une question pour la liste de diffusion. Quel cas d'utilisation vous obligerait-il à toucher _run_onceen premier lieu?
Blender
8
Cela ne répond pas vraiment à ma question, cependant. Comment résoudriez-vous un problème utile en utilisant juste _run_once? asyncioest complexe et a ses défauts, mais veuillez garder la discussion civile. Ne blâmez pas les développeurs derrière un code que vous ne comprenez pas vous-même.
Blender
1
@ user8371915 Si vous pensez qu'il y a quelque chose que je n'ai pas couvert, vous pouvez ajouter ou commenter ma réponse.
Bharel

Réponses:

203

Comment fonctionne asyncio?

Avant de répondre à cette question, nous devons comprendre quelques termes de base, sautez-les si vous en connaissez déjà un.

Générateurs

Les générateurs sont des objets qui nous permettent de suspendre l'exécution d'une fonction python. Les générateurs sélectionnés par l'utilisateur sont implémentés à l'aide du mot-clé yield. En créant une fonction normale contenant le yieldmot - clé, nous transformons cette fonction en générateur:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

Comme vous pouvez le voir, l'appel next()au générateur amène l'interpréteur à charger la trame du test et à renvoyer la yieldvaleur ed. Un next()nouvel appel provoque le chargement de la trame dans la pile d'interpréteur et continue yieldune autre valeur.

Au troisième next()appel, notre générateur était terminé et a StopIterationété lancé.

Communiquer avec un générateur

Une caractéristique moins connue des générateurs est le fait que vous pouvez communiquer avec eux en utilisant deux méthodes: send()et throw().

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

Lors de l'appel gen.send(), la valeur est transmise comme valeur de retour du yieldmot - clé.

gen.throw()d'autre part, permet de lancer des exceptions à l'intérieur des générateurs, à l'exception soulevée au même endroit a yieldété appelée.

Renvoyer les valeurs des générateurs

Le renvoi d'une valeur à partir d'un générateur entraîne la mise de la valeur dans l' StopIterationexception. Nous pouvons plus tard récupérer la valeur de l'exception et l'utiliser selon nos besoins.

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

Voici, un nouveau mot-clé: yield from

Python 3.4 est venu avec l'ajout d'un nouveau mot - clé: yield from. Qu'est - ce que ce mot - clé nous permet de faire, est de passer sur tout next(), send()et throw()dans un générateur interne le plus imbriqué. Si le générateur interne renvoie une valeur, c'est aussi la valeur de retour de yield from:

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

J'ai écrit un article pour approfondir ce sujet.

Mettre tous ensemble

Lors de l'introduction du nouveau mot-clé yield fromdans Python 3.4, nous étions maintenant en mesure de créer des générateurs à l'intérieur de générateurs qui, tout comme un tunnel, transmettent les données dans les deux sens des générateurs les plus internes aux générateurs les plus externes. Cela a engendré une nouvelle signification pour les générateurs - les coroutines .

Les coroutines sont des fonctions qui peuvent être arrêtées et reprises pendant leur exécution. En Python, ils sont définis à l'aide du async defmot - clé. Tout comme les générateurs, ils utilisent aussi leur propre forme yield fromdont est await. Avant asyncet awaitavons été introduits dans Python 3.5, nous avons créé des coroutines exactement de la même manière que les générateurs étaient créés (avec yield fromau lieu de await).

async def inner():
    return 1

async def outer():
    await inner()

Comme tous les itérateurs ou générateurs qui implémentent la __iter__()méthode, les coroutines implémentent __await__()ce qui leur permet de continuer à chaque await coroappel.

Il y a un joli diagramme de séquence dans la documentation Python que vous devriez consulter.

Dans asyncio, outre les fonctions coroutines, nous avons 2 objets importants: les tâches et les futurs .

Futures

Les futurs sont des objets dont la __await__()méthode est implémentée, et leur travail consiste à conserver un certain état et un certain résultat. L'état peut être l'un des suivants:

  1. PENDING - future n'a aucun résultat ou exception défini.
  2. ANNULÉ - le futur a été annulé avec fut.cancel()
  3. FINISHED - le futur était terminé, soit par un jeu de résultats utilisant fut.set_result()ou par un jeu d'exceptions utilisantfut.set_exception()

Le résultat, comme vous l'avez deviné, peut être soit un objet Python, qui sera retourné, soit une exception qui peut être déclenchée.

Une autre caractéristique importante des futureobjets est qu'ils contiennent une méthode appelée add_done_callback(). Cette méthode permet d'appeler des fonctions dès que la tâche est terminée - qu'elle ait déclenché une exception ou qu'elle soit terminée.

Tâches

Les objets de tâche sont des futurs spéciaux, qui s'enroulent autour des coroutines et communiquent avec les coroutines les plus internes et les plus externes. Chaque fois qu'une coroutine est awaitun futur, le futur est renvoyé à la tâche (comme dans yield from), et la tâche le reçoit.

Ensuite, la tâche se lie à l'avenir. Il le fait en invoquant add_done_callback()l'avenir. À partir de maintenant, si le futur sera jamais fait, soit en étant annulé, en passant une exception ou en passant un objet Python en conséquence, le rappel de la tâche sera appelé et il reviendra à l'existence.

Asyncio

La dernière question brûlante à laquelle nous devons répondre est la suivante: comment l'OI est-il mis en œuvre?

Au plus profond d'Asyncio, nous avons une boucle d'événements. Une boucle d'événements de tâches. Le travail de la boucle d'événements est d'appeler des tâches chaque fois qu'elles sont prêtes et de coordonner tous ces efforts dans une seule machine de travail.

La partie IO de la boucle d'événements repose sur une seule fonction cruciale appelée select. Select est une fonction de blocage, implémentée par le système d'exploitation en dessous, qui permet d'attendre sur les sockets des données entrantes ou sortantes. Lors de la réception des données, il se réveille et renvoie les sockets qui ont reçu des données, ou les sockets qui sont prêtes pour l'écriture.

Lorsque vous essayez de recevoir ou d'envoyer des données sur un socket via asyncio, ce qui se passe en fait ci-dessous est que le socket est d'abord vérifié s'il contient des données pouvant être immédiatement lues ou envoyées. Si son .send()tampon est plein ou si le .recv()tampon est vide, le socket est enregistré dans la selectfonction (en l'ajoutant simplement à l'une des listes, rlistpour recvet wlistpour send) et la fonction appropriée est awaitun futureobjet nouvellement créé , lié à ce socket.

Lorsque toutes les tâches disponibles attendent des futurs, l'événement appelle selectet attend. Lorsque l'une des sockets a des données entrantes ou que sa sendmémoire tampon est épuisée, asyncio vérifie le futur objet lié à cette socket et le définit sur done.

Maintenant, toute la magie opère. L'avenir est prêt à être terminé, la tâche qui s'est ajoutée avant avec add_done_callback()remonte à la vie, et appelle .send()la coroutine qui reprend la coroutine la plus interne (à cause de la awaitchaîne) et vous lisez les données nouvellement reçues à partir d'un tampon proche. a été renversé.

Chaîne de méthodes à nouveau, en cas de recv():

  1. select.select attend.
  2. Un socket prêt, avec des données est renvoyé.
  3. Les données du socket sont déplacées dans un tampon.
  4. future.set_result() est appelé.
  5. La tâche qui s'est ajoutée avec add_done_callback()est maintenant réveillée.
  6. La tâche appelle .send()la coroutine qui va jusqu'à la coroutine la plus interne et la réveille.
  7. Les données sont lues à partir du tampon et renvoyées à notre humble utilisateur.

En résumé, asyncio utilise des capacités de générateur, qui permettent de mettre en pause et de reprendre les fonctions. Il utilise des yield fromcapacités qui permettent de passer des données dans les deux sens du générateur le plus interne au générateur le plus externe. Il utilise tout cela pour arrêter l'exécution de la fonction pendant qu'il attend la fin des E / S (en utilisant la selectfonction OS ).

Et le meilleur de tous? Pendant qu'une fonction est en pause, une autre peut s'exécuter et s'entrelacer avec le tissu délicat, qui est asyncio.

Bharel
la source
12
S'il y a plus d'explications nécessaires, n'hésitez pas à commenter. Btw, je ne suis pas tout à fait sûr si j'aurais dû écrire cela comme un article de blog ou une réponse dans stackoverflow. La question est longue à répondre.
Bharel
1
Sur un socket asynchrone, toute tentative d'envoi ou de réception de données vérifie d'abord la mémoire tampon du système d'exploitation. Si vous essayez de recevoir et qu'il n'y a pas de données dans le tampon, la fonction de réception sous-jacente renverra une valeur d'erreur qui se propagera en tant qu'exception dans Python. Idem avec l'envoi et un tampon plein. Lorsque l'exception est déclenchée, Python envoie à son tour ces sockets à la fonction de sélection qui suspend le processus. Mais ce n'est pas ainsi que fonctionne asyncio, c'est le fonctionnement de la sélection et des sockets qui est également très spécifique au système d'exploitation.
Bharel
2
@ user8371915 Toujours là pour vous aider :-) Gardez à l'esprit que pour comprendre Asyncio, vous devez savoir comment les générateurs, la communication et le yield fromfonctionnement des générateurs . J'ai cependant noté en haut qu'il est désactivable au cas où le lecteur le saurait déjà :-) Y a-t-il autre chose que vous pensez que je devrais ajouter?
Bharel
2
Les choses avant la section Asyncio sont peut-être les plus critiques, car elles sont la seule chose que le langage fait réellement par lui-même. Le selectpeut également être qualifié, car c'est ainsi que les appels système d'E / S non bloquants fonctionnent sur le système d'exploitation. Les asyncioconstructions réelles et la boucle d'événements ne sont que du code au niveau de l'application construit à partir de ces éléments.
MisterMiyagi
3
Cet article contient des informations sur l'épine dorsale des E / S asynchrones en Python. Merci pour une si gentille explication.
mjkim le
83

Parler async/awaitet ce asyncion'est pas la même chose. La première est une construction fondamentale de bas niveau (coroutines) tandis que la dernière est une bibliothèque utilisant ces constructions. À l'inverse, il n'y a pas de réponse ultime unique.

Ce qui suit est une description générale de la façon async/awaitet le asynciotravail des bibliothèques -comme. Autrement dit, il peut y avoir d'autres astuces en plus (il y en a ...) mais elles sont sans importance à moins que vous ne les construisiez vous-même. La différence devrait être négligeable sauf si vous en savez déjà assez pour ne pas avoir à poser une telle question.

1. Coroutines versus sous-programmes dans une coquille de noix

Juste comme sous-programmes (fonctions, procédures, ...), les coroutines (générateurs, ...) sont une abstraction de pile d'appels et de pointeur d'instruction: il y a une pile de morceaux de code en cours d'exécution, et chacun est à une instruction spécifique.

La distinction entre defcontre async defest simplement pour plus de clarté. La différence réelle est returnpar rapport àyield . À partir de là, awaitou yield fromfaites la différence entre les appels individuels et des piles entières.

1.1. Sous-programmes

Un sous-programme représente un nouveau niveau de pile pour contenir les variables locales, et un seul parcours de ses instructions pour atteindre une fin. Considérez un sous-programme comme celui-ci:

def subfoo(bar):
     qux = 3
     return qux * bar

Lorsque vous l'exécutez, cela signifie

  1. allouer de l'espace de pile pour bar etqux
  2. exécuter récursivement la première instruction et passer à l'instruction suivante
  3. une fois à la return , poussez sa valeur vers la pile appelante
  4. effacez la pile (1.) et le pointeur d'instructions (2.)

Notamment, 4. signifie qu'un sous-programme démarre toujours au même état. Tout ce qui est exclusif à la fonction elle-même est perdu à la fin. Une fonction ne peut pas être reprise, même s'il y a des instructions après return.

root -\
  :    \- subfoo --\
  :/--<---return --/
  |
  V

1.2. Les coroutines comme sous-programmes persistants

Une coroutine est comme un sous-programme, mais peut sortir sans détruire son état. Considérez une coroutine comme celle-ci:

 def cofoo(bar):
      qux = yield bar  # yield marks a break point
      return qux

Lorsque vous l'exécutez, cela signifie

  1. allouer de l'espace de pile pourbar etqux
  2. exécuter récursivement la première instruction et passer à l'instruction suivante
    1. une fois à a yield, poussez sa valeur vers la pile appelante mais stockez la pile et le pointeur d'instruction
    2. une fois l'appel yield, restaurez le pointeur de pile et d'instruction et poussez les arguments versqux
  3. une fois à areturn , poussez sa valeur vers la pile appelante
  4. effacez la pile (1.) et le pointeur d'instructions (2.)

Notez l'ajout de 2.1 et 2.2 - une coroutine peut être suspendue et reprise à des points prédéfinis. Ceci est similaire à la façon dont un sous-programme est suspendu lors de l'appel d'un autre sous-programme. La différence est que la coroutine active n'est pas strictement liée à sa pile appelante. Au lieu de cela, une coroutine suspendue fait partie d'une pile séparée et isolée.

root -\
  :    \- cofoo --\
  :/--<+--yield --/
  |    :
  V    :

Cela signifie que les coroutines suspendues peuvent être librement stockées ou déplacées entre les piles. Toute pile d'appels ayant accès à une coroutine peut décider de la reprendre.

1.3. Traverser la pile d'appels

Jusqu'à présent, notre coroutine descend uniquement dans la pile d'appels avec yield. Un sous-programme peut descendre et remonter la pile d'appels avec returnet (). Pour être complet, les coroutines ont également besoin d'un mécanisme pour remonter la pile d'appels. Considérez une coroutine comme celle-ci:

def wrap():
    yield 'before'
    yield from cofoo()
    yield 'after'

Lorsque vous l'exécutez, cela signifie qu'il alloue toujours le pointeur de pile et d'instruction comme un sous-programme. Quand il se suspend, c'est comme stocker un sous-programme.

Cependant, yield fromfait les deux . Il suspend la pile et le pointeur d'instructions wrap et s'exécute cofoo. Notez que wrapreste suspendu jusqu'à ce qu'il se cofootermine complètement. Chaque fois que cofoosuspend ou quelque chose est envoyé, cofooest directement connecté à la pile appelante.

1.4. Coroutines tout en bas

Comme établi, yield frompermet de connecter deux oscilloscopes sur un autre intermédiaire. Lorsqu'il est appliqué de manière récursive, cela signifie que le haut de la pile peut être connecté au bas de la pile.

root -\
  :    \-> coro_a -yield-from-> coro_b --\
  :/ <-+------------------------yield ---/
  |    :
  :\ --+-- coro_a.send----------yield ---\
  :                             coro_b <-/

Notez cela rootet coro_bne vous connaissez pas. Cela rend les coroutines beaucoup plus propres que les callbacks: les coroutines sont toujours construites sur une relation 1: 1 comme les sous-programmes. Les coroutines suspendent et reprennent l'intégralité de leur pile d'exécution existante jusqu'à un point d'appel normal.

Notamment, rootpourrait avoir un nombre arbitraire de coroutines à reprendre. Pourtant, il ne peut jamais en reprendre plus d'un à la fois. Les coroutines de même racine sont concurrentes mais pas parallèles!

1.5. Python asyncetawait

L'explication a jusqu'à présent utilisé explicitement le vocabulaire yieldet les yield fromgénérateurs - la fonctionnalité sous-jacente est la même. La nouvelle syntaxe Python3.5 asyncet awaitexiste principalement pour plus de clarté.

def foo():  # subroutine?
     return None

def foo():  # coroutine?
     yield from foofoo()  # generator? coroutine?

async def foo():  # coroutine!
     await foofoo()  # coroutine!
     return None

Les instructions async foret async withsont nécessaires car vous briseriez la yield from/awaitchaîne avec les instructions foret nues with.

2. Anatomie d'une simple boucle d'événements

En soi, une coroutine n'a aucun concept de céder le contrôle à une autre coroutine. Il ne peut céder le contrôle qu'à l'appelant au bas d'une pile de coroutine. Cet appelant peut alors passer à une autre coroutine et l'exécuter.

Ce nœud racine de plusieurs coroutines est généralement une boucle d'événement : en cas de suspension, une coroutine génère un événement sur lequel elle veut reprendre. À son tour, la boucle d'événements est capable d'attendre efficacement que ces événements se produisent. Cela lui permet de décider quelle coroutine exécuter ensuite, ou comment attendre avant de reprendre.

Une telle conception implique qu'il existe un ensemble d'événements prédéfinis que la boucle comprend. Plusieurs coroutines await, jusqu'à ce que finalement un événement soit awaitédité. Cet événement peut communiquer directement avec la boucle d'événements yielden contrôlant.

loop -\
  :    \-> coroutine --await--> event --\
  :/ <-+----------------------- yield --/
  |    :
  |    :  # loop waits for event to happen
  |    :
  :\ --+-- send(reply) -------- yield --\
  :        coroutine <--yield-- event <-/

La clé est que la suspension de coroutine permet à la boucle d'événements et aux événements de communiquer directement. La pile de coroutine intermédiaire ne nécessite pas aucune connaissance de la boucle qui l'exécute, ni du fonctionnement des événements.

2.1.1. Événements dans le temps

L'événement le plus simple à gérer atteint un point dans le temps. Il s'agit également d'un bloc fondamental de code threadé: un thread sleeps à plusieurs reprises jusqu'à ce qu'une condition soit vraie. Cependant, un habituésleep exécution bloque par elle-même - nous voulons que les autres coroutines ne soient pas bloquées. Au lieu de cela, nous voulons dire à la boucle d'événements quand elle doit reprendre la pile de coroutine actuelle.

2.1.2. Définition d'un événement

Un événement est simplement une valeur que nous pouvons identifier - que ce soit via une énumération, un type ou une autre identité. Nous pouvons définir cela avec une classe simple qui stocke notre temps cible. En plus de stocker les informations d'événement, nous pouvons autoriser awaitune classe directement.

class AsyncSleep:
    """Event to sleep until a point in time"""
    def __init__(self, until: float):
        self.until = until

    # used whenever someone ``await``s an instance of this Event
    def __await__(self):
        # yield this Event to the loop
        yield self

    def __repr__(self):
        return '%s(until=%.1f)' % (self.__class__.__name__, self.until)

Cette classe ne stocke que l'événement - elle ne dit pas comment le gérer réellement.

La seule particularité est __await__- c'est ce que recherche le awaitmot - clé. En pratique, il s'agit d'un itérateur mais non disponible pour les machines d'itération régulières.

2.2.1. En attente d'un événement

Maintenant que nous avons un événement, comment réagissent les coroutines? Nous devrions pouvoir exprimer l'équivalent de sleeppar awaitnotre événement. Pour mieux voir ce qui se passe, nous attendons deux fois la moitié du temps:

import time

async def asleep(duration: float):
    """await that ``duration`` seconds pass"""
    await AsyncSleep(time.time() + duration / 2)
    await AsyncSleep(time.time() + duration / 2)

Nous pouvons directement instancier et exécuter cette coroutine. Similaire à un générateur, l'utilisation coroutine.sendexécute la coroutine jusqu'à ce qu'elle soit yieldun résultat.

coroutine = asleep(100)
while True:
    print(coroutine.send(None))
    time.sleep(0.1)

Cela nous donne deux AsyncSleepévénements et ensuite un StopIterationlorsque la coroutine est terminée. Notez que le seul retard provient de time.sleepla boucle! Chacun AsyncSleepne stocke qu'un décalage par rapport à l'heure actuelle.

2.2.2. Événement + sommeil

À ce stade, nous avons deux mécanismes distincts à notre disposition:

  • AsyncSleep Événements pouvant être générés depuis l'intérieur d'une coroutine
  • time.sleep qui peut attendre sans impacter les coroutines

Notamment, ces deux sont orthogonaux: ni l'un n'affecte ni ne déclenche l'autre. En conséquence, nous pouvons proposer notre propre stratégie pour sleepfaire face au retard d'un AsyncSleep.

2.3. Une boucle événementielle naïve

Si nous avons plusieurs coroutines, chacune peut nous dire quand elle veut être réveillée. On peut alors attendre que le premier d'entre eux veuille être repris, puis celui d'après, et ainsi de suite. Notamment, à chaque point, nous ne nous soucions que de celui qui est le suivant .

Cela permet une planification simple:

  1. trier les coroutines en fonction de l'heure de réveil souhaitée
  2. choisissez le premier qui veut se réveiller
  3. attendez jusqu'à ce moment
  4. exécuter cette coroutine
  5. répéter à partir de 1.

Une implémentation triviale ne nécessite aucun concept avancé. A listpermet de trier les coroutines par date. L'attente est un habitué time.sleep. L'exécution de coroutines fonctionne comme avant avec coroutine.send.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    # store wake-up-time and coroutines
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting:
        # 2. pick the first coroutine that wants to wake up
        until, coroutine = waiting.pop(0)
        # 3. wait until this point in time
        time.sleep(max(0.0, until - time.time()))
        # 4. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])

Bien entendu, cela peut être amélioré. Nous pouvons utiliser un tas pour la file d'attente ou une table de répartition pour les événements. Nous pourrions également récupérer les valeurs de retour duStopIteration et les affecter à la coroutine. Cependant, le principe fondamental reste le même.

2.4. Attente coopérative

L' AsyncSleepévénement et la runboucle d'événements sont une implémentation entièrement fonctionnelle des événements chronométrés.

async def sleepy(identifier: str = "coroutine", count=5):
    for i in range(count):
        print(identifier, 'step', i + 1, 'at %.2f' % time.time())
        await asleep(0.1)

run(*(sleepy("coroutine %d" % j) for j in range(5)))

Cela commute en coopération entre chacune des cinq coroutines, en les suspendant chacune pendant 0,1 seconde. Même si la boucle d'événements est synchrone, elle exécute toujours le travail en 0,5 seconde au lieu de 2,5 secondes. Chaque coroutine détient un état et agit indépendamment.

3. Boucle d'événements d'E / S

Une boucle d'événements qui prend en charge sleepconvient à l' interrogation . Cependant, l'attente d'E / S sur un descripteur de fichier peut se faire plus efficacement: le système d'exploitation implémente les E / S et sait donc quels descripteurs sont prêts. Idéalement, une boucle d'événement devrait prendre en charge un événement explicite «prêt pour les E / S».

3.1. L' selectappel

Python a déjà une interface pour interroger le système d'exploitation pour les poignées d'E / S de lecture. Lorsqu'il est appelé avec des poignées pour lire ou écrire, il renvoie les poignées prêtes à lire ou à écrire:

readable, writeable, _ = select.select(rlist, wlist, xlist, timeout)

Par exemple, nous pouvons openun fichier à écrire et attendre qu'il soit prêt:

write_target = open('/tmp/foo')
readable, writeable, _ = select.select([], [write_target], [])

Une fois les retours sélectionnés, writeablecontient notre fichier ouvert.

3.2. Événement d'E / S de base

Comme pour la AsyncSleepdemande, nous devons définir un événement pour les E / S. Avec la selectlogique sous-jacente , l'événement doit faire référence à un objet lisible - disons un openfichier. De plus, nous stockons la quantité de données à lire.

class AsyncRead:
    def __init__(self, file, amount=1):
        self.file = file
        self.amount = amount
        self._buffer = ''

    def __await__(self):
        while len(self._buffer) < self.amount:
            yield self
            # we only get here if ``read`` should not block
            self._buffer += self.file.read(1)
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.file, self.amount, len(self._buffer)
        )

Comme pour la AsyncSleepplupart, nous stockons simplement les données nécessaires à l'appel système sous-jacent. Cette fois, il __await__est capable d'être repris plusieurs fois - jusqu'à ce que notre désir amountait été lu. De plus, nous obtenons returnle résultat d'E / S au lieu de simplement reprendre.

3.3. Augmenter une boucle d'événements avec des E / S de lecture

La base de notre boucle d'événements est toujours celle rundéfinie précédemment. Tout d'abord, nous devons suivre les demandes de lecture. Ce n'est plus un planning trié, nous mappons uniquement les requêtes de lecture aux coroutines.

# new
waiting_read = {}  # type: Dict[file, coroutine]

Comme select.selectprend un paramètre de délai d'attente, nous pouvons l'utiliser à la place de time.sleep.

# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(reads), [], [])

Cela nous donne tous les fichiers lisibles - s'il y en a, nous exécutons la coroutine correspondante. S'il n'y en a pas, nous avons attendu assez longtemps pour que notre coroutine actuelle s'exécute.

# new - reschedule waiting coroutine, run readable coroutine
if readable:
    waiting.append((until, coroutine))
    waiting.sort()
    coroutine = waiting_read[readable[0]]

Enfin, nous devons réellement écouter les demandes de lecture.

# new
if isinstance(command, AsyncSleep):
    ...
elif isinstance(command, AsyncRead):
    ...

3.4. Mettre ensemble

Ce qui précède était un peu une simplification. Nous devons faire quelques changements pour ne pas affamer les coroutines endormies si nous pouvons toujours lire. Nous devons gérer n'avoir rien à lire ou rien à attendre. Cependant, le résultat final s'inscrit toujours dans 30 LOC.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    waiting_read = {}  # type: Dict[file, coroutine]
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting or waiting_read:
        # 2. wait until the next coroutine may run or read ...
        try:
            until, coroutine = waiting.pop(0)
        except IndexError:
            until, coroutine = float('inf'), None
            readable, _, _ = select.select(list(waiting_read), [], [])
        else:
            readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
        # ... and select the appropriate one
        if readable and time.time() < until:
            if until and coroutine:
                waiting.append((until, coroutine))
                waiting.sort()
            coroutine = waiting_read.pop(readable[0])
        # 3. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension ...
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])
        # ... or register reads
        elif isinstance(command, AsyncRead):
            waiting_read[command.file] = coroutine

3.5. E / S coopératives

Les implémentations AsyncSleep, AsyncReadet runsont désormais entièrement fonctionnelles pour dormir et / ou lire. Comme pour sleepy, nous pouvons définir un assistant pour tester la lecture:

async def ready(path, amount=1024*32):
    print('read', path, 'at', '%d' % time.time())
    with open(path, 'rb') as file:
        result = return await AsyncRead(file, amount)
    print('done', path, 'at', '%d' % time.time())
    print('got', len(result), 'B')

run(sleepy('background', 5), ready('/dev/urandom'))

En exécutant cela, nous pouvons voir que nos E / S sont entrelacées avec la tâche en attente:

id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B

4. E / S non bloquantes

Alors que les E / S sur les fichiers font passer le concept, cela ne convient pas vraiment à une bibliothèque comme asyncio: l' selectappel revient toujours pour les fichiers , et les deux openet readpeuvent bloquer indéfiniment . Cela bloque toutes les coroutines d'une boucle d'événements - ce qui est mauvais. Les bibliothèques comme l' aiofilesutilisation de threads et la synchronisation pour simuler des E / S et des événements non bloquants sur fichier.

Cependant, les sockets permettent des E / S non bloquantes - et leur latence inhérente la rend beaucoup plus critique. Lorsqu'il est utilisé dans une boucle d'événements, l'attente de données et la nouvelle tentative peuvent être encapsulées sans rien bloquer.

4.1. Événement d'E / S non bloquant

Semblable à notre AsyncRead, nous pouvons définir un événement suspend-and-read pour les sockets. Au lieu de prendre un fichier, nous prenons une socket - qui doit être non bloquante. En outre, nos __await__utilisations socket.recvau lieu de file.read.

class AsyncRecv:
    def __init__(self, connection, amount=1, read_buffer=1024):
        assert not connection.getblocking(), 'connection must be non-blocking for async recv'
        self.connection = connection
        self.amount = amount
        self.read_buffer = read_buffer
        self._buffer = b''

    def __await__(self):
        while len(self._buffer) < self.amount:
            try:
                self._buffer += self.connection.recv(self.read_buffer)
            except BlockingIOError:
                yield self
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.connection, self.amount, len(self._buffer)
        )

Contrairement à AsyncRead, __await__effectue des E / S vraiment non bloquantes. Lorsque les données sont disponibles, elles sont toujours lues. Lorsqu'aucune donnée n'est disponible, il est toujours suspendu. Cela signifie que la boucle d'événements n'est bloquée que pendant que nous effectuons un travail utile.

4.2. Débloquer la boucle d'événements

En ce qui concerne la boucle d'événements, rien ne change beaucoup. L'événement à écouter est toujours le même que pour les fichiers - un descripteur de fichier marqué prêt par select.

# old
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
    waiting_read[command.connection] = coroutine

À ce stade, il devrait être évident que AsyncReadet AsyncRecvsont le même genre d'événement. Nous pourrions facilement les refactoriser en un seul événement avec un composant d'E / S échangeable. En effet, la boucle d'événements, les coroutines et les événements séparent proprement un planificateur, un code intermédiaire arbitraire et les E / S réelles.

4.3. Le côté laid des E / S non bloquantes

En principe, ce que vous devez faire à ce stade est de reproduire la logique de readas a recvfor AsyncRecv. Cependant, c'est beaucoup plus laid maintenant - vous devez gérer les retours anticipés lorsque les fonctions se bloquent à l'intérieur du noyau, mais vous cèdent le contrôle. Par exemple, l'ouverture d'une connexion par rapport à l'ouverture d'un fichier est beaucoup plus longue:

# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
    connection.connect((url, port))
except BlockingIOError:
    pass

Pour faire court, il ne reste que quelques dizaines de lignes de gestion des exceptions. Les événements et la boucle d'événements fonctionnent déjà à ce stade.

id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5

Addenda

Exemple de code sur github

MonsieurMiyagi
la source
Utiliser yield selfdans AsyncSleep me donne une Task got back yielderreur, pourquoi? Je vois que le code dans asyncio.Futures l'utilise. L'utilisation d'un rendement nu fonctionne très bien.
Ron Serruya
1
Les boucles d'événements n'attendent généralement que leurs propres événements. Vous ne pouvez généralement pas mélanger des événements et des boucles d'événements entre les bibliothèques; les événements affichés ici ne fonctionnent qu'avec la boucle d'événements affichée. En particulier, asyncio utilise uniquement None (c'est-à-dire un rendement nu) comme signal pour la boucle d'événements. Les événements interagissent directement avec l'objet de la boucle d'événements pour enregistrer les réveils.
MisterMiyagi
12

Votre corodésuétude est conceptuellement correcte, mais légèrement incomplète.

awaitne suspend pas inconditionnellement, mais seulement s'il rencontre un appel bloquant. Comment sait-il qu'un appel est bloqué? Ceci est décidé par le code attendu. Par exemple, une implémentation attendue de socket read pourrait être déconseillée pour:

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND

En vrai asyncio, le code équivalent modifie l'état de a Futureau lieu de renvoyer des valeurs magiques, mais le concept est le même. Lorsqu'il est adapté de manière appropriée à un objet de type générateur, le code ci-dessus peut être awaitédité.

Côté appelant, lorsque votre coroutine contient:

data = await read(sock, 1024)

Il se désucre en quelque chose de proche de:

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

Les personnes familières avec les générateurs ont tendance à décrire ce qui précède en termes de yield fromqui effectue la suspension automatiquement.

La chaîne de suspension continue jusqu'à la boucle d'événements, qui remarque que la coroutine est suspendue, la supprime de l'ensemble exécutable et continue d'exécuter des coroutines exécutables, le cas échéant. Si aucune coroutine n'est exécutable, la boucle attend select()jusqu'à ce qu'un descripteur de fichier auquel une coroutine s'intéresse devienne prêt pour IO. (La boucle d'événements maintient un mappage descripteur de fichier vers coroutine.)

Dans l'exemple ci-dessus, une fois que select()la boucle d'événements sockest lisible, elle sera rajoutée coroà l'ensemble exécutable, de sorte qu'elle se poursuivra à partir du point de suspension.

En d'autres termes:

  1. Tout se passe dans le même fil par défaut.

  2. La boucle d'événements est chargée de planifier les coroutines et de les réveiller lorsque tout ce qu'ils attendaient (généralement un appel IO qui se bloquerait normalement ou un délai d'expiration) devient prêt.

Pour un aperçu des boucles d'événements de conduite de coroutine, je recommande cette conférence de Dave Beazley, où il montre le codage d'une boucle d'événements à partir de zéro devant un public en direct.

user4815162342
la source
Merci, c'est plus proche de ce que je recherche, mais cela n'explique toujours pas pourquoi async.wait_for()ne fait pas ce qu'il est censé faire ... Pourquoi est-ce un si gros problème d'ajouter un rappel à la boucle d'événements et de le dire pour traiter le nombre de rappels dont il a besoin, y compris celui que vous venez d'ajouter? Ma frustration avec asyncioest en partie due au fait que le concept sous-jacent est très simple et, par exemple, Emacs Lisp a été implémenté pendant des lustres, sans utiliser de mots à la mode ... (c'est create-async-process-à- dire et accept-process-output- et c'est tout ce qu'il faut ... (suite)
wvxvw
10
@wvxvw J'ai fait tout ce que j'ai pu pour répondre à la question que vous avez posée, autant que cela soit même possible étant donné que seul le dernier paragraphe contient six questions. Et donc nous continuons - ce n'est pas que wait_for cela ne fait pas ce qu'il est censé faire (cela fait, c'est une coroutine que vous êtes censé attendre), c'est que vos attentes ne correspondent pas à ce pour quoi le système a été conçu et implémenté. Je pense que votre problème pourrait être associé à asyncio si la boucle d'événements s'exécutait dans un thread séparé, mais je ne connais pas les détails de votre cas d'utilisation et, honnêtement, votre attitude ne rend pas très amusant de vous aider.
user4815162342
5
@wvxvw My frustration with asyncio is in part due to the fact that the underlying concept is very simple, and, for example, Emacs Lisp had implementation for ages, without using buzzwords...- Rien ne vous empêche de mettre en œuvre ce concept simple sans mots à la mode pour le Python alors :) Pourquoi utilisez-vous ce vilain asyncio? Implémentez le vôtre à partir de zéro. Par exemple, vous pouvez commencer par créer votre propre async.wait_for()fonction qui fait exactement ce qu'elle est censée faire.
Mikhail Gerasimov
1
@MikhailGerasimov vous semblez penser que c'est une question rhétorique. Mais j'aimerais dissiper le mystère pour vous. La langue est conçue pour parler aux autres. Je ne peux pas choisir pour les autres la langue qu'ils parlent, même si je pense que la langue qu'ils parlent est des ordures, le mieux que je puisse faire est d'essayer de les convaincre que c'est le cas. En d'autres termes, si j'étais libre de choisir, je ne choisirais jamais Python pour commencer, et encore moins asyncio. Mais, en principe, ce n’est pas ma décision à prendre. Je suis contraint d'utiliser le langage des ordures via en.wikipedia.org/wiki/Ultimatum_game .
wvxvw
4

Tout se résume aux deux principaux défis auxquels asyncio fait face:

  • Comment effectuer plusieurs E / S dans un seul thread?
  • Comment mettre en œuvre le multitâche coopératif?

La réponse au premier point existe depuis longtemps et s'appelle une boucle de sélection . En python, il est implémenté dans le module des sélecteurs .

La deuxième question est liée au concept de coroutine , c'est-à-dire des fonctions qui peuvent arrêter leur exécution et être restaurées ultérieurement. En python, les coroutines sont implémentées à l'aide de générateurs et de l' instruction yield from . C'est ce qui se cache derrière la syntaxe async / await .

Plus de ressources dans cette réponse .


EDIT: Répondre à votre commentaire sur les goroutines:

L'équivalent le plus proche d'un goroutine dans asyncio n'est en fait pas une coroutine mais une tâche (voir la différence dans la documentation ). En python, une coroutine (ou un générateur) ne sait rien des concepts de boucle d'événement ou d'E / S. Il s'agit simplement d'une fonction qui peut arrêter son exécution en utilisant yieldtout en conservant son état actuel, afin qu'elle puisse être restaurée plus tard. La yield fromsyntaxe permet de les chaîner de manière transparente.

Désormais, dans une tâche asyncio, la coroutine tout en bas de la chaîne finit toujours par céder un avenir . Cet avenir bouillonne alors jusqu'à la boucle des événements et s'intègre dans la machinerie interne. Lorsque le futur est défini par un autre rappel interne, la boucle d'événements peut restaurer la tâche en renvoyant le futur dans la chaîne coroutine.


MODIFIER: Répondre à certaines des questions de votre message:

Comment les E / S se produisent-elles réellement dans ce scénario? Dans un fil séparé? L'interprète entier est-il suspendu et les E / S se produisent en dehors de l'interprète?

Non, rien ne se passe dans un fil. Les E / S sont toujours gérées par la boucle d'événements, principalement via des descripteurs de fichiers. Cependant, l'enregistrement de ces descripteurs de fichiers est généralement masqué par des coroutines de haut niveau, ce qui fait le sale boulot pour vous.

Qu'entend-on exactement par E / S? Si ma procédure python appelée procédure C open (), et qu'elle a à son tour envoyé une interruption au noyau, lui abandonnant le contrôle, comment l'interpréteur Python le sait-il et est-il capable de continuer à exécuter un autre code, tandis que le code du noyau fait le I / O et jusqu'à ce qu'il réveille la procédure Python qui a envoyé l'interruption à l'origine? Comment l'interpréteur Python peut-il en principe être conscient de ce qui se passe?

Une E / S est un appel bloquant. Dans asyncio, toutes les opérations d'E / S doivent passer par la boucle d'événements, car comme vous l'avez dit, la boucle d'événements n'a aucun moyen de savoir qu'un appel de blocage est effectué dans un code synchrone. Cela signifie que vous n'êtes pas censé utiliser un synchrone opendans le contexte d'une coroutine. À la place, utilisez une bibliothèque dédiée telle que aiofiles qui fournit une version asynchrone de open.

Vincent
la source
Dire que les coroutines sont implémentées en utilisant yield fromne dit vraiment rien. yield fromest juste une construction syntaxique, ce n'est pas un élément fondamental que les ordinateurs peuvent exécuter. De même, pour la boucle de sélection. Oui, les coroutines dans Go utilisent également la boucle de sélection, mais ce que j'essayais de faire fonctionnerait dans Go, mais pas en Python. J'ai besoin de réponses plus détaillées pour comprendre pourquoi cela n'a pas fonctionné.
wvxvw
Désolé ... non, pas vraiment. «futur», «tâche», «voie transparente», «rendement de» ne sont que des mots à la mode, ce ne sont pas des objets du domaine de la programmation. la programmation a des variables, des procédures et des structures. Donc, dire que «la goroutine est une tâche» n'est qu'une déclaration circulaire qui soulève une question. En fin de compte, une explication de ce que asynciofait, pour moi, se résumerait à un code C qui illustre en quoi la syntaxe Python a été traduite.
wvxvw
Pour expliquer plus en détail pourquoi votre réponse ne répond pas à ma question: avec toutes les informations que vous avez fournies, je n'ai aucune idée de la raison pour laquelle ma tentative à partir du code que j'ai publié dans la question liée n'a pas fonctionné. Je suis absolument certain que je pourrais écrire une boucle d'événements de manière à ce que ce code fonctionne. En fait, ce serait ainsi que j'écrirais une boucle d'événements, si je devais en écrire une.
wvxvw
7
@wvxvw Je ne suis pas d'accord. Ce ne sont pas des «mots à la mode» mais des concepts de haut niveau qui ont été implémentés dans de nombreuses bibliothèques. Par exemple, une tâche asyncio, un greenlet gevent et un goroutine correspondent tous à la même chose: une unité d'exécution qui peut s'exécuter simultanément dans un seul thread. De plus, je ne pense pas que C soit nécessaire pour comprendre asyncio du tout, à moins que vous ne souhaitiez entrer dans le fonctionnement interne des générateurs python.
Vincent le
@wvxvw Voir ma deuxième modification. Cela devrait éliminer quelques idées fausses.
Vincent le