La bibliothèque Akka Streams est déjà livrée avec une abondante documentation . Cependant, le principal problème pour moi est qu'il fournit trop de matériel - je me sens assez dépassé par le nombre de concepts que je dois apprendre. Beaucoup d'exemples montrés ici semblent très lourds et ne peuvent pas être facilement traduits dans des cas d'utilisation réels et sont donc assez ésotériques. Je pense que cela donne trop de détails sans expliquer comment construire tous les blocs de construction ensemble et comment exactement cela aide à résoudre des problèmes spécifiques.
Il y a des sources, des puits, des flux, des étapes de graphique, des graphiques partiels, la matérialisation, un graphique DSL et bien plus encore et je ne sais pas par où commencer. Le guide de démarrage rapide est censé être un point de départ, mais je ne le comprends pas. Il introduit simplement les concepts mentionnés ci-dessus sans les expliquer. De plus, les exemples de code ne peuvent pas être exécutés - il manque des parties qui me rendent plus ou moins impossible de suivre le texte.
Quelqu'un peut-il expliquer les concepts sources, puits, flux, étapes du graphique, graphiques partiels, matérialisation et peut-être d'autres choses que j'ai manquées en termes simples et avec des exemples simples qui n'expliquent pas chaque détail (et qui ne sont probablement pas nécessaires de toute façon à le début)?
la source
Réponses:
Cette réponse est basée sur la
akka-stream
version2.4.2
. L'API peut être légèrement différente dans d'autres versions. La dépendance peut être consommée par sbt :D'accord, commençons. L'API d'Akka Streams se compose de trois types principaux. Contrairement aux flux réactifs , ces types sont beaucoup plus puissants et donc plus complexes. On suppose que pour tous les exemples de code, les définitions suivantes existent déjà:
Les
import
instructions sont nécessaires pour les déclarations de type.system
représente le système d'acteurs d'Akka etmaterializer
représente le contexte d'évaluation du flux. Dans notre cas, nous utilisons unActorMaterializer
, ce qui signifie que les flux sont évalués au-dessus des acteurs. Les deux valeurs sont marquées commeimplicit
, ce qui donne au compilateur Scala la possibilité d'injecter ces deux dépendances automatiquement chaque fois qu'elles sont nécessaires. Nous importons égalementsystem.dispatcher
, qui est un contexte d'exécution pourFutures
.Une nouvelle API
Les flux Akka ont ces propriétés clés:
Materializer
.Source
,Sink
etFlow
. Les blocs de construction forment un graphique dont l'évaluation est basée surMaterializer
et doit être déclenchée explicitement.Dans ce qui suit, une introduction plus approfondie sur la façon d'utiliser les trois principaux types sera donnée.
La source
A
Source
est un créateur de données, il sert de source d'entrée au flux. ChacunSource
a un seul canal de sortie et aucun canal d'entrée. Toutes les données transitent par le canal de sortie vers tout ce qui est connecté auSource
.Image prise à partir de boldradius.com .
Un
Source
peut être créé de plusieurs façons:Dans les cas ci-dessus, nous avons alimenté le
Source
avec des données finies, ce qui signifie qu'elles se termineront éventuellement. Il ne faut pas oublier que les flux réactifs sont paresseux et asynchrones par défaut. Cela signifie que l'on doit explicitement demander l'évaluation du flux. Dans Akka Streams, cela peut être fait à travers lesrun*
méthodes. La fonctionrunForeach
ne serait pas différente de laforeach
fonction bien connue - grâce à l'run
ajout, il est explicite que nous demandions une évaluation du flux. Puisque les données finies sont ennuyeuses, nous continuons avec une infinie:Avec la
take
méthode, nous pouvons créer un point d'arrêt artificiel qui nous empêche d'évaluer indéfiniment. Étant donné que la prise en charge des acteurs est intégrée, nous pouvons également facilement alimenter le flux avec des messages envoyés à un acteur:Nous pouvons voir que les
Futures
sont exécutés de manière asynchrone sur différents threads, ce qui explique le résultat. Dans l'exemple ci-dessus, un tampon pour les éléments entrants n'est pas nécessaire et donc avecOverflowStrategy.fail
nous pouvons configurer que le flux doit échouer lors d'un débordement de tampon. Surtout grâce à cette interface d'acteur, nous pouvons alimenter le flux via n'importe quelle source de données. Peu importe si les données sont créées par le même thread, par un autre, par un autre processus ou si elles proviennent d'un système distant via Internet.Évier
A
Sink
est fondamentalement l'opposé de aSource
. C'est le point final d'un flux et consomme donc des données. ASink
a un seul canal d'entrée et aucun canal de sortie.Sinks
sont particulièrement nécessaires lorsque nous voulons spécifier le comportement du collecteur de données de manière réutilisable et sans évaluer le flux. Lesrun*
méthodes déjà connues ne nous permettent pas ces propriétés, il est donc préférable d'utiliser à laSink
place.Image prise à partir de boldradius.com .
Un petit exemple d'un
Sink
en action:La connexion de a
Source
à aSink
peut être effectuée avec lato
méthode. Il retourne un soi-disantRunnableFlow
, qui est comme nous verrons plus tard une forme spéciale de aFlow
- un flux qui peut être exécuté en appelant simplement sarun()
méthode.Image prise à partir de boldradius.com .
Il est bien sûr possible de transmettre toutes les valeurs qui arrivent à un puits à un acteur:
Couler
Les sources de données et les récepteurs sont excellents si vous avez besoin d'une connexion entre les flux Akka et un système existant, mais on ne peut vraiment rien faire avec eux. Les flux sont la dernière pièce manquante dans l'abstraction de base d'Akka Streams. Ils agissent comme un connecteur entre différents flux et peuvent être utilisés pour transformer ses éléments.
Image prise à partir de boldradius.com .
Si un
Flow
est connecté àSource
un nouveau,Source
c'est le résultat. De même, unFlow
connecté à unSink
crée un nouveauSink
. Et unFlow
connecté à la fois à aSource
et à a pourSink
résultat unRunnableFlow
. Par conséquent, ils se situent entre le canal d'entrée et le canal de sortie mais en eux-mêmes ne correspondent pas à l'une des saveurs tant qu'ils ne sont connectés ni à aSource
ni à aSink
.Image prise à partir de boldradius.com .
Afin de mieux comprendre
Flows
, nous allons voir quelques exemples:Via la
via
méthode, nous pouvons connecter unSource
avec unFlow
. Nous devons spécifier le type d'entrée car le compilateur ne peut pas le déduire pour nous. Comme nous pouvons déjà le voir dans cet exemple simple, les fluxinvert
etdouble
sont complètement indépendants de tout producteur et consommateur de données. Ils transforment uniquement les données et les transmettent au canal de sortie. Cela signifie que nous pouvons réutiliser un flux parmi plusieurs flux:s1
ets2
représentent des flux complètement nouveaux - ils ne partagent aucune donnée via leurs blocs de construction.Flux de données illimités
Avant de poursuivre, nous devons d'abord revoir certains des aspects clés des flux réactifs. Un nombre illimité d'éléments peut arriver à tout moment et peut mettre un flux dans différents états. A côté d'un flux exécutable, qui est l'état habituel, un flux peut être arrêté soit par une erreur, soit par un signal qui indique qu'aucune autre donnée n'arrivera. Un flux peut être modélisé de manière graphique en marquant les événements sur une chronologie comme c'est le cas ici:
Image tirée de l'introduction à la programmation réactive que vous manquiez .
Nous avons déjà vu des flux exécutables dans les exemples de la section précédente. Nous obtenons un
RunnableGraph
chaque fois qu'un flux peut réellement être matérialisé, ce qui signifie qu'unSink
est connecté à unSource
. Jusqu'à présent, nous nous sommes toujours matérialisés à la valeurUnit
, ce qui peut être vu dans les types:Pour
Source
etSink
le deuxième paramètre de type etFlow
le troisième paramètre de type désignent la valeur matérialisée. Tout au long de cette réponse, le sens complet de la matérialisation ne sera pas expliqué. Cependant, de plus amples détails sur la matérialisation peuvent être trouvés dans la documentation officielle . Pour l'instant, la seule chose que nous devons savoir, c'est que la valeur matérialisée est ce que nous obtenons lorsque nous exécutons un flux. Comme nous n'étions intéressés que par les effets secondaires jusqu'à présent, nous avons obtenuUnit
la valeur matérialisée. L'exception à cela a été la matérialisation d'un évier, qui a abouti à unFuture
. Cela nous a redonnéFuture
, car cette valeur peut indiquer la fin du flux connecté au récepteur. Jusqu'à présent, les exemples de code précédents étaient agréables pour expliquer le concept, mais ils étaient également ennuyeux car nous ne nous occupions que de flux finis ou de flux infinis très simples. Pour le rendre plus intéressant, dans ce qui suit un flux complet asynchrone et illimité sera expliqué.Exemple ClickStream
Par exemple, nous voulons avoir un flux qui capture les événements de clic. Pour le rendre plus difficile, disons que nous voulons également regrouper les événements de clic qui se produisent peu de temps après l'autre. De cette façon, nous pourrions facilement découvrir des clics doubles, triples ou décuplés. De plus, nous voulons filtrer tous les clics simples. Respirez profondément et imaginez comment vous pourriez résoudre ce problème de manière impérative. Je parie que personne ne pourrait implémenter une solution qui fonctionne correctement au premier essai. De manière réactive, ce problème est trivial à résoudre. En fait, la solution est si simple et directe à implémenter que nous pouvons même l'exprimer dans un diagramme qui décrit directement le comportement du code:
Image tirée de l'introduction à la programmation réactive que vous manquiez .
Les cases grises sont des fonctions qui décrivent comment un flux est transformé en un autre. Avec la
throttle
fonction nous accumulons des clics dans les 250 millisecondes, les fonctionsmap
etfilter
devraient être explicites. Les orbes de couleur représentent un événement et les flèches décrivent comment elles circulent dans nos fonctions. Plus tard dans les étapes de traitement, nous obtenons de moins en moins d'éléments qui traversent notre flux, car nous les regroupons et les filtrons. Le code de cette image ressemblerait à ceci:Toute la logique peut être représentée en seulement quatre lignes de code! En Scala, nous pourrions l'écrire encore plus court:
La définition de
clickStream
est un peu plus complexe mais ce n'est que le cas car l'exemple de programme s'exécute sur la JVM, où la capture des événements de clic n'est pas facilement possible. Une autre complication est que Akka par défaut ne fournit pas lathrottle
fonction. Au lieu de cela, nous avons dû l'écrire par nous-mêmes. Étant donné que cette fonction est (comme c'est le cas pour les fonctionsmap
oufilter
) réutilisable dans différents cas d'utilisation, je ne compte pas ces lignes pour le nombre de lignes dont nous avions besoin pour implémenter la logique. Dans les langages impératifs cependant, il est normal que la logique ne puisse pas être réutilisée aussi facilement et que les différentes étapes logiques se produisent toutes au même endroit au lieu d'être appliquées séquentiellement, ce qui signifie que nous aurions probablement déformé notre code avec la logique de limitation. L'exemple de code complet est disponible en tant queessentiel et ne sera plus abordé ici.Exemple SimpleWebServer
Ce qui devrait être discuté à la place est un autre exemple. Bien que le flux de clics soit un bel exemple pour laisser Akka Streams gérer un exemple du monde réel, il n'a pas le pouvoir de montrer l'exécution parallèle en action. L'exemple suivant doit représenter un petit serveur Web qui peut gérer plusieurs demandes en parallèle. Le serveur Web doit pouvoir accepter les connexions entrantes et en recevoir des séquences d'octets qui représentent des signes ASCII imprimables. Ces séquences d'octets ou chaînes doivent être divisées à tous les caractères de nouvelle ligne en parties plus petites. Après cela, le serveur répondra au client avec chacune des lignes divisées. Alternativement, il pourrait faire autre chose avec les lignes et donner un jeton de réponse spécial, mais nous voulons rester simple dans cet exemple et donc n'introduire aucune fonctionnalité de fantaisie. Rappelles toi, le serveur doit être capable de gérer plusieurs demandes en même temps, ce qui signifie essentiellement qu'aucune demande n'est autorisée à bloquer toute autre demande de l'exécution ultérieure. La résolution de toutes ces exigences peut être difficile de manière impérative - avec Akka Streams cependant, nous ne devrions pas avoir besoin de plus de quelques lignes pour résoudre ces problèmes. Tout d'abord, voyons le serveur lui-même:
Fondamentalement, il n'y a que trois blocs de construction principaux. Le premier doit accepter les connexions entrantes. Le second doit gérer les demandes entrantes et le troisième doit envoyer une réponse. L'implémentation de ces trois blocs de construction est seulement un peu plus compliquée que l'implémentation du flux de clics:
La fonction
mkServer
prend (outre l'adresse et le port du serveur) également un système d'acteur et un matérialiseur comme paramètres implicites. Le flux de contrôle du serveur est représenté parbinding
, qui prend une source de connexions entrantes et les transmet à un récepteur de connexions entrantes. À l'intérieur deconnectionHandler
, qui est notre évier, nous traitons chaque connexion par le fluxserverLogic
, qui sera décrit plus loin.binding
renvoie unFuture
, qui se termine lorsque le serveur a été démarré ou que le démarrage a échoué, ce qui pourrait être le cas lorsque le port est déjà pris par un autre processus. Cependant, le code ne reflète pas complètement le graphique car nous ne pouvons pas voir un bloc de construction qui gère les réponses. La raison en est que la connexion fournit déjà cette logique par elle-même. Il s'agit d'un flux bidirectionnel et pas seulement unidirectionnel comme les flux que nous avons vus dans les exemples précédents. Comme ce fut le cas pour la matérialisation, de tels flux complexes ne seront pas expliqués ici. La documentation officielle contient de nombreux éléments pour couvrir des graphiques de flux plus complexes. Pour l'instant, il suffit de savoir queTcp.IncomingConnection
représente une connexion qui sait comment recevoir des requêtes et comment envoyer des réponses. La partie qui manque encore est laserverLogic
bloc de construction. Cela peut ressembler à ceci:Encore une fois, nous sommes en mesure de diviser la logique en plusieurs blocs de construction simples qui forment tous ensemble le flux de notre programme. Nous voulons d'abord diviser notre séquence d'octets en lignes, ce que nous devons faire chaque fois que nous trouvons un caractère de nouvelle ligne. Après cela, les octets de chaque ligne doivent être convertis en chaîne car travailler avec des octets bruts est fastidieux. Dans l'ensemble, nous pourrions recevoir un flux binaire d'un protocole compliqué, ce qui rendrait le travail avec les données brutes entrantes extrêmement difficile. Une fois que nous avons une chaîne lisible, nous pouvons créer une réponse. Pour des raisons de simplicité, la réponse peut être n'importe quoi dans notre cas. En fin de compte, nous devons reconvertir notre réponse en une séquence d'octets qui peuvent être envoyés sur le câble. Le code de la logique entière peut ressembler à ceci:
Nous savons déjà que
serverLogic
c'est un flux qui prend unByteString
et doit produire unByteString
. Avecdelimiter
nous pouvons diviser unByteString
en plus petites parties - dans notre cas, cela doit se produire chaque fois qu'un caractère de nouvelle ligne se produit.receiver
est le flux qui prend toutes les séquences d'octets fractionnés et les convertit en chaîne. C'est bien sûr une conversion dangereuse, car seuls les caractères ASCII imprimables doivent être convertis en chaîne, mais pour nos besoins, c'est assez bon.responder
est le dernier composant et est chargé de créer une réponse et de reconvertir la réponse en une séquence d'octets. Contrairement au graphique, nous n'avons pas divisé ce dernier composant en deux, car la logique est triviale. À la fin, nous connectons tous les flux à travers levia
fonction. À ce stade, on peut se demander si nous avons pris soin de la propriété multi-utilisateurs mentionnée au début. Et en effet, nous l'avons fait même si ce n'est peut-être pas évident immédiatement. En regardant ce graphique, il devrait devenir plus clair:Le
serverLogic
composant n'est rien d'autre qu'un flux contenant des flux plus petits. Ce composant prend une entrée, qui est une demande, et produit une sortie, qui est la réponse. Étant donné que les flux peuvent être construits plusieurs fois et qu'ils fonctionnent tous indépendamment les uns des autres, nous réalisons grâce à cette imbrication notre propriété multi-utilisateurs. Chaque demande est traitée dans sa propre demande et, par conséquent, une demande en cours d'exécution courte peut dépasser une demande en cours d'exécution précédemment démarrée. Au cas où vous vous le demanderiez, la définition deserverLogic
celle qui a été montrée précédemment peut bien sûr être écrite beaucoup plus courte en intégrant la plupart de ses définitions internes:Un test du serveur Web peut ressembler à ceci:
Pour que l'exemple de code ci-dessus fonctionne correctement, nous devons d'abord démarrer le serveur, qui est décrit par le
startServer
script:L'exemple de code complet de ce simple serveur TCP peut être trouvé ici . Nous ne pouvons pas seulement écrire un serveur avec Akka Streams mais aussi le client. Cela peut ressembler à ceci:
Le client TCP en code complet peut être trouvé ici . Le code semble assez similaire mais contrairement au serveur, nous n'avons plus à gérer les connexions entrantes.
Graphes complexes
Dans les sections précédentes, nous avons vu comment construire des programmes simples à partir de flux. Cependant, en réalité, il ne suffit souvent pas de simplement s'appuyer sur des fonctions déjà intégrées pour construire des flux plus complexes. Si nous voulons pouvoir utiliser Akka Streams pour des programmes arbitraires, nous devons savoir comment construire nos propres structures de contrôle personnalisées et flux combinables qui nous permettent de faire face à la complexité de nos applications. La bonne nouvelle est que Akka Streams a été conçu pour s'adapter aux besoins des utilisateurs et afin de vous donner une brève introduction dans les parties les plus complexes d'Akka Streams, nous ajoutons quelques fonctionnalités supplémentaires à notre exemple client / serveur.
Une chose que nous ne pouvons pas encore faire est de fermer une connexion. À ce stade, cela commence à devenir un peu plus compliqué car l'API de flux que nous avons vu jusqu'à présent ne nous permet pas d'arrêter un flux à un point arbitraire. Cependant, il y a l'
GraphStage
abstraction, qui peut être utilisée pour créer des étapes de traitement de graphe arbitraires avec un nombre illimité de ports d'entrée ou de sortie. Jetons d'abord un œil au côté serveur, où nous introduisons un nouveau composant, appelécloseConnection
:Cette API semble beaucoup plus encombrante que l'API de flux. Pas étonnant, nous devons faire beaucoup d'étapes impératives ici. En échange, nous avons plus de contrôle sur le comportement de nos flux. Dans l'exemple ci-dessus, nous spécifions uniquement un port d'entrée et un port de sortie et les mettons à la disposition du système en remplaçant la
shape
valeur. De plus, nous avons défini un soi-disantInHandler
et unOutHandler
, qui sont dans cet ordre chargés de recevoir et d'émettre les éléments. Si vous avez examiné attentivement l'exemple de flux de clics complet, vous devez déjà reconnaître ces composants. Dans leInHandler
nous prenons un élément et s'il s'agit d'une chaîne avec un seul caractère'q'
, nous voulons fermer le flux. Afin de donner au client une chance de découvrir que le flux sera bientôt fermé, nous émettons la chaîne"BYE"
puis nous fermons immédiatement la scène par la suite. LecloseConnection
composant peut être combiné avec un flux via lavia
méthode, qui a été introduite dans la section sur les flux.En plus de pouvoir fermer les connexions, il serait également intéressant de pouvoir afficher un message de bienvenue à une connexion nouvellement créée. Pour ce faire, nous devons encore une fois aller un peu plus loin:
La fonction
serverLogic
prend maintenant la connexion entrante comme paramètre. À l'intérieur de son corps, nous utilisons une DSL qui nous permet de décrire le comportement d'un flux complexe. Avecwelcome
nous créons un flux qui ne peut émettre qu'un seul élément - le message de bienvenue.logic
est ce qui a été décrit commeserverLogic
dans la section précédente. La seule différence notable est que nous y avons ajoutécloseConnection
. Maintenant vient en fait la partie intéressante de la DSL. LaGraphDSL.create
fonction met à disposition un générateurb
, qui est utilisé pour exprimer le flux sous forme de graphique. Avec la~>
fonction, il est possible de connecter les ports d'entrée et de sortie entre eux. LeConcat
composant utilisé dans l'exemple peut concaténer des éléments et est ici utilisé pour ajouter le message de bienvenue devant les autres éléments qui sortent deinternalLogic
. Dans la dernière ligne, nous rendons uniquement le port d'entrée de la logique du serveur et le port de sortie du flux concaténé parce que tous les autres ports doivent rester un détail d'implémentation duserverLogic
composant. Pour une introduction en profondeur au graphique DSL d'Akka Streams, visitez la section correspondante dans la documentation officielle . L'exemple de code complet du serveur TCP complexe et d'un client qui peut communiquer avec lui peut être trouvé ici . Chaque fois que vous ouvrez une nouvelle connexion à partir du client, vous devriez voir un message de bienvenue et en tapant"q"
sur le client, vous devriez voir un message qui vous indique que la connexion a été annulée.Il y a encore quelques sujets qui n'étaient pas couverts par cette réponse. En particulier, la matérialisation peut effrayer un lecteur ou un autre, mais je suis sûr qu'avec le matériel abordé ici, tout le monde devrait pouvoir passer les prochaines étapes par lui-même. Comme déjà dit, la documentation officielle est un bon endroit pour continuer à se renseigner sur Akka Streams.
la source