passer un flux Akka à un service en amont pour peupler

9

J'ai besoin d'appeler un service en amont (Azure Blob Service) pour pousser les données vers un OutputStream, que je dois ensuite retourner et repousser vers le client, via akka. Sans akka (et juste du code de servlet), j'obtiendrais simplement le ServletOutputStream et le passerais à la méthode du service azure.

Le plus proche que je peux essayer de trébucher, et clairement c'est faux, c'est quelque chose comme ça

        Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
            blobClient.download(os);
            return os;
        });

        ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);

        sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());

L'idée est d'appeler un service en amont pour obtenir un flux de sortie rempli en appelant blobClient.download (os);

Il semble que la fonction lambda soit appelée et retourne, mais ensuite elle échoue, car il n'y a pas de données ou quelque chose. Comme si je ne suis pas censé avoir cette fonction lambda pour faire le travail, mais peut-être retourner un objet qui fait le travail? Pas certain.

Comment est-que quelqu'un peut faire ça?

MeBigFatGuy
la source
Quel est le comportement de download? Transmet-il des données oset ne les renvoie- t-il qu'une fois que les données ont été écrites?
Alec

Réponses:

2

Le vrai problème ici est que l'API Azure n'est pas conçue pour une contre-pression. Il n'y a aucun moyen pour le flux de sortie de signaler à Azure qu'il n'est pas prêt pour plus de données. En d'autres termes: si Azure envoie des données plus rapidement que vous ne pouvez les consommer, il devra y avoir un échec de débordement de tampon laid quelque part.

En acceptant ce fait, la prochaine meilleure chose que nous pouvons faire est:

  • Utilisez cette option Source.lazySourcepour commencer à télécharger des données uniquement en cas de demande en aval (c'est-à-dire que la source est en cours d'exécution et que des données sont demandées).
  • Placez l' downloadappel dans un autre thread afin qu'il continue à s'exécuter sans bloquer le retour de la source. Une fois que cela est possible avec un Future(je ne sais pas quelles sont les meilleures pratiques Java, mais cela devrait bien fonctionner dans les deux cas). Bien que cela n'ait pas d'importance au départ, vous devrez peut-être choisir un contexte d'exécution autre que system.dispatcher- tout dépend du downloadblocage ou non.

Je m'excuse à l'avance si ce code Java est mal formé - j'utilise Akka avec Scala, donc tout cela en regardant l'API Java Akka et la référence de syntaxe Java.

ResponseEntity responseEntity = HttpEntities.create(
  ContentTypes.APPLICATION_OCTET_STREAM,
  preAuthData.getFileSize(),

  // Wait until there is downstream demand to intialize the source...
  Source.lazySource(() -> {
    // Pre-materialize the outputstream before the source starts running
    Pair<OutputStream, Source<ByteString, NotUsed>> pair =
      StreamConverters.asOutputStream().preMaterialize(system);

    // Start writing into the download stream in a separate thread
    Futures.future(() -> { blobClient.download(pair.first()); return pair.first(); }, system.getDispatcher());

    // Return the source - it should start running since `lazySource` indicated demand
    return pair.second();
  })
);

sender().tell(new RequestResult(responseEntity, StatusCodes.OK), self());
Alec
la source
Fantastique. Merci beaucoup. Une petite modification de votre exemple est: Futures.future (() -> {blobClient.download (pair.first ()); return pair.first ();}, system.getDispatcher ());
MeBigFatGuy
@MeBigFatGuy D'accord, merci!
Alec
1

Dans OutputStreamce cas, la "valeur matérialisée" de la Sourceet elle ne sera créée qu'une fois le flux exécuté (ou "matérialisée" dans un flux en cours d'exécution). L'exécuter est hors de votre contrôle puisque vous remettez le Sourceà Akka HTTP et cela exécutera plus tard votre source.

.mapMaterializedValue(matval -> ...)est généralement utilisé pour transformer la valeur matérialisée, mais comme il est invoqué dans le cadre de la matérialisation, vous pouvez l'utiliser pour effectuer des effets secondaires tels que l'envoi du matval dans un message, tout comme vous l'avez compris, il n'y a pas nécessairement de problème avec que même si ça a l'air génial. Il est important de comprendre que le flux ne terminera pas sa matérialisation et ne fonctionnera pas tant que ce lambda ne sera pas terminé. Cela signifie des problèmes sidownload() bloque plutôt que de gâcher du travail sur un autre thread et de revenir immédiatement.

Il existe cependant une autre solution: Source.preMaterialize()elle matérialise la source et vous donne une Pairvaleur matérialisée et une nouvelle Sourcequi peut être utilisée pour consommer la source déjà démarrée:

Pair<OutputStream, Source<ByteString, NotUsed>> pair = 
  StreamConverters.asOutputStream().preMaterialize(system);
OutputStream os = pair.first();
Source<ByteString, NotUsed> source = pair.second();

Notez qu'il y a quelques éléments supplémentaires à penser dans votre code, surtout si l' blobClient.download(os)appel bloque jusqu'à ce qu'il soit fait et que vous appeliez cela depuis l'acteur, dans ce cas, vous devez vous assurer que votre acteur ne mourra pas de faim et s'arrêtera d'autres acteurs de votre application en cours d'exécution (voir la documentation Akka: https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking-needs-careful-management ).

johanandren
la source
1
Merci pour la réponse. Je ne vois pas comment cela pourrait éventuellement fonctionner? où vont les octets lorsque blobClient.download (os) est appelé (si je l'appelle moi-même)? Imaginez qu'il y ait un téraoctet de données en attente d'être écrit. il me semble que l'appel blobClient.download doit être appelé à partir de l'appel sender.tell afin que ce soit fondamentalement une opération de type IOUtils.copy. En utilisant preMaterialize, je ne vois pas comment cela se produit?
MeBigFatGuy
Le OutputStream a un tampon interne, il commencera à accepter les écritures jusqu'à ce que ce tampon se remplisse, si l'aval asynchrone n'a pas commencé à consommer des éléments alors il bloquera le thread d'écriture (c'est pourquoi j'ai mentionné qu'il est important de gérer le blocage).
johanandren
1
Mais si je preMaterialize et que j'obtiens OutputStream, c'est mon code qui fait le blobClient.download (os); correct? Cela signifie qu'il doit se terminer avant que je puisse continuer, ce qui est impossible.
MeBigFatGuy
Si le téléchargement (os) ne crée pas de thread, vous devrez faire face à son blocage et vous assurer que cela n'arrête pas une autre opération. Une façon serait de bifurquer un fil pour faire le travail, une autre serait de répondre d'abord de l'acteur et ensuite de faire le travail de blocage là-bas, dans ce cas, vous devez vous assurer que l'acteur ne mourra pas de faim d'autres acteurs, voir le lien à la fin de ma réponse.
johanandren
à ce stade, j'essaie simplement de le faire fonctionner. Il ne peut même pas traiter un fichier de 10 octets.
MeBigFatGuy