J'utilise la version 1.3.0 du client Confluent.Kafka .NET. Je suis les docs :
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "server1, server2",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true,
EnableAutoOffsetStore = false,
GroupId = this.groupId,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = this.kafkaUsername,
SaslPassword = this.kafkaPassword,
};
using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
var cancellationToken = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cancellationToken.Cancel();
};
consumer.Subscribe("my-topic");
while (true)
{
try
{
var consumerResult = consumer.Consume();
// process message
consumer.StoreOffset(consumerResult);
}
catch (ConsumeException e)
{
// log
}
catch (KafkaException e)
{
// log
}
catch (OperationCanceledException e)
{
// log
}
}
}
Le problème est que même si je commente la ligne consumer.StoreOffset(consumerResult);
, je continue à recevoir le prochain message non consommé la prochaine fois que je consomme , c'est-à-dire que le décalage continue d'augmenter, ce qui ne semble pas être ce que la documentation prétend faire, c'est- à- dire au moins une livraison .
Même si je définis EnableAutoCommit = false
et supprime «EnableAutoOffsetStore = false» de la configuration et consumer.StoreOffset(consumerResult)
que consumer.Commit()
je remplace par , je vois toujours le même comportement, c'est-à-dire que même si je commente le Commit
, je continue à recevoir les prochains messages non consommés.
J'ai l'impression de manquer quelque chose de fondamental ici, mais je ne sais pas quoi. Toute aide est appréciée!
EnableAutoCommit
est défini sur. Disons que nous avonsEnableAutoCommit = false
, et quand jeConsume
, je récupère le message avec l'offset 11. Je m'attendais à continuer à recevoir le même message avec l'offset 11 encore et encore si le traitement du message continue à être lancé et donc pas d'appel àCommit
.Consume
) en utilisant uneCommit
fois que vous avez déjàSubscribe
au sujet .. Kafka (comme dans la bibliothèque cliente) derrière la scène conserve tous les décalages qu'il a envoyés à l'application dans leConsume
et il les enverra linéairement. Donc, pour retraiter un message comme dans un scénario d'échec, vous devez les suivre dans votre code et chercher à compenser et commencer à traiter le message et vous devez également savoir quoi ignorer s'il a déjà été traité dans des demandes antérieures. Je ne connais pas la bibliothèque .net mais cela ne devrait pas vraiment avoir d'importance car il s'agit de la conception de kafka.Réponses:
Désolé, je ne peux pas encore ajouter de commentaire. Le consommateur Kafka consomme des messages par lots, alors peut-être que vous parcourez toujours le lot pré-récupéré par le fil d'arrière-plan .
Vous pouvez vérifier si votre consommateur s'engage vraiment à compenser ou non en utilisant kafka util
kafka-consumer-groups.sh
la source
Vous voudrez peut-être avoir une logique de réessai pour traiter chacun de vos messages pour un nombre fixe de fois comme par exemple 5. Si cela ne réussit pas pendant ces 5 tentatives, vous voudrez peut-être ajouter ce message à une autre rubrique pour gérer tous les messages ayant échoué qui ont priorité sur votre sujet actuel. Ou vous pouvez ajouter le message ayant échoué au même sujet afin qu'il soit récupéré plus tard une fois que tous ces autres messages seront consommés.
Si le traitement d'un message réussit au cours de ces 5 tentatives, vous pouvez passer au message suivant dans la file d'attente.
la source