Disons que je demande un gros fichier json qui contient une liste de nombreux objets. Je ne veux pas qu'ils soient en mémoire d'un seul coup, mais je préfère les lire et les traiter un par un. J'ai donc besoin de transformer un System.IO.Stream
flux asynchrone en un IAsyncEnumerable<T>
. Comment utiliser la nouvelle System.Text.Json
API pour ce faire?
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
{
using (var stream = await httpResponse.Content.ReadAsStreamAsync())
{
// Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
}
}
}
c#
.net-core
.net-core-3.0
c#-8.0
system.text.json
Rick de Water
la source
la source
Utf8JsonReader
, veuillez consulter quelques exemples de github et le fil existant égalementGetAsync
retourne de lui-même lorsque la réponse entière est reçue. Vous devez utiliserSendAsync
avec `HttpCompletionOption.ResponseContentRead` à la place. Une fois que vous avez cela, vous pouvez utiliser JsonTextReader de JSON.NET . L'utilisationSystem.Text.Json
pour cela n'est pas aussi simple que ce problème le montre . La fonctionnalité n'est pas disponible et l'implémenter dans une allocation à faible allocation à l'aide de structures n'est pas anodinRéponses:
Oui, un sérialiseur JSON (de) véritablement en streaming serait une belle amélioration des performances à avoir, dans tant d'endroits.
Malheureusement,
System.Text.Json
ne le fait pas pour le moment. Je ne sais pas si ce sera le cas à l'avenir - je l'espère! La désérialisation de JSON en streaming réel s'avère plutôt difficile.Vous pouvez vérifier si le Utf8Json prend en charge, peut-être.
Cependant, il peut y avoir une solution personnalisée pour votre situation spécifique, car vos besoins semblent limiter la difficulté.
L'idée est de lire manuellement un élément du tableau à la fois. Nous utilisons le fait que chaque élément de la liste est, en soi, un objet JSON valide.
Vous pouvez ignorer manuellement le
[
(pour le premier élément) ou le,
(pour chaque élément suivant). Ensuite, je pense que votre meilleur pari est d'utiliser .NET CoreUtf8JsonReader
pour déterminer où se termine l'objet actuel et alimenter les octets numérisésJsonDeserializer
.De cette façon, vous ne tamponnez que légèrement sur un objet à la fois.
Et puisque nous parlons de performances, vous pouvez obtenir l'entrée d'un
PipeReader
, pendant que vous y êtes. :-)la source
TL; DR Ce n'est pas anodin
On dirait que quelqu'un a déjà publié du code complet pour une
Utf8JsonStreamReader
structure qui lit les tampons d'un flux et les alimente à un Utf8JsonRreader, permettant une désérialisation facile avecJsonSerializer.Deserialize<T>(ref newJsonReader, options);
. Le code n'est pas banal non plus. La question connexe est ici et la réponse est ici .Mais cela ne suffit pas -
HttpClient.GetAsync
ne reviendra qu'après la réception de la réponse entière, mettant essentiellement tout en mémoire tampon.Pour éviter cela, HttpClient.GetAsync (chaîne, HttpCompletionOption) doit être utilisé avec
HttpCompletionOption.ResponseHeadersRead
.La boucle de désérialisation doit également vérifier le jeton d'annulation et quitter ou lancer s'il est signalé. Sinon, la boucle continuera jusqu'à ce que le flux entier soit reçu et traité.
Ce code est basé sur l'exemple de la réponse associée et utilise
HttpCompletionOption.ResponseHeadersRead
et vérifie le jeton d'annulation. Il peut analyser les chaînes JSON qui contiennent un tableau approprié d'éléments, par exemple:Le premier appel à
jsonStreamReader.Read()
se déplace au début du tableau tandis que le second se déplace au début du premier objet. La boucle elle-même se termine lorsque la fin du tableau (]
) est détectée.Fragments JSON, streaming AKA JSON aka ... *
Il est assez courant dans les scénarios de streaming ou de journalisation d'événements d'ajouter des objets JSON individuels à un fichier, un élément par ligne, par exemple:
Ce n'est pas un document JSON valide mais les fragments individuels sont valides. Cela présente plusieurs avantages pour les Big Data / scénarios hautement concurrents. L'ajout d'un nouvel événement nécessite uniquement l'ajout d'une nouvelle ligne au fichier, et non l'analyse et la reconstruction de l'ensemble du fichier. Le traitement , en particulier le traitement parallèle , est plus facile pour deux raisons:
Utilisation d'un StreamReader
La façon d'allouer-y pour ce faire serait d'utiliser un TextReader, de lire une ligne à la fois et de l'analyser avec JsonSerializer.Deserialize :
C'est beaucoup plus simple que le code qui désérialise un tableau approprié. Il y a deux problèmes:
ReadLineAsync
n'accepte pas de jeton d'annulationCela peut être suffisant, car essayer de produire les
ReadOnlySpan<Byte>
tampons nécessaires à JsonSerializer.Deserialize n'est pas anodin.Pipelines et SequenceReader
Pour éviter les allocations, nous devons obtenir un
ReadOnlySpan<byte>
du flux. Pour ce faire, vous devez utiliser les canaux System.IO.Pipeline et la structure SequenceReader . Une introduction de Steve Gordon à SequenceReader explique comment cette classe peut être utilisée pour lire les données d'un flux à l'aide de délimiteurs.Malheureusement,
SequenceReader
c'est une structure ref qui signifie qu'elle ne peut pas être utilisée dans les méthodes asynchrones ou locales. Voilà pourquoi Steve Gordon dans son article crée unpour lire les éléments à partir d'une ReadOnlySequence et renvoyer la position de fin, afin que le PipeReader puisse en reprendre. Malheureusement, nous voulons retourner un IEnumerable ou IAsyncEnumerable, et les méthodes d'itérateur n'aiment pas
in
ouout
paramètres non plus .Nous pourrions collecter les éléments désérialisés dans une liste ou une file d'attente et les renvoyer en tant que résultat unique, mais cela allouerait toujours des listes, des tampons ou des nœuds et devrait attendre que tous les éléments d'un tampon soient désérialisés avant de retourner:
Nous avons besoin de quelque chose qui agit comme un énumérable sans nécessiter une méthode d'itérateur, fonctionne avec async et ne met pas tout en mémoire tampon.
Ajout de canaux pour produire un IAsyncEnumerable
ChannelReader.ReadAllAsync renvoie un IAsyncEnumerable. Nous pouvons renvoyer un ChannelReader à partir de méthodes qui ne pouvaient pas fonctionner comme itérateurs et produire toujours un flux d'éléments sans mise en cache.
En adaptant le code de Steve Gordon pour utiliser des canaux, nous obtenons les ReadItems (ChannelWriter ...) et les
ReadLastItem
méthodes. Le premier, lit un élément à la fois, jusqu'à une nouvelle ligne en utilisantReadOnlySpan<byte> itemBytes
. Cela peut être utilisé parJsonSerializer.Deserialize
. SiReadItems
ne trouve pas le délimiteur, il renvoie sa position afin que le PipelineReader puisse extraire le morceau suivant du flux.Lorsque nous atteignons le dernier morceau et qu'il n'y a pas d'autre délimiteur, ReadLastItem` lit les octets restants et les désérialise.
Le code est presque identique à celui de Steve Gordon. Au lieu d'écrire sur la console, nous écrivons sur ChannelWriter.
La
DeserializeToChannel<T>
méthode crée un lecteur Pipeline au-dessus du flux, crée un canal et démarre une tâche de travail qui analyse les morceaux et les pousse vers le canal:ChannelReader.ReceiveAllAsync()
peut être utilisé pour consommer tous les articles viaIAsyncEnumerable<T>
:la source
Il semble que vous ayez besoin d'implémenter votre propre lecteur de flux. Vous devez lire les octets un par un et vous arrêter dès que la définition d'objet est terminée. Il est en effet assez bas niveau. En tant que tel, vous ne chargez PAS le fichier entier dans la RAM, mais prenez plutôt la partie avec laquelle vous traitez. Semble-t-il être une réponse?
la source
Vous pourriez peut-être utiliser le
Newtonsoft.Json
sérialiseur? https://www.newtonsoft.com/json/help/html/Performance.htmVoir en particulier la section:
Éditer
Vous pouvez essayer de désérialiser les valeurs de JsonTextReader, par exemple
la source
I don't want them to be in memory all at once, but I would rather read and process them one by one.
La classe appropriée dans JSON.NET est JsonTextReader.