Comment démarrer avec Akka Streams? [fermé]

222

La bibliothèque Akka Streams est déjà livrée avec une abondante documentation . Cependant, le principal problème pour moi est qu'il fournit trop de matériel - je me sens assez dépassé par le nombre de concepts que je dois apprendre. Beaucoup d'exemples montrés ici semblent très lourds et ne peuvent pas être facilement traduits dans des cas d'utilisation réels et sont donc assez ésotériques. Je pense que cela donne trop de détails sans expliquer comment construire tous les blocs de construction ensemble et comment exactement cela aide à résoudre des problèmes spécifiques.

Il y a des sources, des puits, des flux, des étapes de graphique, des graphiques partiels, la matérialisation, un graphique DSL et bien plus encore et je ne sais pas par où commencer. Le guide de démarrage rapide est censé être un point de départ, mais je ne le comprends pas. Il introduit simplement les concepts mentionnés ci-dessus sans les expliquer. De plus, les exemples de code ne peuvent pas être exécutés - il manque des parties qui me rendent plus ou moins impossible de suivre le texte.

Quelqu'un peut-il expliquer les concepts sources, puits, flux, étapes du graphique, graphiques partiels, matérialisation et peut-être d'autres choses que j'ai manquées en termes simples et avec des exemples simples qui n'expliquent pas chaque détail (et qui ne sont probablement pas nécessaires de toute façon à le début)?

kiritsuku
la source
2
Pour info, ceci est en cours de discussion sur meta
DavidG
10
En tant que première personne à voter pour clore ce dossier (en suivant le fil Meta), permettez-moi d'abord de dire que votre réponse ici est excellente . C'est vraiment approfondi et c'est certainement une ressource très utile. Cependant, malheureusement, la question que vous avez posée est beaucoup trop large pour Stack Overflow. Si d'une manière ou d'une autre votre réponse peut être postée à une question libellée différemment, alors génial, mais je ne pense pas que ce soit possible. Je vous conseille fortement de soumettre à nouveau ceci comme un article de blog ou quelque chose de similaire que vous-même et les autres pouvez utiliser comme ressource de référence dans les réponses futures.
James Donnelly
2
Je pense que rédiger cette question sous forme de blog ne serait pas efficace. Oui, c'est une question large - et c'est une très bonne question. Rétrécir sa portée ne l'améliorerait pas. La réponse fournie est formidable. Je suis sûr que Quora serait heureuse de retirer des affaires à SO pour les grandes questions.
Mike Slinn
11
@MikeSlinn n'essaie pas de discuter avec les gens de SO des questions appropriées, ils suivent aveuglément les règles. Tant que la question n'est pas supprimée, je suis heureux et je n'ai pas envie de passer à une autre plate-forme.
kiritsuku
2
@sschaef How pedantic. Oui, bien sûr, les règles ne valent rien, votre grand moi en sait tellement mieux et tous ceux qui essaient d'appliquer des règles suivent aveuglément le battage médiatique. / rant. plus sérieusement, ce serait un excellent ajout à la version bêta de la documentation, si vous y êtes. Vous pouvez toujours postuler et le mettre là-haut, mais vous devriez au moins voir que ce n'est pas un bon choix pour le site principal.
Félix Gagnon-Grenier

Réponses:

506

Cette réponse est basée sur la akka-streamversion 2.4.2. L'API peut être légèrement différente dans d'autres versions. La dépendance peut être consommée par sbt :

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2"

D'accord, commençons. L'API d'Akka Streams se compose de trois types principaux. Contrairement aux flux réactifs , ces types sont beaucoup plus puissants et donc plus complexes. On suppose que pour tous les exemples de code, les définitions suivantes existent déjà:

import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._

implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher

Les importinstructions sont nécessaires pour les déclarations de type. systemreprésente le système d'acteurs d'Akka et materializerreprésente le contexte d'évaluation du flux. Dans notre cas, nous utilisons un ActorMaterializer, ce qui signifie que les flux sont évalués au-dessus des acteurs. Les deux valeurs sont marquées comme implicit, ce qui donne au compilateur Scala la possibilité d'injecter ces deux dépendances automatiquement chaque fois qu'elles sont nécessaires. Nous importons également system.dispatcher, qui est un contexte d'exécution pour Futures.

Une nouvelle API

Les flux Akka ont ces propriétés clés:

  • Ils implémentent la spécification Reactive Streams , dont les trois objectifs principaux, la contre-pression, les limites asynchrones et non bloquantes et l'interopérabilité entre les différentes implémentations, s'appliquent également pleinement aux Akka Streams.
  • Ils fournissent une abstraction pour un moteur d'évaluation des flux, qui est appelé Materializer.
  • Les programmes sont formulés comme des blocs de construction réutilisables, qui sont représentés comme les trois types principaux Source, Sinket Flow. Les blocs de construction forment un graphique dont l'évaluation est basée sur Materializeret doit être déclenchée explicitement.

Dans ce qui suit, une introduction plus approfondie sur la façon d'utiliser les trois principaux types sera donnée.

La source

A Sourceest un créateur de données, il sert de source d'entrée au flux. Chacun Sourcea un seul canal de sortie et aucun canal d'entrée. Toutes les données transitent par le canal de sortie vers tout ce qui est connecté au Source.

La source

Image prise à partir de boldradius.com .

Un Sourcepeut être créé de plusieurs façons:

scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...

scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val s = Source(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future

Dans les cas ci-dessus, nous avons alimenté le Sourceavec des données finies, ce qui signifie qu'elles se termineront éventuellement. Il ne faut pas oublier que les flux réactifs sont paresseux et asynchrones par défaut. Cela signifie que l'on doit explicitement demander l'évaluation du flux. Dans Akka Streams, cela peut être fait à travers les run*méthodes. La fonction runForeachne serait pas différente de la foreachfonction bien connue - grâce à l' runajout, il est explicite que nous demandions une évaluation du flux. Puisque les données finies sont ennuyeuses, nous continuons avec une infinie:

scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5

Avec la takeméthode, nous pouvons créer un point d'arrêt artificiel qui nous empêche d'évaluer indéfiniment. Étant donné que la prise en charge des acteurs est intégrée, nous pouvons également facilement alimenter le flux avec des messages envoyés à un acteur:

def run(actor: ActorRef) = {
  Future { Thread.sleep(300); actor ! 1 }
  Future { Thread.sleep(200); actor ! 2 }
  Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
  .actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
  .mapMaterializedValue(run)

scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1

Nous pouvons voir que les Futuressont exécutés de manière asynchrone sur différents threads, ce qui explique le résultat. Dans l'exemple ci-dessus, un tampon pour les éléments entrants n'est pas nécessaire et donc avec OverflowStrategy.failnous pouvons configurer que le flux doit échouer lors d'un débordement de tampon. Surtout grâce à cette interface d'acteur, nous pouvons alimenter le flux via n'importe quelle source de données. Peu importe si les données sont créées par le même thread, par un autre, par un autre processus ou si elles proviennent d'un système distant via Internet.

Évier

A Sinkest fondamentalement l'opposé de a Source. C'est le point final d'un flux et consomme donc des données. A Sinka un seul canal d'entrée et aucun canal de sortie. Sinkssont particulièrement nécessaires lorsque nous voulons spécifier le comportement du collecteur de données de manière réutilisable et sans évaluer le flux. Les run*méthodes déjà connues ne nous permettent pas ces propriétés, il est donc préférable d'utiliser à la Sinkplace.

Évier

Image prise à partir de boldradius.com .

Un petit exemple d'un Sinken action:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3

La connexion de a Sourceà a Sinkpeut être effectuée avec la tométhode. Il retourne un soi-disant RunnableFlow, qui est comme nous verrons plus tard une forme spéciale de a Flow- un flux qui peut être exécuté en appelant simplement sa run()méthode.

Flux exécutable

Image prise à partir de boldradius.com .

Il est bien sûr possible de transmettre toutes les valeurs qui arrivent à un puits à un acteur:

val actor = system.actorOf(Props(new Actor {
  override def receive = {
    case msg => println(s"actor received: $msg")
  }
}))

scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...

scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed

Couler

Les sources de données et les récepteurs sont excellents si vous avez besoin d'une connexion entre les flux Akka et un système existant, mais on ne peut vraiment rien faire avec eux. Les flux sont la dernière pièce manquante dans l'abstraction de base d'Akka Streams. Ils agissent comme un connecteur entre différents flux et peuvent être utilisés pour transformer ses éléments.

Couler

Image prise à partir de boldradius.com .

Si un Flowest connecté à Sourceun nouveau, Sourcec'est le résultat. De même, un Flowconnecté à un Sinkcrée un nouveau Sink. Et un Flowconnecté à la fois à a Sourceet à a pour Sinkrésultat un RunnableFlow. Par conséquent, ils se situent entre le canal d'entrée et le canal de sortie mais en eux-mêmes ne correspondent pas à l'une des saveurs tant qu'ils ne sont connectés ni à a Sourceni à a Sink.

Flux complet

Image prise à partir de boldradius.com .

Afin de mieux comprendre Flows, nous allons voir quelques exemples:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6

Via la viaméthode, nous pouvons connecter un Sourceavec un Flow. Nous devons spécifier le type d'entrée car le compilateur ne peut pas le déduire pour nous. Comme nous pouvons déjà le voir dans cet exemple simple, les flux invertet doublesont complètement indépendants de tout producteur et consommateur de données. Ils transforment uniquement les données et les transmettent au canal de sortie. Cela signifie que nous pouvons réutiliser un flux parmi plusieurs flux:

scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3

scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1

s1et s2représentent des flux complètement nouveaux - ils ne partagent aucune donnée via leurs blocs de construction.

Flux de données illimités

Avant de poursuivre, nous devons d'abord revoir certains des aspects clés des flux réactifs. Un nombre illimité d'éléments peut arriver à tout moment et peut mettre un flux dans différents états. A côté d'un flux exécutable, qui est l'état habituel, un flux peut être arrêté soit par une erreur, soit par un signal qui indique qu'aucune autre donnée n'arrivera. Un flux peut être modélisé de manière graphique en marquant les événements sur une chronologie comme c'est le cas ici:

Montre qu'un flux est une séquence d'événements en cours ordonnés dans le temps

Image tirée de l'introduction à la programmation réactive que vous manquiez .

Nous avons déjà vu des flux exécutables dans les exemples de la section précédente. Nous obtenons un RunnableGraphchaque fois qu'un flux peut réellement être matérialisé, ce qui signifie qu'un Sinkest connecté à un Source. Jusqu'à présent, nous nous sommes toujours matérialisés à la valeur Unit, ce qui peut être vu dans les types:

val source: Source[Int, NotUsed] = Source(1 to 3)
val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x)

Pour Sourceet Sinkle deuxième paramètre de type et Flowle troisième paramètre de type désignent la valeur matérialisée. Tout au long de cette réponse, le sens complet de la matérialisation ne sera pas expliqué. Cependant, de plus amples détails sur la matérialisation peuvent être trouvés dans la documentation officielle . Pour l'instant, la seule chose que nous devons savoir, c'est que la valeur matérialisée est ce que nous obtenons lorsque nous exécutons un flux. Comme nous n'étions intéressés que par les effets secondaires jusqu'à présent, nous avons obtenu Unitla valeur matérialisée. L'exception à cela a été la matérialisation d'un évier, qui a abouti à un Future. Cela nous a redonnéFuture, car cette valeur peut indiquer la fin du flux connecté au récepteur. Jusqu'à présent, les exemples de code précédents étaient agréables pour expliquer le concept, mais ils étaient également ennuyeux car nous ne nous occupions que de flux finis ou de flux infinis très simples. Pour le rendre plus intéressant, dans ce qui suit un flux complet asynchrone et illimité sera expliqué.

Exemple ClickStream

Par exemple, nous voulons avoir un flux qui capture les événements de clic. Pour le rendre plus difficile, disons que nous voulons également regrouper les événements de clic qui se produisent peu de temps après l'autre. De cette façon, nous pourrions facilement découvrir des clics doubles, triples ou décuplés. De plus, nous voulons filtrer tous les clics simples. Respirez profondément et imaginez comment vous pourriez résoudre ce problème de manière impérative. Je parie que personne ne pourrait implémenter une solution qui fonctionne correctement au premier essai. De manière réactive, ce problème est trivial à résoudre. En fait, la solution est si simple et directe à implémenter que nous pouvons même l'exprimer dans un diagramme qui décrit directement le comportement du code:

La logique de l'exemple de flux de clics

Image tirée de l'introduction à la programmation réactive que vous manquiez .

Les cases grises sont des fonctions qui décrivent comment un flux est transformé en un autre. Avec la throttlefonction nous accumulons des clics dans les 250 millisecondes, les fonctions mapet filterdevraient être explicites. Les orbes de couleur représentent un événement et les flèches décrivent comment elles circulent dans nos fonctions. Plus tard dans les étapes de traitement, nous obtenons de moins en moins d'éléments qui traversent notre flux, car nous les regroupons et les filtrons. Le code de cette image ressemblerait à ceci:

val multiClickStream = clickStream
    .throttle(250.millis)
    .map(clickEvents => clickEvents.length)
    .filter(numberOfClicks => numberOfClicks >= 2)

Toute la logique peut être représentée en seulement quatre lignes de code! En Scala, nous pourrions l'écrire encore plus court:

val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2)

La définition de clickStreamest un peu plus complexe mais ce n'est que le cas car l'exemple de programme s'exécute sur la JVM, où la capture des événements de clic n'est pas facilement possible. Une autre complication est que Akka par défaut ne fournit pas la throttlefonction. Au lieu de cela, nous avons dû l'écrire par nous-mêmes. Étant donné que cette fonction est (comme c'est le cas pour les fonctions mapou filter) réutilisable dans différents cas d'utilisation, je ne compte pas ces lignes pour le nombre de lignes dont nous avions besoin pour implémenter la logique. Dans les langages impératifs cependant, il est normal que la logique ne puisse pas être réutilisée aussi facilement et que les différentes étapes logiques se produisent toutes au même endroit au lieu d'être appliquées séquentiellement, ce qui signifie que nous aurions probablement déformé notre code avec la logique de limitation. L'exemple de code complet est disponible en tant queessentiel et ne sera plus abordé ici.

Exemple SimpleWebServer

Ce qui devrait être discuté à la place est un autre exemple. Bien que le flux de clics soit un bel exemple pour laisser Akka Streams gérer un exemple du monde réel, il n'a pas le pouvoir de montrer l'exécution parallèle en action. L'exemple suivant doit représenter un petit serveur Web qui peut gérer plusieurs demandes en parallèle. Le serveur Web doit pouvoir accepter les connexions entrantes et en recevoir des séquences d'octets qui représentent des signes ASCII imprimables. Ces séquences d'octets ou chaînes doivent être divisées à tous les caractères de nouvelle ligne en parties plus petites. Après cela, le serveur répondra au client avec chacune des lignes divisées. Alternativement, il pourrait faire autre chose avec les lignes et donner un jeton de réponse spécial, mais nous voulons rester simple dans cet exemple et donc n'introduire aucune fonctionnalité de fantaisie. Rappelles toi, le serveur doit être capable de gérer plusieurs demandes en même temps, ce qui signifie essentiellement qu'aucune demande n'est autorisée à bloquer toute autre demande de l'exécution ultérieure. La résolution de toutes ces exigences peut être difficile de manière impérative - avec Akka Streams cependant, nous ne devrions pas avoir besoin de plus de quelques lignes pour résoudre ces problèmes. Tout d'abord, voyons le serveur lui-même:

serveur

Fondamentalement, il n'y a que trois blocs de construction principaux. Le premier doit accepter les connexions entrantes. Le second doit gérer les demandes entrantes et le troisième doit envoyer une réponse. L'implémentation de ces trois blocs de construction est seulement un peu plus compliquée que l'implémentation du flux de clics:

def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
  import system.dispatcher

  val connectionHandler: Sink[Tcp.IncomingConnection, Future[Unit]] =
    Sink.foreach[Tcp.IncomingConnection] { conn =>
      println(s"Incoming connection from: ${conn.remoteAddress}")
      conn.handleWith(serverLogic)
    }

  val incomingCnnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
    Tcp().bind(address, port)

  val binding: Future[Tcp.ServerBinding] =
    incomingCnnections.to(connectionHandler).run()

  binding onComplete {
    case Success(b) =>
      println(s"Server started, listening on: ${b.localAddress}")
    case Failure(e) =>
      println(s"Server could not be bound to $address:$port: ${e.getMessage}")
  }
}

La fonction mkServerprend (outre l'adresse et le port du serveur) également un système d'acteur et un matérialiseur comme paramètres implicites. Le flux de contrôle du serveur est représenté par binding, qui prend une source de connexions entrantes et les transmet à un récepteur de connexions entrantes. À l'intérieur de connectionHandler, qui est notre évier, nous traitons chaque connexion par le flux serverLogic, qui sera décrit plus loin. bindingrenvoie unFuture, qui se termine lorsque le serveur a été démarré ou que le démarrage a échoué, ce qui pourrait être le cas lorsque le port est déjà pris par un autre processus. Cependant, le code ne reflète pas complètement le graphique car nous ne pouvons pas voir un bloc de construction qui gère les réponses. La raison en est que la connexion fournit déjà cette logique par elle-même. Il s'agit d'un flux bidirectionnel et pas seulement unidirectionnel comme les flux que nous avons vus dans les exemples précédents. Comme ce fut le cas pour la matérialisation, de tels flux complexes ne seront pas expliqués ici. La documentation officielle contient de nombreux éléments pour couvrir des graphiques de flux plus complexes. Pour l'instant, il suffit de savoir que Tcp.IncomingConnectionreprésente une connexion qui sait comment recevoir des requêtes et comment envoyer des réponses. La partie qui manque encore est laserverLogicbloc de construction. Cela peut ressembler à ceci:

logique du serveur

Encore une fois, nous sommes en mesure de diviser la logique en plusieurs blocs de construction simples qui forment tous ensemble le flux de notre programme. Nous voulons d'abord diviser notre séquence d'octets en lignes, ce que nous devons faire chaque fois que nous trouvons un caractère de nouvelle ligne. Après cela, les octets de chaque ligne doivent être convertis en chaîne car travailler avec des octets bruts est fastidieux. Dans l'ensemble, nous pourrions recevoir un flux binaire d'un protocole compliqué, ce qui rendrait le travail avec les données brutes entrantes extrêmement difficile. Une fois que nous avons une chaîne lisible, nous pouvons créer une réponse. Pour des raisons de simplicité, la réponse peut être n'importe quoi dans notre cas. En fin de compte, nous devons reconvertir notre réponse en une séquence d'octets qui peuvent être envoyés sur le câble. Le code de la logique entière peut ressembler à ceci:

val serverLogic: Flow[ByteString, ByteString, Unit] = {
  val delimiter = Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true)

  val receiver = Flow[ByteString].map { bytes =>
    val message = bytes.utf8String
    println(s"Server received: $message")
    message
  }

  val responder = Flow[String].map { message =>
    val answer = s"Server hereby responds to message: $message\n"
    ByteString(answer)
  }

  Flow[ByteString]
    .via(delimiter)
    .via(receiver)
    .via(responder)
}

Nous savons déjà que serverLogicc'est un flux qui prend un ByteStringet doit produire un ByteString. Avec delimiternous pouvons diviser un ByteStringen plus petites parties - dans notre cas, cela doit se produire chaque fois qu'un caractère de nouvelle ligne se produit. receiverest le flux qui prend toutes les séquences d'octets fractionnés et les convertit en chaîne. C'est bien sûr une conversion dangereuse, car seuls les caractères ASCII imprimables doivent être convertis en chaîne, mais pour nos besoins, c'est assez bon. responderest le dernier composant et est chargé de créer une réponse et de reconvertir la réponse en une séquence d'octets. Contrairement au graphique, nous n'avons pas divisé ce dernier composant en deux, car la logique est triviale. À la fin, nous connectons tous les flux à travers leviafonction. À ce stade, on peut se demander si nous avons pris soin de la propriété multi-utilisateurs mentionnée au début. Et en effet, nous l'avons fait même si ce n'est peut-être pas évident immédiatement. En regardant ce graphique, il devrait devenir plus clair:

serveur et logique de serveur combinés

Le serverLogiccomposant n'est rien d'autre qu'un flux contenant des flux plus petits. Ce composant prend une entrée, qui est une demande, et produit une sortie, qui est la réponse. Étant donné que les flux peuvent être construits plusieurs fois et qu'ils fonctionnent tous indépendamment les uns des autres, nous réalisons grâce à cette imbrication notre propriété multi-utilisateurs. Chaque demande est traitée dans sa propre demande et, par conséquent, une demande en cours d'exécution courte peut dépasser une demande en cours d'exécution précédemment démarrée. Au cas où vous vous le demanderiez, la définition de serverLogiccelle qui a été montrée précédemment peut bien sûr être écrite beaucoup plus courte en intégrant la plupart de ses définitions internes:

val serverLogic = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(msg => s"Server hereby responds to message: $msg\n")
  .map(ByteString(_))

Un test du serveur Web peut ressembler à ceci:

$ # Client
$ echo "Hello World\nHow are you?" | netcat 127.0.0.1 6666
Server hereby responds to message: Hello World
Server hereby responds to message: How are you?

Pour que l'exemple de code ci-dessus fonctionne correctement, nous devons d'abord démarrer le serveur, qui est décrit par le startServerscript:

$ # Server
$ ./startServer 127.0.0.1 6666
[DEBUG] Server started, listening on: /127.0.0.1:6666
[DEBUG] Incoming connection from: /127.0.0.1:37972
[DEBUG] Server received: Hello World
[DEBUG] Server received: How are you?

L'exemple de code complet de ce simple serveur TCP peut être trouvé ici . Nous ne pouvons pas seulement écrire un serveur avec Akka Streams mais aussi le client. Cela peut ressembler à ceci:

val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(println)
  .map(_ ⇒ StdIn.readLine("> "))
  .map(_+"\n")
  .map(ByteString(_))

connection.join(flow).run()

Le client TCP en code complet peut être trouvé ici . Le code semble assez similaire mais contrairement au serveur, nous n'avons plus à gérer les connexions entrantes.

Graphes complexes

Dans les sections précédentes, nous avons vu comment construire des programmes simples à partir de flux. Cependant, en réalité, il ne suffit souvent pas de simplement s'appuyer sur des fonctions déjà intégrées pour construire des flux plus complexes. Si nous voulons pouvoir utiliser Akka Streams pour des programmes arbitraires, nous devons savoir comment construire nos propres structures de contrôle personnalisées et flux combinables qui nous permettent de faire face à la complexité de nos applications. La bonne nouvelle est que Akka Streams a été conçu pour s'adapter aux besoins des utilisateurs et afin de vous donner une brève introduction dans les parties les plus complexes d'Akka Streams, nous ajoutons quelques fonctionnalités supplémentaires à notre exemple client / serveur.

Une chose que nous ne pouvons pas encore faire est de fermer une connexion. À ce stade, cela commence à devenir un peu plus compliqué car l'API de flux que nous avons vu jusqu'à présent ne nous permet pas d'arrêter un flux à un point arbitraire. Cependant, il y a l' GraphStageabstraction, qui peut être utilisée pour créer des étapes de traitement de graphe arbitraires avec un nombre illimité de ports d'entrée ou de sortie. Jetons d'abord un œil au côté serveur, où nous introduisons un nouveau composant, appelé closeConnection:

val closeConnection = new GraphStage[FlowShape[String, String]] {
  val in = Inlet[String]("closeConnection.in")
  val out = Outlet[String]("closeConnection.out")

  override val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush() = grab(in) match {
        case "q" ⇒
          push(out, "BYE")
          completeStage()
        case msg ⇒
          push(out, s"Server hereby responds to message: $msg\n")
      }
    })
    setHandler(out, new OutHandler {
      override def onPull() = pull(in)
    })
  }
}

Cette API semble beaucoup plus encombrante que l'API de flux. Pas étonnant, nous devons faire beaucoup d'étapes impératives ici. En échange, nous avons plus de contrôle sur le comportement de nos flux. Dans l'exemple ci-dessus, nous spécifions uniquement un port d'entrée et un port de sortie et les mettons à la disposition du système en remplaçant la shapevaleur. De plus, nous avons défini un soi-disant InHandleret un OutHandler, qui sont dans cet ordre chargés de recevoir et d'émettre les éléments. Si vous avez examiné attentivement l'exemple de flux de clics complet, vous devez déjà reconnaître ces composants. Dans le InHandlernous prenons un élément et s'il s'agit d'une chaîne avec un seul caractère 'q', nous voulons fermer le flux. Afin de donner au client une chance de découvrir que le flux sera bientôt fermé, nous émettons la chaîne"BYE"puis nous fermons immédiatement la scène par la suite. Le closeConnectioncomposant peut être combiné avec un flux via la viaméthode, qui a été introduite dans la section sur les flux.

En plus de pouvoir fermer les connexions, il serait également intéressant de pouvoir afficher un message de bienvenue à une connexion nouvellement créée. Pour ce faire, nous devons encore une fois aller un peu plus loin:

def serverLogic
    (conn: Tcp.IncomingConnection)
    (implicit system: ActorSystem)
    : Flow[ByteString, ByteString, NotUsed]
    = Flow.fromGraph(GraphDSL.create() { implicit b ⇒
  import GraphDSL.Implicits._
  val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n"))
  val logic = b.add(internalLogic)
  val concat = b.add(Concat[ByteString]())
  welcome ~> concat.in(0)
  logic.outlet ~> concat.in(1)

  FlowShape(logic.in, concat.out)
})

La fonction serverLogic prend maintenant la connexion entrante comme paramètre. À l'intérieur de son corps, nous utilisons une DSL qui nous permet de décrire le comportement d'un flux complexe. Avec welcomenous créons un flux qui ne peut émettre qu'un seul élément - le message de bienvenue. logicest ce qui a été décrit comme serverLogicdans la section précédente. La seule différence notable est que nous y avons ajouté closeConnection. Maintenant vient en fait la partie intéressante de la DSL. La GraphDSL.createfonction met à disposition un générateur b, qui est utilisé pour exprimer le flux sous forme de graphique. Avec la ~>fonction, il est possible de connecter les ports d'entrée et de sortie entre eux. Le Concatcomposant utilisé dans l'exemple peut concaténer des éléments et est ici utilisé pour ajouter le message de bienvenue devant les autres éléments qui sortent deinternalLogic. Dans la dernière ligne, nous rendons uniquement le port d'entrée de la logique du serveur et le port de sortie du flux concaténé parce que tous les autres ports doivent rester un détail d'implémentation du serverLogiccomposant. Pour une introduction en profondeur au graphique DSL d'Akka Streams, visitez la section correspondante dans la documentation officielle . L'exemple de code complet du serveur TCP complexe et d'un client qui peut communiquer avec lui peut être trouvé ici . Chaque fois que vous ouvrez une nouvelle connexion à partir du client, vous devriez voir un message de bienvenue et en tapant "q"sur le client, vous devriez voir un message qui vous indique que la connexion a été annulée.

Il y a encore quelques sujets qui n'étaient pas couverts par cette réponse. En particulier, la matérialisation peut effrayer un lecteur ou un autre, mais je suis sûr qu'avec le matériel abordé ici, tout le monde devrait pouvoir passer les prochaines étapes par lui-même. Comme déjà dit, la documentation officielle est un bon endroit pour continuer à se renseigner sur Akka Streams.

kiritsuku
la source
4
@monksy Je n'avais pas prévu de publier ça ailleurs. N'hésitez pas à le republier sur votre blog si vous le souhaitez. L'API est aujourd'hui stable dans la plupart des régions, ce qui signifie que vous n'avez probablement même pas à vous soucier de la maintenance (la plupart des articles de blog sur Akka Streams sont obsolètes car ils montrent une API qui n'existe plus).
kiritsuku
3
Cela ne disparaîtra pas. Pourquoi cela?
kiritsuku
2
@sschaef Il pourrait bien disparaître car la question est hors sujet et a été fermée en tant que telle.
DavidG
7
@Magisch Rappelez-vous toujours: "Nous ne supprimons pas le bon contenu." Je ne suis pas tout à fait sûr, mais je suppose que cette réponse pourrait effectivement être qualifiée, malgré tout.
Déduplicateur
9
Ce message pourrait être bon pour la nouvelle fonctionnalité de documentation de Stack Overflow - une fois ouverte pour Scala.
SL Barth - Reinstate Monica