Existe-t-il un moyen de purger le sujet dans kafka?
J'ai poussé un message trop gros dans un sujet de message kafka sur ma machine locale, maintenant j'obtiens une erreur:
kafka.common.InvalidMessageSizeException: invalid message size
Augmenter le fetch.size
n'est pas idéal ici, car je ne veux pas vraiment accepter de messages aussi gros.
apache-kafka
purge
Peter Klipfel
la source
la source
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
--delete-config retention.ms
e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000
Cela vous permet également de vérifier la période de rétention actuelle, par exemple kafka-configs --zookeeper <zkhost>: 2181 --describe --entity-type topics --entity-name <topic name>Pour purger la file d'attente, vous pouvez supprimer le sujet:
puis recréez-le:
la source
delete.topic.enable=true
dans le fichierconfig/server.properties
, comme le dit l'avertissement imprimé par la commande mentionnéeNote: This will have no impact if delete.topic.enable is not set to true.
Voici les étapes que je suis pour supprimer un sujet nommé
MyTopic
:rm -rf /tmp/kafka-logs/MyTopic-0
. Répétez pour les autres partitions et toutes les répliqueszkCli.sh
puisrmr /brokers/MyTopic
Si vous manquez l'étape 3, Apache Kafka continuera à signaler le sujet comme présent (par exemple, si vous exécutez
kafka-list-topic.sh
).Testé avec Apache Kafka 0.8.0.
la source
./zookeeper-shell.sh localhost:2181
et./kafka-topics.sh --list --zookeeper localhost:2181
zookeeper-client
place dezkCli.sh
(essayé sur Cloudera CDH5)Bien que la réponse acceptée soit correcte, cette méthode est obsolète. La configuration du sujet doit maintenant être effectuée via
kafka-configs
.Les configurations définies via cette méthode peuvent être affichées avec la commande
la source
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --delete-config retention.ms --entity-name MyTopic
Testé dans Kafka 0.8.2, pour l'exemple de démarrage rapide: Tout d'abord, ajoutez une ligne au fichier server.properties sous le dossier config:
ensuite, vous pouvez exécuter cette commande:
la source
Depuis kafka 1.1
Purger un sujet
attendez 1 minute, pour être sûr que kafka purge le sujet, supprimez la configuration, puis passez à la valeur par défaut
la source
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config rentention.ms=100
kafka n'a pas de méthode directe pour purger / nettoyer le sujet (files d'attente), mais peut le faire en supprimant ce sujet et en le recréant.
tout d'abord, assurez-vous que le fichier sever.properties contient et sinon ajoutez
delete.topic.enable=true
puis, Supprimer le sujet
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic
puis créez-le à nouveau.
la source
Parfois, si vous avez un cluster saturé (trop de partitions, ou en utilisant des données de sujet chiffrées, ou en utilisant SSL, ou le contrôleur est sur un mauvais nœud, ou la connexion est irrégulière, il faudra beaucoup de temps pour purger ledit sujet .
Je suis ces étapes, en particulier si vous utilisez Avro.
1: Exécutez avec les outils kafka:
2: Exécuter sur le nœud de registre Schema:
kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning
3: redéfinissez la rétention des sujets sur le paramètre d'origine, une fois que le sujet est vide.
J'espère que cela aide quelqu'un, car ce n'est pas facilement annoncé.
la source
kafka-avro-console-consumer
n'est pas nécessaireMISE À JOUR: Cette réponse est pertinente pour Kafka 0.6. Pour Kafka 0.8 et versions ultérieures, voir la réponse de @Patrick.
Oui, arrêtez kafka et supprimez manuellement tous les fichiers du sous-répertoire correspondant (il est facile de le trouver dans le répertoire de données kafka). Après le redémarrage de kafka, le sujet sera vide.
la source
L'approche la plus simple consiste à définir la date des fichiers journaux individuels pour qu'elle soit antérieure à la période de rétention. Ensuite, le courtier doit les nettoyer et les supprimer pour vous en quelques secondes. Cela offre plusieurs avantages:
D'après mon expérience avec Kafka 0.7.x, la suppression des fichiers journaux et le redémarrage du courtier pourraient entraîner des exceptions de décalage invalides pour certains consommateurs. Cela se produit parce que le courtier redémarre les décalages à zéro (en l'absence de tous les fichiers journaux existants), et un consommateur qui consommait auparavant à partir de la rubrique se reconnecterait pour demander un décalage [une fois valide] spécifique. Si ce décalage tombe en dehors des limites des nouveaux journaux de rubrique, alors aucun dommage et le consommateur reprend au début ou à la fin. Mais, si le décalage tombe dans les limites des nouveaux journaux de rubrique, le courtier tente d'extraire l'ensemble de messages mais échoue car le décalage ne s'aligne pas sur un message réel.
Cela pourrait être atténué en effaçant également les compensations des consommateurs dans zookeeper pour ce sujet. Mais si vous n'avez pas besoin d'un sujet vierge et que vous voulez simplement supprimer le contenu existant, il est beaucoup plus facile et plus fiable de simplement `` toucher '' quelques journaux de sujets, que d'arrêter les courtiers, de supprimer les journaux de sujets et d'effacer certains nœuds de gardien de zoo. .
la source
Les conseils de Thomas sont excellents mais malheureusement
zkCli
dans les anciennes versions de Zookeeper (par exemple 3.3.6) ne semblent pas le supporterrmr
. Par exemple, comparez l'implémentation de la ligne de commande dans Zookeeper moderne avec la version 3.3 .Si vous êtes confronté à une ancienne version de Zookeeper, une solution consiste à utiliser une bibliothèque cliente telle que zc.zk pour Python. Pour les personnes qui ne sont pas familières avec Python, vous devez l'installer à l'aide de pip ou easy_install . Ensuite, démarrez un shell Python (
python
) et vous pouvez faire:ou même
si vous souhaitez supprimer tous les sujets de Kafka.
la source
Pour nettoyer tous les messages d'une rubrique particulière à l'aide de votre groupe d'applications (GroupName doit être identique au nom du groupe kafka de l'application).
./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group
la source
Suite à la réponse @steven appleyard, j'ai exécuté les commandes suivantes sur Kafka 2.2.0 et elles ont fonctionné pour moi.
la source
Beaucoup de bonnes réponses ici, mais parmi elles, je n'en ai pas trouvé sur docker. J'ai passé un certain temps à comprendre que l'utilisation du conteneur de courtier est incorrecte pour ce cas (évidemment !!!)
et j'aurais dû utiliser
zookeeper:2181
au lieu de--zookeeper localhost:2181
selon mon fichier de compositionla commande correcte serait
J'espère que cela fera gagner du temps à quelqu'un.
Sachez également que les messages ne seront pas supprimés immédiatement et que cela se produira lorsque le segment du journal sera fermé.
la source
localhost:2181
... Par exemple, vous ne comprenez pas bien les fonctionnalités réseau de Docker. De plus, tous les conteneurs Zookeeper ne l'ont paskafka-topics
, il est donc préférable de ne pas l'utiliser de cette façon. Les dernières installations de Kafka permettent--bootstrap-servers
de modifier un sujet au lieu de--zookeeper
you can use
--zookeeper zookeeper: 2181` du conteneur Kafka est mon point. Ou même grep la ligne Zookeeper à partir du fichierImpossible d'ajouter en tant que commentaire en raison de la taille: Je ne sais pas si c'est vrai, en plus de la mise à jour de retention.ms et retention.bytes, mais j'ai remarqué que la politique de nettoyage de la rubrique devrait être "delete" (par défaut), si "compact", elle va conserver les messages plus longtemps, c'est-à-dire que s'il est "compact", vous devez également spécifier delete.retention.ms .
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
Il fallait également surveiller les décalages les plus anciens / les plus récents pour confirmer que cela s'est produit avec succès, peut également vérifier le du -h / tmp / kafka-logs / test-topic-3-100- *
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762
L'autre problème est, vous devez obtenir config actuelle d' abord si vous vous souvenez de revenir après la suppression est réussie:
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
la source
Une autre approche, plutôt manuelle, pour purger un sujet est:
chez les courtiers:
sudo service kafka stop
sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*
dans zookeeper:
sudo /usr/lib/zookeeper/bin/zkCli.sh
rmr /brokers/topic/<some_topic_name>
dans les courtiers à nouveau:
sudo service kafka start
la source
Cela devrait donner
retention.ms
configuré. Ensuite, vous pouvez utiliser la commande alter ci-dessus pour passer à 1 seconde (et revenir plus tard à la valeur par défaut).la source
Depuis Java, en utilisant le nouveau
AdminZkClient
au lieu du obsolèteAdminUtils
:la source
AdminClient
ouKafkaAdminClient