Désérialisation asynchrone d'une liste à l'aide de System.Text.Json

11

Disons que je demande un gros fichier json qui contient une liste de nombreux objets. Je ne veux pas qu'ils soient en mémoire d'un seul coup, mais je préfère les lire et les traiter un par un. J'ai donc besoin de transformer un System.IO.Streamflux asynchrone en un IAsyncEnumerable<T>. Comment utiliser la nouvelle System.Text.JsonAPI pour ce faire?

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
    {
        using (var stream = await httpResponse.Content.ReadAsStreamAsync())
        {
            // Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
        }
    }
}
Rick de Water
la source
1
Vous aurez probablement besoin de quelque chose comme DeserializeAsync méthode
Pavel Anikhouski
2
Désolé, il semble que la méthode ci-dessus charge tout le flux dans la mémoire. Vous pouvez lire les données par morceaux en utilisant de manière asynchrone Utf8JsonReader, veuillez consulter quelques exemples de github et le fil existant également
Pavel Anikhouski
GetAsyncretourne de lui-même lorsque la réponse entière est reçue. Vous devez utiliser SendAsyncavec `HttpCompletionOption.ResponseContentRead` à la place. Une fois que vous avez cela, vous pouvez utiliser JsonTextReader de JSON.NET . L'utilisation System.Text.Jsonpour cela n'est pas aussi simple que ce problème le montre . La fonctionnalité n'est pas disponible et l'implémenter dans une allocation à faible allocation à l'aide de structures n'est pas anodin
Panagiotis Kanavos
Le problème avec la désérialisation en morceaux est que vous devez savoir quand vous avez un morceau complet à désérialiser. Cela serait difficile à réaliser proprement pour les cas généraux. Cela nécessiterait une analyse préalable, ce qui pourrait être un mauvais compromis en termes de performances. Il serait plutôt difficile de généraliser. Mais si vous appliquez vos propres restrictions sur votre JSON, par exemple "un seul objet occupe exactement 20 lignes dans le fichier", vous pouvez essentiellement désérialiser de manière asynchrone en lisant le fichier en morceaux asynchrones. Vous auriez besoin d'un énorme JSON pour voir les avantages ici, j'imagine.
DetectivePikachu
On dirait que quelqu'un a déjà répondu à une question similaire ici avec le code complet.
Panagiotis Kanavos

Réponses:

4

Oui, un sérialiseur JSON (de) véritablement en streaming serait une belle amélioration des performances à avoir, dans tant d'endroits.

Malheureusement, System.Text.Jsonne le fait pas pour le moment. Je ne sais pas si ce sera le cas à l'avenir - je l'espère! La désérialisation de JSON en streaming réel s'avère plutôt difficile.

Vous pouvez vérifier si le Utf8Json prend en charge, peut-être.

Cependant, il peut y avoir une solution personnalisée pour votre situation spécifique, car vos besoins semblent limiter la difficulté.

L'idée est de lire manuellement un élément du tableau à la fois. Nous utilisons le fait que chaque élément de la liste est, en soi, un objet JSON valide.

Vous pouvez ignorer manuellement le [(pour le premier élément) ou le ,(pour chaque élément suivant). Ensuite, je pense que votre meilleur pari est d'utiliser .NET Core Utf8JsonReaderpour déterminer où se termine l'objet actuel et alimenter les octets numérisés JsonDeserializer.

De cette façon, vous ne tamponnez que légèrement sur un objet à la fois.

Et puisque nous parlons de performances, vous pouvez obtenir l'entrée d'un PipeReader, pendant que vous y êtes. :-)

Timo
la source
Il ne s'agit pas du tout de performance. Il ne s'agit pas de désérialisation asynchrone, ce qu'il fait déjà . Il s'agit de l'accès en streaming - le traitement des éléments JSON lorsqu'ils sont analysés à partir du flux, comme le fait JsonTextReader de JSON.NET.
Panagiotis Kanavos
La classe pertinente dans Utf8Json est JsonReader et comme le dit l'auteur, c'est bizarre. JsonTextReader de JSON.NET et Utf8JsonReader de System.Text.Json partagent la même bizarrerie - vous devez boucler et vérifier le type de l'élément actuel au fur et à mesure.
Panagiotis Kanavos
@PanagiotisKanavos Ah, oui, en streaming. C'est le mot que je cherchais! Je mets à jour le mot "asynchrone" en "streaming". Je crois que la raison de vouloir le streaming limite l'utilisation de la mémoire, ce qui est un problème de performances. OP peut peut-être confirmer.
Timo
La performance ne signifie pas la vitesse. Quelle que soit la vitesse du désérialiseur, si vous devez traiter des éléments 1M, vous ne voulez pas les stocker dans la RAM, ni attendre qu'ils soient tous désérialisés avant de pouvoir traiter le premier.
Panagiotis Kanavos
Sémantique, mon ami! Je suis content que nous essayions de réaliser la même chose après tout.
Timo
4

TL; DR Ce n'est pas anodin


On dirait que quelqu'un a déjà publié du code complet pour une Utf8JsonStreamReaderstructure qui lit les tampons d'un flux et les alimente à un Utf8JsonRreader, permettant une désérialisation facile avec JsonSerializer.Deserialize<T>(ref newJsonReader, options);. Le code n'est pas banal non plus. La question connexe est ici et la réponse est ici .

Mais cela ne suffit pas - HttpClient.GetAsync ne reviendra qu'après la réception de la réponse entière, mettant essentiellement tout en mémoire tampon.

Pour éviter cela, HttpClient.GetAsync (chaîne, HttpCompletionOption) doit être utilisé avecHttpCompletionOption.ResponseHeadersRead .

La boucle de désérialisation doit également vérifier le jeton d'annulation et quitter ou lancer s'il est signalé. Sinon, la boucle continuera jusqu'à ce que le flux entier soit reçu et traité.

Ce code est basé sur l'exemple de la réponse associée et utilise HttpCompletionOption.ResponseHeadersReadet vérifie le jeton d'annulation. Il peut analyser les chaînes JSON qui contiennent un tableau approprié d'éléments, par exemple:

[{"prop1":123},{"prop1":234}]

Le premier appel à jsonStreamReader.Read()se déplace au début du tableau tandis que le second se déplace au début du premier objet. La boucle elle-même se termine lorsque la fin du tableau ( ]) est détectée.

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    //Don't cache the entire response
    using var httpResponse = await httpClient.GetAsync(url,                               
                                                       HttpCompletionOption.ResponseHeadersRead,  
                                                       cancellationToken);
    using var stream = await httpResponse.Content.ReadAsStreamAsync();
    using var jsonStreamReader = new Utf8JsonStreamReader(stream, 32 * 1024);

    jsonStreamReader.Read(); // move to array start
    jsonStreamReader.Read(); // move to start of the object

    while (jsonStreamReader.TokenType != JsonTokenType.EndArray)
    {
        //Gracefully return if cancellation is requested.
        //Could be cancellationToken.ThrowIfCancellationRequested()
        if(cancellationToken.IsCancellationRequested)
        {
            return;
        }

        // deserialize object
        var obj = jsonStreamReader.Deserialize<T>();
        yield return obj;

        // JsonSerializer.Deserialize ends on last token of the object parsed,
        // move to the first token of next object
        jsonStreamReader.Read();
    }
}

Fragments JSON, streaming AKA JSON aka ... *

Il est assez courant dans les scénarios de streaming ou de journalisation d'événements d'ajouter des objets JSON individuels à un fichier, un élément par ligne, par exemple:

{"eventId":1}
{"eventId":2}
...
{"eventId":1234567}

Ce n'est pas un document JSON valide mais les fragments individuels sont valides. Cela présente plusieurs avantages pour les Big Data / scénarios hautement concurrents. L'ajout d'un nouvel événement nécessite uniquement l'ajout d'une nouvelle ligne au fichier, et non l'analyse et la reconstruction de l'ensemble du fichier. Le traitement , en particulier le traitement parallèle , est plus facile pour deux raisons:

  • Les éléments individuels peuvent être récupérés un à la fois, simplement en lisant une ligne dans un flux.
  • Le fichier d'entrée peut être facilement partitionné et fractionné à travers les limites de ligne, alimentant chaque partie à un processus de travail séparé, par exemple dans un cluster Hadoop, ou simplement différents threads dans une application: calculez les points de partage, par exemple en divisant la longueur par le nombre de travailleurs , puis recherchez la première nouvelle ligne. Nourrissez tout jusqu'à ce point à un travailleur distinct.

Utilisation d'un StreamReader

La façon d'allouer-y pour ce faire serait d'utiliser un TextReader, de lire une ligne à la fois et de l'analyser avec JsonSerializer.Deserialize :

using var reader=new StreamReader(stream);
string line;
//ReadLineAsync() doesn't accept a CancellationToken 
while((line=await reader.ReadLineAsync()) != null)
{
    var item=JsonSerializer.Deserialize<T>(line);
    yield return item;

    if(cancellationToken.IsCancellationRequested)
    {
        return;
    }
}

C'est beaucoup plus simple que le code qui désérialise un tableau approprié. Il y a deux problèmes:

  • ReadLineAsync n'accepte pas de jeton d'annulation
  • Chaque itération alloue une nouvelle chaîne, l'une des choses que nous voulions éviter en utilisant System.Text.Json

Cela peut être suffisant, car essayer de produire les ReadOnlySpan<Byte>tampons nécessaires à JsonSerializer.Deserialize n'est pas anodin.

Pipelines et SequenceReader

Pour éviter les allocations, nous devons obtenir un ReadOnlySpan<byte>du flux. Pour ce faire, vous devez utiliser les canaux System.IO.Pipeline et la structure SequenceReader . Une introduction de Steve Gordon à SequenceReader explique comment cette classe peut être utilisée pour lire les données d'un flux à l'aide de délimiteurs.

Malheureusement, SequenceReaderc'est une structure ref qui signifie qu'elle ne peut pas être utilisée dans les méthodes asynchrones ou locales. Voilà pourquoi Steve Gordon dans son article crée un

private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

pour lire les éléments à partir d'une ReadOnlySequence et renvoyer la position de fin, afin que le PipeReader puisse en reprendre. Malheureusement, nous voulons retourner un IEnumerable ou IAsyncEnumerable, et les méthodes d'itérateur n'aiment pas inouout paramètres non plus .

Nous pourrions collecter les éléments désérialisés dans une liste ou une file d'attente et les renvoyer en tant que résultat unique, mais cela allouerait toujours des listes, des tampons ou des nœuds et devrait attendre que tous les éléments d'un tampon soient désérialisés avant de retourner:

private static (SequencePosition,List<T>) ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

Nous avons besoin de quelque chose qui agit comme un énumérable sans nécessiter une méthode d'itérateur, fonctionne avec async et ne met pas tout en mémoire tampon.

Ajout de canaux pour produire un IAsyncEnumerable

ChannelReader.ReadAllAsync renvoie un IAsyncEnumerable. Nous pouvons renvoyer un ChannelReader à partir de méthodes qui ne pouvaient pas fonctionner comme itérateurs et produire toujours un flux d'éléments sans mise en cache.

En adaptant le code de Steve Gordon pour utiliser des canaux, nous obtenons les ReadItems (ChannelWriter ...) et les ReadLastItemméthodes. Le premier, lit un élément à la fois, jusqu'à une nouvelle ligne en utilisant ReadOnlySpan<byte> itemBytes. Cela peut être utilisé par JsonSerializer.Deserialize. SiReadItems ne trouve pas le délimiteur, il renvoie sa position afin que le PipelineReader puisse extraire le morceau suivant du flux.

Lorsque nous atteignons le dernier morceau et qu'il n'y a pas d'autre délimiteur, ReadLastItem` lit les octets restants et les désérialise.

Le code est presque identique à celui de Steve Gordon. Au lieu d'écrire sur la console, nous écrivons sur ChannelWriter.

private const byte NL=(byte)'\n';
private const int MaxStackLength = 128;

private static SequencePosition ReadItems<T>(ChannelWriter<T> writer, in ReadOnlySequence<byte> sequence, 
                          bool isCompleted, CancellationToken token)
{
    var reader = new SequenceReader<byte>(sequence);

    while (!reader.End && !token.IsCancellationRequested) // loop until we've read the entire sequence
    {
        if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, NL, advancePastDelimiter: true)) // we have an item to handle
        {
            var item=JsonSerializer.Deserialize<T>(itemBytes);
            writer.TryWrite(item);            
        }
        else if (isCompleted) // read last item which has no final delimiter
        {
            var item = ReadLastItem<T>(sequence.Slice(reader.Position));
            writer.TryWrite(item);
            reader.Advance(sequence.Length); // advance reader to the end
        }
        else // no more items in this sequence
        {
            break;
        }
    }

    return reader.Position;
}

private static T ReadLastItem<T>(in ReadOnlySequence<byte> sequence)
{
    var length = (int)sequence.Length;

    if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
    {
        Span<byte> byteBuffer = stackalloc byte[length];
        sequence.CopyTo(byteBuffer);
        var item=JsonSerializer.Deserialize<T>(byteBuffer);
        return item;        
    }
    else // otherwise we'll rent an array to use as the buffer
    {
        var byteBuffer = ArrayPool<byte>.Shared.Rent(length);

        try
        {
            sequence.CopyTo(byteBuffer);
            var item=JsonSerializer.Deserialize<T>(byteBuffer);
            return item;
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(byteBuffer);
        }

    }    
}

La DeserializeToChannel<T>méthode crée un lecteur Pipeline au-dessus du flux, crée un canal et démarre une tâche de travail qui analyse les morceaux et les pousse vers le canal:

ChannelReader<T> DeserializeToChannel<T>(Stream stream, CancellationToken token)
{
    var pipeReader = PipeReader.Create(stream);    
    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        while (!token.IsCancellationRequested)
        {
            var result = await pipeReader.ReadAsync(token); // read from the pipe

            var buffer = result.Buffer;

            var position = ReadItems(writer,buffer, result.IsCompleted,token); // read complete items from the current buffer

            if (result.IsCompleted) 
                break; // exit if we've read everything from the pipe

            pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
        }

        pipeReader.Complete(); 
    },token)
    .ContinueWith(t=>{
        pipeReader.Complete();
        writer.TryComplete(t.Exception);
    });

    return channel.Reader;
}

ChannelReader.ReceiveAllAsync()peut être utilisé pour consommer tous les articles via IAsyncEnumerable<T>:

var reader=DeserializeToChannel<MyEvent>(stream,cts.Token);
await foreach(var item in reader.ReadAllAsync(cts.Token))
{
    //Do something with it 
}    
Panagiotis Kanavos
la source
0

Il semble que vous ayez besoin d'implémenter votre propre lecteur de flux. Vous devez lire les octets un par un et vous arrêter dès que la définition d'objet est terminée. Il est en effet assez bas niveau. En tant que tel, vous ne chargez PAS le fichier entier dans la RAM, mais prenez plutôt la partie avec laquelle vous traitez. Semble-t-il être une réponse?

Sereja Bogolubov
la source
-2

Vous pourriez peut-être utiliser le Newtonsoft.Jsonsérialiseur? https://www.newtonsoft.com/json/help/html/Performance.htm

Voir en particulier la section:

Optimiser l'utilisation de la mémoire

Éditer

Vous pouvez essayer de désérialiser les valeurs de JsonTextReader, par exemple

using (var textReader = new StreamReader(stream))
using (var reader = new JsonTextReader(textReader))
{
    while (await reader.ReadAsync(cancellationToken))
    {
        yield return reader.Value;
    }
}
Miłosz Wieczorek
la source
Cela ne répond pas à la question. Il ne s'agit pas du tout de performances, il s'agit d'un accès en streaming sans tout charger en mémoire
Panagiotis Kanavos
Avez-vous ouvert le lien connexe ou simplement dit ce que vous pensez? Dans le lien que j'ai envoyé dans la section que j'ai mentionnée, il y a un extrait de code expliquant comment désérialiser JSON du flux.
Miłosz Wieczorek
Relisez la question s'il vous plaît - l'OP demande comment traiter les éléments sans désérialiser tout en mémoire. Pas seulement lire à partir d'un flux, mais uniquement traiter ce qui provient du flux. I don't want them to be in memory all at once, but I would rather read and process them one by one.La classe appropriée dans JSON.NET est JsonTextReader.
Panagiotis Kanavos
Dans tous les cas, une réponse de lien uniquement n'est pas considérée comme une bonne réponse, et rien dans ce lien ne répond à la question du PO. Un lien vers JsonTextReader serait mieux
Panagiotis Kanavos