Existe-t-il un moyen de supprimer toutes les données d'une rubrique ou de supprimer la rubrique avant chaque exécution?
Puis-je modifier le fichier KafkaConfig.scala pour changer la logRetentionHours
propriété? Existe-t-il un moyen de supprimer les messages dès que le consommateur les lit?
J'utilise des producteurs pour récupérer les données de quelque part et envoyer les données à un sujet particulier où un consommateur consomme, puis-je supprimer toutes les données de ce sujet à chaque exécution? Je ne veux que de nouvelles données à chaque fois dans le sujet. Existe-t-il un moyen de réinitialiser le sujet d'une manière ou d'une autre?
Réponses:
Ne pensez pas qu'il est encore pris en charge.Jetez un œil à ce problème JIRA "Ajout de la prise en charge de la suppression de sujets".Pour supprimer manuellement:
log.dir
attribut dans le fichier de configuration kafka ) ainsi que les données du gardien de zooPour un sujet donné, ce que vous pouvez faire est
/tmp/kafka-logs/MyTopic-0
là où/tmp/kafka-logs
est spécifié par l'log.dir
attributC'est
NOT
une bonne approche recommandée, mais elle devrait fonctionner. Dans le fichier de configuration du courtier Kafka, l'log.retention.hours.per.topic
attribut est utilisé pour définirThe number of hours to keep a log file before deleting it for some specific topic
À partir de la documentation Kafka :
Pour trouver le décalage de début à lire dans Kafka 0.8 Exemple de consommateur simple, ils disent
Vous pouvez également y trouver l'exemple de code pour gérer l'offset chez votre consommateur.
la source
brokers/topics/<topic_to_delete>
ainsi que les journaux pour vous en débarrasser.kafka-run-class.sh kafka.admin.DeleteTopicCommand
.kafka-run-class.sh kafka.admin.TopicCommand --delete --topic [topic_to_delete] --zookeeper localhost:2181
Comme je l'ai mentionné ici Purge Kafka Queue :
Testé dans Kafka 0.8.2, pour l'exemple de démarrage rapide: Tout d'abord, ajoutez une ligne au fichier server.properties sous le dossier config:
ensuite, vous pouvez exécuter cette commande:
la source
Testé avec kafka 0.10
Remarque: si vous supprimez le (s) dossier (s) de sujets dans kafka-logs mais pas du dossier zookeeper-data, vous verrez que les sujets sont toujours là.
la source
Pour contourner le problème, vous pouvez ajuster les paramètres de rétention d'exécution par sujet, par exemple
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1
( retention.bytes = 0 peut également fonctionner)Après un court instant, kafka devrait libérer de l'espace. Je ne sais pas si cela a des implications par rapport à la recréation du sujet.
ps. Mieux vaut ramener les paramètres de rétention, une fois kafka terminé avec le nettoyage.
Vous pouvez également utiliser
retention.ms
pour conserver les données historiquesla source
Vous trouverez ci-dessous des scripts pour vider et supprimer une rubrique Kafka en supposant que localhost est le serveur zookeeper et que Kafka_Home est défini sur le répertoire d'installation:
Le script ci-dessous videra un sujet en définissant son temps de rétention sur 1 seconde, puis en supprimant la configuration:
Pour supprimer complètement les sujets, vous devez arrêter tous les courtiers kafka applicables et supprimer son ou ses répertoires du répertoire du journal kafka (par défaut: / tmp / kafka-logs), puis exécuter ce script pour supprimer le sujet de zookeeper. Pour vérifier qu'il a été supprimé de zookeeper, la sortie de ls / brokers / topics ne doit plus inclure le sujet:
la source
grep "log.retention.check.interval" $Kafka_Home/config/server.properties
--add config
plutôt le cas--add-config
Nous avons essayé à peu près ce que les autres réponses décrivent avec un niveau de succès modéré. Ce qui a vraiment fonctionné pour nous (Apache Kafka 0.8.1) est la commande de classe
sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yourtopic --zookeeper localhost: 2181
la source
Error: Could not find or load main class kafka.admin.DeleteTopicCommand
Pour les utilisateurs de bière
Si vous utilisez
brew
comme moi et que vous avez perdu beaucoup de temps à chercher le fameuxkafka-logs
dossier, n'ayez plus peur. (et faites-le moi savoir si cela fonctionne pour vous et pour plusieurs versions différentes de Homebrew, Kafka, etc. :))Vous allez probablement le trouver sous:
Emplacement:
/usr/local/var/lib/kafka-logs
Comment trouver réellement ce chemin
(cela est également utile pour pratiquement toutes les applications que vous installez via brew)
1)
brew services list
2) Ouvrez et lisez ce que
plist
vous avez trouvé ci-dessus3) Trouvez la ligne définissant l'
server.properties
emplacement, ouvrez-la, dans mon cas:/usr/local/etc/kafka/server.properties
4) Recherchez la
log.dirs
ligne:5) Allez à cet emplacement et supprimez les journaux des sujets que vous souhaitez
6) Redémarrez Kafka avec
brew services restart kafka
la source
Toutes les données sur les sujets et ses partitions sont stockées dans
tmp/kafka-logs/
. De plus, ils sont stockés dans un formattopic-partionNumber
, donc si vous souhaitez supprimer un sujetnewTopic
, vous pouvez:rm -rf /tmp/kafka-logs/newTopic-*
la source
log.retention.hours
et ajouterlog.retention.ms=1000
. Il garderait le dossier sur Kafka Topic pendant une seconde seulement.log.retention.hours
sur la valeur souhaitée.la source
À partir de la version kafka 2.3.0, il existe un autre moyen de supprimer en douceur Kafka (l'ancienne approche est obsolète).
Mettez à jour retention.ms à 1 sec (1000ms) puis réglez-le à nouveau après une minute, au paramètre par défaut, c'est-à-dire 7 jours (168 heures, 604 800 000 en ms)
Suppression logicielle : - (rentention.ms = 1000) (en utilisant kafka-configs.sh)
Valeur par défaut: - 7 jours (168 heures, retention.ms = 604800000)
la source
En supprimant manuellement un sujet d'un cluster kafka, vous pouvez simplement vérifier ceci https://github.com/darrenfu/bigdata/issues/6 Une étape vitale manquée dans la plupart des solutions consiste à supprimer le
/config/topics/<topic_name>
dans ZK.la source
J'utilise ce script:
la source
J'utilise l'utilitaire ci-dessous pour nettoyer après mon test d'intégration.
Il utilise la dernière
AdminZkClient
api. L'ancienne API est obsolète.Il existe une option de suppression de sujet. Mais, cela marque le sujet de la suppression. Zookeeper supprime plus tard le sujet. Comme cela peut être d'une durée imprévisible, je préfère l'approche retention.ms
la source