Qu'est-ce qui détermine la compensation des consommateurs Kafka?

170

Je suis relativement nouveau à Kafka. J'ai fait un peu d'expérimentation avec cela, mais certaines choses ne sont pas claires pour moi concernant la compensation des consommateurs. D'après ce que j'ai compris jusqu'à présent, lorsqu'un consommateur démarre, le décalage à partir duquel il commencera à lire est déterminé par le paramètre de configuration auto.offset.reset(corrigez-moi si je me trompe).

Maintenant, disons par exemple qu'il y a 10 messages (décalages de 0 à 9) dans le sujet, et qu'un consommateur en a consommé 5 avant qu'il ne tombe en panne (ou avant que je ne tue le consommateur). Puis dites que je redémarre ce processus de consommation. Mes questions sont:

  1. Si le auto.offset.resetest défini sur smallest, va-t-il toujours commencer à consommer à partir de l'offset 0?

  2. Si le auto.offset.resetest défini sur largest, va-t-il commencer à consommer à partir de l'offset 5?

  3. Le comportement vis-à-vis de ce type de scénario est-il toujours déterministe?

N'hésitez pas à commenter si quelque chose dans ma question n'est pas clair. Merci d'avance.

Asif Iqbal
la source

Réponses:

260

C'est un peu plus complexe que vous ne l'avez décrit.
La auto.offset.resetconfiguration démarre UNIQUEMENT si votre groupe de consommateurs n'a pas de décalage valide commis quelque part (2 stockages de décalage pris en charge sont maintenant Kafka et Zookeeper), et cela dépend également du type de consommateur que vous utilisez.

Si vous utilisez un consommateur Java de haut niveau, imaginez les scénarios suivants:

  1. Vous avez un consommateur dans un groupe de consommateurs group1qui a consommé 5 messages et est décédé. La prochaine fois que vous démarrez ce consommateur, il n'utilisera même pas cette auto.offset.resetconfiguration et continuera à partir de l'endroit où il est mort car il récupérera simplement le décalage stocké dans le stockage offset (Kafka ou ZK comme je l'ai mentionné).

  2. Vous avez des messages dans un sujet (comme vous l'avez décrit) et vous démarrez un consommateur dans un nouveau groupe de consommateurs group2. Il n'y a pas de décalage stocké nulle part et cette fois, la auto.offset.resetconfiguration décidera de commencer par le début du sujet ( earliest) ou de la fin du sujet ( latest)

Une autre chose qui affecte la valeur de décalage à laquelle correspondra earliestet les latestconfigurations est la politique de rétention des journaux. Imaginez que vous avez un sujet avec une rétention configurée sur 1 heure. Vous produisez 5 messages, puis une heure plus tard, vous publiez 5 autres messages. Le latestdécalage restera toujours le même que dans l'exemple précédent, mais earliestcelui-ci ne pourra pas l'être 0car Kafka supprimera déjà ces messages et donc le premier décalage disponible sera 5.

Tout ce qui est mentionné ci-dessus n'est pas lié SimpleConsumeret chaque fois que vous l'exécutez, il décidera par où commencer à utiliser la auto.offset.resetconfiguration.

Si vous utilisez la version Kafka plus de 0,9, il faut remplacer earliest, latestavec smallest, largest.

serejja
la source
3
Merci beaucoup pour la réponse. Donc, pour le consommateur de haut niveau, une fois qu'un consommateur a commis quelque chose (que ce soit en ZK ou en Kafka), auto.offset.resetcela n'a plus de signification par la suite? La seule signification de ce paramètre est quand il n'y a rien d'engagement (et idéalement ce serait au premier démarrage du consommateur)?
Asif Iqbal
2
Exactement comme vous l'avez décrit
serejja
1
@serejja Bonjour - que diriez-vous si j'ai toujours 1 consommateur par groupe, et que le scénario n ° 1 de votre réponse se produit pour moi? Serait-ce la même chose?
ha9u63ar
1
@ ha9u63ar n'a pas très bien compris votre question. Si vous redémarrez votre consommateur dans le même groupe, oui, il ne l'utilisera pas auto.offset.resetet continuera à partir du décalage validé. Si vous utilisez toujours un groupe de consommateurs différent (comme le générer lors du démarrage du consommateur), le consommateur respectera toujoursauto.offset.reset
serejja
@serejja oui et cela ne fonctionne pas pour moi. pourriez-vous s'il vous plaît jeter un oeil à ceci - c'est mon problème
ha9u63ar
83

Juste une mise à jour: à partir de Kafka 0.9 et les suivants, Kafka utilise une nouvelle version Java du consommateur et les noms des paramètres auto.offset.reset ont changé; À partir du manuel:

Que faire quand il n'y a pas de décalage initial dans Kafka ou si le décalage actuel n'existe plus sur le serveur (par exemple parce que ces données ont été supprimées):

au plus tôt : réinitialise automatiquement le décalage au décalage le plus ancien

dernier : réinitialise automatiquement le décalage au dernier décalage

none : lance une exception au consommateur si aucun décalage précédent n'est trouvé pour le groupe du consommateur

autre chose: jetez une exception au consommateur.

J'ai passé du temps à le trouver après avoir vérifié la réponse acceptée, alors j'ai pensé qu'il pourrait être utile pour la communauté de l'afficher.

Israël Zinc
la source
9

De plus, il y a offsets.retention.minutes. Si le temps écoulé depuis le dernier commit est> offsets.retention.minutes, alors auto.offset.resetdémarre également

Sasa Ninkovic
la source
1
cela ne semble-t-il pas redondant avec la rétention des journaux? la rétention ofset doit-elle être basée sur la rétention des journaux?
mike01010
@ mike01010 c'est vrai. Il doit être basé sur la rétention des journaux, c'est l'une des solutions proposées dans le ticket. Prolong default value of offsets.retention.minutes to be at least twice larger than log.retention.hours. issues.apache.org/jira/browse/KAFKA-3806
saheb
Cette réponse m'a fait peur pendant un moment, jusqu'à ce que je vérifie la documentation de offsets.retention.minutes: <b> Une fois qu'un groupe de consommateurs perd tous ses consommateurs (c'est-à-dire devient vide), ses compensations seront conservées pendant cette période de conservation avant d'être supprimées. </b> consommateurs (en utilisant l'affectation manuelle), les offsets expireront après l'heure du dernier commit plus cette période de rétention. (C'est pour Kafka 2.3)
jumping_monkey