Système d'événements en Python

195

Quel système d'événements pour Python utilisez-vous? Je suis déjà au courant de pydispatcher , mais je me demandais ce qui peut être trouvé ou est couramment utilisé?

Je ne suis pas intéressé par les gestionnaires d'événements qui font partie de grands cadres, je préfère utiliser une petite solution simple que je peux facilement étendre.

Josip
la source

Réponses:

178

Forfaits PyPI

En date de juin 2020, ce sont les packages liés à l'événement disponibles sur PyPI, classés par date de sortie la plus récente.

Il y a plus

C'est beaucoup de bibliothèques à choisir, en utilisant une terminologie très différente (événements, signaux, gestionnaires, répartition des méthodes, hooks, ...).

J'essaie de garder un aperçu des packages ci-dessus, ainsi que des techniques mentionnées dans les réponses ici.

Tout d'abord, une terminologie ...

Modèle d'observateur

Le style le plus élémentaire du système d'événements est le «sac de méthodes de gestion», qui est une implémentation simple du modèle Observer .

Fondamentalement, les méthodes de gestionnaire (callables) sont stockées dans un tableau et sont chacune appelées lorsque l'événement se déclenche.

Publier-s'abonner

L'inconvénient des systèmes d'événements Observer est que vous ne pouvez enregistrer les gestionnaires que sur l'objet Event réel (ou la liste des gestionnaires). Donc, au moment de l'inscription, l'événement doit déjà exister.

C'est pourquoi le deuxième style de systèmes d'événements existe: le modèle de publication-abonnement . Ici, les gestionnaires ne s'inscrivent pas sur un objet événement (ou une liste de gestionnaires), mais sur un répartiteur central. De plus, les notifiants ne parlent qu'au répartiteur. Ce à écouter ou à publier est déterminé par le «signal», qui n'est rien de plus qu'un nom (chaîne).

Motif médiateur

Peut-être aussi intéressant: le modèle Mediator .

Crochets

Un système «hook» est généralement utilisé dans le contexte des plugins d'application. L'application contient des points d'intégration fixes (hooks), et chaque plug-in peut se connecter à ce hook et effectuer certaines actions.

Autres événements'

Remarque: threading.Event n'est pas un «système d'événements» dans le sens ci-dessus. C'est un système de synchronisation de threads où un thread attend qu'un autre thread «signale» l'objet Event.

Les bibliothèques de messagerie réseau utilisent souvent aussi le terme «événements»; parfois ceux-ci sont similaires dans leur concept; parfois non. Ils peuvent bien sûr franchir les limites des threads, des processus et des ordinateurs. Voir par exemple pyzmq , pymq , Twisted , Tornado , gevent , eventlet .

Références faibles

En Python, détenir une référence à une méthode ou un objet garantit qu'il ne sera pas supprimé par le garbage collector. Cela peut être souhaitable, mais cela peut également entraîner des fuites de mémoire: les gestionnaires liés ne sont jamais nettoyés.

Certains systèmes d'événements utilisent des références faibles au lieu de références régulières pour résoudre ce problème.

Quelques mots sur les différentes bibliothèques

Systèmes d'événements de type observateur:

  • zope.event montre les os nus de comment cela fonctionne (voir la réponse de Lennart ). Remarque: cet exemple ne prend même pas en charge les arguments du gestionnaire.
  • « Liste » appelable de LongPoke mise en œuvre de montre qu'un tel système d'événements peut être mis en œuvre très minimalistically par le sous - classement list.
  • La variation EventHook de Felk garantit également les signatures des appelants et des appelants.
  • Le EventHook de spassig (le modèle d'événement de Michael Foord) est une implémentation simple.
  • La classe d'événement Valued Lessons de Josip est fondamentalement la même, mais utilise un setau lieu d'un listpour stocker le sac, et met en œuvre __call__des ajouts raisonnables.
  • PyNotify est similaire dans son concept et fournit également des concepts supplémentaires de variables et de conditions («événement changé de variable»). La page d'accueil n'est pas fonctionnelle.
  • axel est essentiellement un sac de-gestionnaires avec plus de fonctionnalités liées à filetage, la gestion des erreurs, ...
  • python-dispatch nécessite la dérivation des classes sources paires pydispatch.Dispatcher.
  • buslane est basé sur une classe, prend en charge un ou plusieurs gestionnaires et facilite des conseils de type étendus.
  • L' observateur / événement de Pithikos est un modèle léger.

Bibliothèques de publication-abonnement:

  • blinker a quelques fonctionnalités astucieuses telles que la déconnexion automatique et le filtrage basé sur l'expéditeur.
  • PyPubSub est un package stable, et promet "des fonctionnalités avancées qui facilitent le débogage et la maintenance des rubriques et des messages".
  • pymitter est un port Python de Node.js EventEmitter2 et propose des espaces de noms, des caractères génériques et TTL.
  • PyDispatcher semble mettre l'accent sur la flexibilité en ce qui concerne la publication plusieurs-à-plusieurs, etc. Prend en charge les références faibles.
  • louie est un PyDispatcher retravaillé et devrait fonctionner "dans une grande variété de contextes".
  • pypydispatcher est basé sur (vous l'avez deviné ...) PyDispatcher et fonctionne également dans PyPy.
  • django.dispatch est un PyDispatcher réécrit "avec une interface plus limitée, mais des performances supérieures".
  • pyeventdispatcher est basé sur le répartiteur d'événements du framework Symfony de PHP.
  • dispatcher a été extrait de django.dispatch mais devient assez ancien.
  • EventManger de Cristian Garcia est une implémentation très courte.

Autres:

  • pluggy contient un système de hook utilisé par les pytestplugins.
  • RxPy3 implémente le modèle observable et permet de fusionner des événements, de réessayer, etc.
  • Les signaux et emplacements de Qt sont disponibles sur PyQt ou PySide2 . Ils fonctionnent comme rappel lorsqu'ils sont utilisés dans le même thread, ou comme événements (à l'aide d'une boucle d'événements) entre deux threads différents. Les signaux et les emplacements ont la limitation de ne fonctionner que dans les objets des classes qui en dérivent QObject.
florisla
la source
2
Il y a aussi louie, qui est basé sur PyDispatcher: pypi.python.org/pypi/Louie/1.1
the979kid
@ the979kid louie semble mal entretenu, la page pypi renvoie aux 404 sur GitHub: 11craft.github.io/louie ; github.com/gldnspud/louie . Devrait être github.com/11craft/louie .
florisla
1
les auditeurs d'événements faibles sont un besoin commun. Sinon, l'utilisation dans le monde réel devient difficile. Une note que les solutions prennent en charge qui peuvent être utiles.
kxr
Pypubsub 4 est plusieurs à plusieurs, et dispose de puissants outils de débogage pour les messages, et de plusieurs façons de limiter les charges utiles des messages afin que vous sachiez plus tôt quand vous avez envoyé des données non valides ou des données manquantes. PyPubSub 4 prend en charge Python 3 (et PyPubSub 3.x prend en charge Python 2).
Oliver
J'ai récemment publié une bibliothèque appelée pymq github.com/thrau/pymq qui peut être un bon choix pour cette liste.
jeudi
99

Je l'ai fait de cette façon:

class Event(list):
    """Event subscription.

    A list of callable objects. Calling an instance of this will cause a
    call to each item in the list in ascending order by index.

    Example Usage:
    >>> def f(x):
    ...     print 'f(%s)' % x
    >>> def g(x):
    ...     print 'g(%s)' % x
    >>> e = Event()
    >>> e()
    >>> e.append(f)
    >>> e(123)
    f(123)
    >>> e.remove(f)
    >>> e()
    >>> e += (f, g)
    >>> e(10)
    f(10)
    g(10)
    >>> del e[0]
    >>> e(2)
    g(2)

    """
    def __call__(self, *args, **kwargs):
        for f in self:
            f(*args, **kwargs)

    def __repr__(self):
        return "Event(%s)" % list.__repr__(self)

Cependant, comme avec tout ce que j'ai vu, il n'y a pas de pydoc généré automatiquement pour cela, et pas de signatures, ce qui est vraiment nul.

L̲̳o̲̳̳n̲̳̳g̲̳̳p̲̳o̲̳̳k̲̳̳e̲̳̳
la source
3
Je trouve ce style plutôt intrigant. C'est doucement aux os nus. J'aime le fait qu'il permet de manipuler les événements et leurs abonnés comme des opérations autonomes. Je vais voir comment ça se passe dans un vrai projet.
Rudy Lattae
2
Très beau style minimaliste! super!
akaRem
2
Je ne peux pas voter assez pour cela, c'est vraiment simple et facile.
2
grande faveur, quelqu'un pourrait-il expliquer cela comme j'avais 10 ans? Cette classe est-elle héritée par la classe principale? Je ne vois pas d' init, donc super () ne serait pas utilisé. Ce n'est pas en cliquant pour moi pour une raison quelconque.
omgimdrunk
1
@omgimdrunk Un simple gestionnaire d'événements déclencherait une ou plusieurs fonctions appelables chaque fois qu'un événement se déclenche. Une classe pour "gérer" cela pour vous nécessiterait au minimum les méthodes suivantes - ajouter et tirer. Dans cette classe, vous devez conserver une liste de gestionnaires à exécuter. Mettons cela dans la variable d'instance _bag_of_handlersqui est une liste. La méthode add de la classe serait simplement self._bag_of_handlers.append(some_callable). La méthode fire de la classe ferait une boucle à travers `_bag_of_handlers` en passant les arguments et kwargs fournis aux gestionnaires et les exécuterait l'un après l'autre.
Gabe Spradlin
68

Nous utilisons un EventHook comme suggéré par Michael Foord dans son modèle d'événement :

Ajoutez simplement des EventHooks à vos cours avec:

class MyBroadcaster()
    def __init__():
        self.onChange = EventHook()

theBroadcaster = MyBroadcaster()

# add a listener to the event
theBroadcaster.onChange += myFunction

# remove listener from the event
theBroadcaster.onChange -= myFunction

# fire event
theBroadcaster.onChange.fire()

Nous ajoutons la fonctionnalité pour supprimer tous les écouteurs d'un objet dans la classe Michaels et nous nous sommes retrouvés avec ceci:

class EventHook(object):

    def __init__(self):
        self.__handlers = []

    def __iadd__(self, handler):
        self.__handlers.append(handler)
        return self

    def __isub__(self, handler):
        self.__handlers.remove(handler)
        return self

    def fire(self, *args, **keywargs):
        for handler in self.__handlers:
            handler(*args, **keywargs)

    def clearObjectHandlers(self, inObject):
        for theHandler in self.__handlers:
            if theHandler.im_self == inObject:
                self -= theHandler
spassig
la source
Un inconvénient de cette utilisation est que vous devez d'abord ajouter un événement avant de vous inscrire en tant qu'abonné. Si seuls les éditeurs ajoutent leurs événements (pas un must, juste une bonne pratique), alors vous devez initialiser les éditeurs avant les abonnés, ce qui est pénible dans les grands projets
Jonathan
6
la dernière méthode est buggée car les gestionnaires self .__ sont modifiés au cours des itérations. Correction: `self .__ handlers = [h for h in self .__ handlers if h.im_self! = Obj]`
Simon Bergot
1
@Simon a raison, mais introduit un bogue car nous pouvons avoir des fonctions non liées dans les gestionnaires self .__. Corrigé:self.__handlers = [h for h in self._handlers if getattr(h, 'im_self', False) != obj]
Eric Marcos
20

J'utilise zope.event . Ce sont les os les plus nus que vous puissiez imaginer. :-) En fait, voici le code source complet:

subscribers = []

def notify(event):
    for subscriber in subscribers:
        subscriber(event)

Notez que vous ne pouvez pas envoyer de messages entre les processus, par exemple. Ce n'est pas un système de messagerie, juste un système d'événements, rien de plus, rien de moins.

Lennart Regebro
la source
17
pypi.python.org/pypi/zope.event ... pour sauver le pauvre Google de la bande passante ;-)
Boldewyn
J'aimerais toujours pouvoir envoyer des messages. J'utiliserais le système d'événements dans une application basée sur Tkinter. Je n'utilise pas son système d'événements car il ne prend pas en charge les messages.
Josip
Vous pouvez envoyer tout ce que vous voulez avec zope.event. Mais mon point est que ce n'est pas un système de messagerie approprié, car vous ne pouvez pas envoyer d'événements / messages à d'autres processus ou à d'autres ordinateurs. Vous devriez probablement être un mais plus spécifique avec vos exigences.
Lennart Regebro le
15

J'ai trouvé ce petit script sur Valued Lessons . Il semble avoir juste le bon rapport simplicité / puissance que je recherche. Peter Thatcher est l'auteur du code suivant (aucune licence n'est mentionnée).

class Event:
    def __init__(self):
        self.handlers = set()

    def handle(self, handler):
        self.handlers.add(handler)
        return self

    def unhandle(self, handler):
        try:
            self.handlers.remove(handler)
        except:
            raise ValueError("Handler is not handling this event, so cannot unhandle it.")
        return self

    def fire(self, *args, **kargs):
        for handler in self.handlers:
            handler(*args, **kargs)

    def getHandlerCount(self):
        return len(self.handlers)

    __iadd__ = handle
    __isub__ = unhandle
    __call__ = fire
    __len__  = getHandlerCount

class MockFileWatcher:
    def __init__(self):
        self.fileChanged = Event()

    def watchFiles(self):
        source_path = "foo"
        self.fileChanged(source_path)

def log_file_change(source_path):
    print "%r changed." % (source_path,)

def log_file_change2(source_path):
    print "%r changed!" % (source_path,)

watcher              = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()
Josip
la source
1
L'utilisation d'un set () au lieu d'une liste est agréable pour éviter que les gestionnaires ne soient enregistrés deux fois. L'une des conséquences est que les gestionnaires ne sont pas appelés dans l'ordre dans lequel ils ont été enregistrés. Ce n'est pas nécessairement une mauvaise chose ...
florisla
1
@florisla pourrait échanger pour OrderedSet, si on le souhaitait.
Robino
9

Voici un design minimal qui devrait bien fonctionner. Ce que vous devez faire, c'est simplement hériter Observerd'une classe et ensuite l'utiliser observe(event_name, callback_fn)pour écouter un événement spécifique. Chaque fois que cet événement spécifique est déclenché n'importe où dans le code (c'est-à-dire. Event('USB connected')), Le rappel correspondant se déclenche.

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observed_events = []
    def observe(self, event_name, callback_fn):
        self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})


class Event():
    def __init__(self, event_name, *callback_args):
        for observer in Observer._observers:
            for observable in observer._observed_events:
                if observable['event_name'] == event_name:
                    observable['callback_fn'](*callback_args)

Exemple:

class Room(Observer):
    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # DON'T FORGET THIS
    def someone_arrived(self, who):
        print(who + " has arrived!")

# Observe for specific event
room = Room()
room.observe('someone arrived',  room.someone_arrived)

# Fire some events
Event('someone left',    'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted',  'Lenard')
Pithikos
la source
J'aime votre design, il est minimaliste et facile à comprendre. et ce serait léger en n'ayant pas à importer certains modules.
Atreyagaurav
8

J'ai créé une EventManagerclasse (code à la fin). La syntaxe est la suivante:

#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )

#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )

#Create any number event with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )

#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun

#Delete an event
del EventManager.eventName

#Fire the event
EventManager.eventName()

Voici un exemple:

def hello(name):
    print "Hello {}".format(name)
    
def greetings(name):
    print "Greetings {}".format(name)

EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello

print "\nInitial salute"
EventManager.salute('Oscar')

print "\nNow remove greetings"
EventManager.salute -= greetings
EventManager.salute('Oscar')

Production:


Salut initial Salutations Oscar
Bonjour Oscar

Maintenant, supprimez les salutations
Bonjour Oscar

Code EventManger:

class EventManager:
    
    class Event:
        def __init__(self,functions):
            if type(functions) is not list:
                raise ValueError("functions parameter has to be a list")
            self.functions = functions
            
        def __iadd__(self,func):
            self.functions.append(func)
            return self
            
        def __isub__(self,func):
            self.functions.remove(func)
            return self
            
        def __call__(self,*args,**kvargs):
            for func in self.functions : func(*args,**kvargs)
            
    @classmethod
    def addEvent(cls,**kvargs):
        """
        addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
        creates events using **kvargs to create any number of events. Each event recieves a list of functions,
        where every function in the list recieves the same parameters.
        
        Example:
        
        def hello(): print "Hello ",
        def world(): print "World"
        
        EventManager.addEvent( salute = [hello] )
        EventManager.salute += world
        
        EventManager.salute()
        
        Output:
        Hello World
        """
        for key in kvargs.keys():
            if type(kvargs[key]) is not list:
                raise ValueError("value has to be a list")
            else:
                kvargs[key] = cls.Event(kvargs[key])
        
        cls.__dict__.update(kvargs)
Cristian Garcia
la source
8

Vous pouvez jeter un œil au pymitter ( pypi ). C'est une petite approche à fichier unique (~ 250 loc) "fournissant des espaces de noms, des caractères génériques et TTL".

Voici un exemple de base:

from pymitter import EventEmitter

ee = EventEmitter()

# decorator usage
@ee.on("myevent")
def handler1(arg):
   print "handler1 called with", arg

# callback usage
def handler2(arg):
    print "handler2 called with", arg
ee.on("myotherevent", handler2)

# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"

ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"
Dalailirium
la source
6

J'ai fait une variation de l'approche minimaliste de Longpoke qui garantit également les signatures des appelants et des appelants:

class EventHook(object):
    '''
    A simple implementation of the Observer-Pattern.
    The user can specify an event signature upon inizializazion,
    defined by kwargs in the form of argumentname=class (e.g. id=int).
    The arguments' types are not checked in this implementation though.
    Callables with a fitting signature can be added with += or removed with -=.
    All listeners can be notified by calling the EventHook class with fitting
    arguments.

    >>> event = EventHook(id=int, data=dict)
    >>> event += lambda id, data: print("%d %s" % (id, data))
    >>> event(id=5, data={"foo": "bar"})
    5 {'foo': 'bar'}

    >>> event = EventHook(id=int)
    >>> event += lambda wrong_name: None
    Traceback (most recent call last):
        ...
    ValueError: Listener must have these arguments: (id=int)

    >>> event = EventHook(id=int)
    >>> event += lambda id: None
    >>> event(wrong_name=0)
    Traceback (most recent call last):
        ...
    ValueError: This EventHook must be called with these arguments: (id=int)
    '''
    def __init__(self, **signature):
        self._signature = signature
        self._argnames = set(signature.keys())
        self._handlers = []

    def _kwargs_str(self):
        return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())

    def __iadd__(self, handler):
        params = inspect.signature(handler).parameters
        valid = True
        argnames = set(n for n in params.keys())
        if argnames != self._argnames:
            valid = False
        for p in params.values():
            if p.kind == p.VAR_KEYWORD:
                valid = True
                break
            if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
                valid = False
                break
        if not valid:
            raise ValueError("Listener must have these arguments: (%s)"
                             % self._kwargs_str())
        self._handlers.append(handler)
        return self

    def __isub__(self, handler):
        self._handlers.remove(handler)
        return self

    def __call__(self, *args, **kwargs):
        if args or set(kwargs.keys()) != self._argnames:
            raise ValueError("This EventHook must be called with these " +
                             "keyword arguments: (%s)" % self._kwargs_str())
        for handler in self._handlers[:]:
            handler(**kwargs)

    def __repr__(self):
        return "EventHook(%s)" % self._kwargs_str()
Felk
la source
3

Si je fais du code dans pyQt j'utilise le paradigme des sockets / signaux QT, c'est la même chose pour Django

Si je fais des E / S asynchrones, j'utilise le module de sélection natif

Si j'utilise un analyseur python SAX, j'utilise l'API d'événements fournie par SAX. Il semble donc que je suis victime de l'API sous-jacente :-)

Vous devriez peut-être vous demander ce que vous attendez du cadre / module d'événement. Ma préférence personnelle est d'utiliser le paradigme Socket / Signal de QT. plus d'informations à ce sujet peuvent être trouvées ici

SashaN
la source
2

Voici un autre module à considérer. Cela semble un choix viable pour les applications plus exigeantes.

Py-notify est un package Python fournissant des outils pour implémenter le modèle de programmation Observer. Ces outils incluent des signaux, des conditions et des variables.

Les signaux sont des listes de gestionnaires qui sont appelés lorsque le signal est émis. Les conditions sont essentiellement des variables booléennes couplées à un signal qui est émis lorsque l'état des conditions change. Ils peuvent être combinés à l'aide d'opérateurs logiques standard (non, et, etc.) dans des conditions composées. Contrairement aux conditions, les variables peuvent contenir n'importe quel objet Python, pas seulement des booléens, mais elles ne peuvent pas être combinées.

Josip
la source
1
La page d'accueil est hors service pour celui-ci, peut-être plus supportée?
David Parks
1

Si vous vouliez faire des choses plus compliquées comme fusionner des événements ou réessayer, vous pouvez utiliser le modèle Observable et une bibliothèque mature qui implémente cela. https://github.com/ReactiveX/RxPY . Les observables sont très courants en Javascript et Java et très pratiques à utiliser pour certaines tâches asynchrones.

from rx import Observable, Observer


def push_five_strings(observer):
        observer.on_next("Alpha")
        observer.on_next("Beta")
        observer.on_next("Gamma")
        observer.on_next("Delta")
        observer.on_next("Epsilon")
        observer.on_completed()


class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

source = Observable.create(push_five_strings)

source.subscribe(PrintObserver())

SORTIE :

Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!
David Dehghan
la source
1

Si vous avez besoin d'un bus d'événements qui fonctionne au-delà des frontières du processus ou du réseau, vous pouvez essayer PyMQ . Il prend actuellement en charge pub / sub, les files d'attente de messages et le RPC synchrone. La version par défaut fonctionne au-dessus d'un backend Redis, vous avez donc besoin d'un serveur Redis en cours d'exécution. Il existe également un backend en mémoire pour les tests. Vous pouvez également écrire votre propre backend.

import pymq

# common code
class MyEvent:
    pass

# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
    print('event received')

# publisher code
pymq.publish(MyEvent())

# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')

Pour initialiser le système:

from pymq.provider.redis import RedisConfig

# starts a new thread with a Redis event loop
pymq.init(RedisConfig())

# main application control loop

pymq.shutdown()

Avertissement: je suis l'auteur de cette bibliothèque

thrau
la source
0

Vous pouvez essayer le buslanemodule.

Cette bibliothèque facilite la mise en œuvre d'un système basé sur les messages. Il prend en charge les commandes (gestionnaire unique) et les événements (0 ou gestionnaires multiples). Buslane utilise des annotations de type Python pour enregistrer correctement le gestionnaire.

Exemple simple:

from dataclasses import dataclass

from buslane.commands import Command, CommandHandler, CommandBus


@dataclass(frozen=True)
class RegisterUserCommand(Command):
    email: str
    password: str


class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):

    def handle(self, command: RegisterUserCommand) -> None:
        assert command == RegisterUserCommand(
            email='[email protected]',
            password='secret',
        )


command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
    email='[email protected]',
    password='secret',
))

Pour installer buslane, utilisez simplement pip:

$ pip install buslane
Konrad Hałas
la source
0

Il y a quelque temps, j'ai écrit une bibliothèque qui pourrait vous être utile. Il vous permet d'avoir des écouteurs locaux et globaux, plusieurs façons différentes de les enregistrer, la priorité d'exécution, etc.

from pyeventdispatcher import register

register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)

dispatch(Event("foo.bar", {"id": 1}))
# first second

Jetez un œil pyeventdispatcher

Daniel Ancuta
la source