J'ai besoin d'appeler un service en amont (Azure Blob Service) pour pousser les données vers un OutputStream, que je dois ensuite retourner et repousser vers le client, via akka. Sans akka (et juste du code de servlet), j'obtiendrais simplement le ServletOutputStream et le passerais à la méthode du service azure.
Le plus proche que je peux essayer de trébucher, et clairement c'est faux, c'est quelque chose comme ça
Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
blobClient.download(os);
return os;
});
ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);
sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());
L'idée est d'appeler un service en amont pour obtenir un flux de sortie rempli en appelant blobClient.download (os);
Il semble que la fonction lambda soit appelée et retourne, mais ensuite elle échoue, car il n'y a pas de données ou quelque chose. Comme si je ne suis pas censé avoir cette fonction lambda pour faire le travail, mais peut-être retourner un objet qui fait le travail? Pas certain.
Comment est-que quelqu'un peut faire ça?
la source
download
? Transmet-il des donnéesos
et ne les renvoie- t-il qu'une fois que les données ont été écrites?Réponses:
Le vrai problème ici est que l'API Azure n'est pas conçue pour une contre-pression. Il n'y a aucun moyen pour le flux de sortie de signaler à Azure qu'il n'est pas prêt pour plus de données. En d'autres termes: si Azure envoie des données plus rapidement que vous ne pouvez les consommer, il devra y avoir un échec de débordement de tampon laid quelque part.
En acceptant ce fait, la prochaine meilleure chose que nous pouvons faire est:
Source.lazySource
pour commencer à télécharger des données uniquement en cas de demande en aval (c'est-à-dire que la source est en cours d'exécution et que des données sont demandées).download
appel dans un autre thread afin qu'il continue à s'exécuter sans bloquer le retour de la source. Une fois que cela est possible avec unFuture
(je ne sais pas quelles sont les meilleures pratiques Java, mais cela devrait bien fonctionner dans les deux cas). Bien que cela n'ait pas d'importance au départ, vous devrez peut-être choisir un contexte d'exécution autre quesystem.dispatcher
- tout dépend dudownload
blocage ou non.Je m'excuse à l'avance si ce code Java est mal formé - j'utilise Akka avec Scala, donc tout cela en regardant l'API Java Akka et la référence de syntaxe Java.
la source
Dans
OutputStream
ce cas, la "valeur matérialisée" de laSource
et elle ne sera créée qu'une fois le flux exécuté (ou "matérialisée" dans un flux en cours d'exécution). L'exécuter est hors de votre contrôle puisque vous remettez leSource
à Akka HTTP et cela exécutera plus tard votre source..mapMaterializedValue(matval -> ...)
est généralement utilisé pour transformer la valeur matérialisée, mais comme il est invoqué dans le cadre de la matérialisation, vous pouvez l'utiliser pour effectuer des effets secondaires tels que l'envoi du matval dans un message, tout comme vous l'avez compris, il n'y a pas nécessairement de problème avec que même si ça a l'air génial. Il est important de comprendre que le flux ne terminera pas sa matérialisation et ne fonctionnera pas tant que ce lambda ne sera pas terminé. Cela signifie des problèmes sidownload()
bloque plutôt que de gâcher du travail sur un autre thread et de revenir immédiatement.Il existe cependant une autre solution:
Source.preMaterialize()
elle matérialise la source et vous donne unePair
valeur matérialisée et une nouvelleSource
qui peut être utilisée pour consommer la source déjà démarrée:Notez qu'il y a quelques éléments supplémentaires à penser dans votre code, surtout si l'
blobClient.download(os)
appel bloque jusqu'à ce qu'il soit fait et que vous appeliez cela depuis l'acteur, dans ce cas, vous devez vous assurer que votre acteur ne mourra pas de faim et s'arrêtera d'autres acteurs de votre application en cours d'exécution (voir la documentation Akka: https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking-needs-careful-management ).la source