Consommez à nouveau le même message si le traitement du message échoue

10

J'utilise la version 1.3.0 du client Confluent.Kafka .NET. Je suis les docs :

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "server1, server2",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false,
    GroupId = this.groupId,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = this.kafkaUsername,
    SaslPassword = this.kafkaPassword,
};

using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
    var cancellationToken = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cancellationToken.Cancel();
    };

    consumer.Subscribe("my-topic");
    while (true)
    {
        try
        {
            var consumerResult = consumer.Consume();
            // process message
            consumer.StoreOffset(consumerResult);
        }
        catch (ConsumeException e)
        {
            // log
        }
        catch (KafkaException e)
        {
            // log
        }
        catch (OperationCanceledException e)
        {
            // log
        }
    }
}

Le problème est que même si je commente la ligne consumer.StoreOffset(consumerResult);, je continue à recevoir le prochain message non consommé la prochaine fois que je consomme , c'est-à-dire que le décalage continue d'augmenter, ce qui ne semble pas être ce que la documentation prétend faire, c'est- à- dire au moins une livraison .

Même si je définis EnableAutoCommit = falseet supprime «EnableAutoOffsetStore = false» de la configuration et consumer.StoreOffset(consumerResult)que consumer.Commit()je remplace par , je vois toujours le même comportement, c'est-à-dire que même si je commente le Commit, je continue à recevoir les prochains messages non consommés.

J'ai l'impression de manquer quelque chose de fondamental ici, mais je ne sais pas quoi. Toute aide est appréciée!

havij
la source
Les messages ont déjà été renvoyés à l'application du point de vue kafka, donc lorsque vous validez, ils sont enregistrés en tant que derniers décalages validés, mais consume continuera de renvoyer les messages suivants, que vous en ayez consommé ou non. Quelle est votre attente ici? Pourriez-vous s'il vous plaît expliquer ce que vous attendez avant / après la validation et la consommation?
Sagar Veeram
Les messages ne sont pas récupérés tant que vous n'utilisez pas la recherche de décalage. Cela affectera la consommation et les messages seront renvoyés par le décalage de recherche.
Sagar Veeram
@ user2683814 Dans mon article, j'ai mentionné deux scénarios en fonction de ce qui EnableAutoCommitest défini sur. Disons que nous avons EnableAutoCommit = false, et quand je Consume, je récupère le message avec l'offset 11. Je m'attendais à continuer à recevoir le même message avec l'offset 11 encore et encore si le traitement du message continue à être lancé et donc pas d'appel à Commit.
havij
Non, ce n'est pas le cas. Vous ne pouvez pas contrôler quoi poll ( Consume) en utilisant une Commitfois que vous avez déjà Subscribeau sujet .. Kafka (comme dans la bibliothèque cliente) derrière la scène conserve tous les décalages qu'il a envoyés à l'application dans le Consumeet il les enverra linéairement. Donc, pour retraiter un message comme dans un scénario d'échec, vous devez les suivre dans votre code et chercher à compenser et commencer à traiter le message et vous devez également savoir quoi ignorer s'il a déjà été traité dans des demandes antérieures. Je ne connais pas la bibliothèque .net mais cela ne devrait pas vraiment avoir d'importance car il s'agit de la conception de kafka.
Sagar Veeram
Je pense que vous devez utiliser une combinaison d'abonnement et d'attribution et que vous aurez peut-être besoin de différents consommateurs pour prendre en charge votre cas d'utilisation. En cas d'échecs, utilisez l'attribution / la recherche pour compenser les partitions de sujet avec un consommateur pour retraiter les messages et pour le traitement normal, utilisez un autre consommateur avec le flux d'abonnement / de consommation / de validation.
Sagar Veeram

Réponses:

0

Vous voudrez peut-être avoir une logique de réessai pour traiter chacun de vos messages pour un nombre fixe de fois comme par exemple 5. Si cela ne réussit pas pendant ces 5 tentatives, vous voudrez peut-être ajouter ce message à une autre rubrique pour gérer tous les messages ayant échoué qui ont priorité sur votre sujet actuel. Ou vous pouvez ajouter le message ayant échoué au même sujet afin qu'il soit récupéré plus tard une fois que tous ces autres messages seront consommés.

Si le traitement d'un message réussit au cours de ces 5 tentatives, vous pouvez passer au message suivant dans la file d'attente.

Raju Dasupally
la source