J'utilise apache kafka pour la messagerie. J'ai implémenté le producteur et le consommateur en Java. Comment pouvons-nous obtenir le nombre de messages dans un sujet?
la source
J'utilise apache kafka pour la messagerie. J'ai implémenté le producteur et le consommateur en Java. Comment pouvons-nous obtenir le nombre de messages dans un sujet?
Le seul moyen qui me vient à l'esprit pour cela du point de vue du consommateur est de réellement consommer les messages et de les compter ensuite.
Le courtier Kafka expose les compteurs JMX pour le nombre de messages reçus depuis le démarrage, mais vous ne pouvez pas savoir combien d'entre eux ont déjà été purgés.
Dans la plupart des scénarios courants, les messages dans Kafka sont mieux perçus comme un flux infini et l'obtention d'une valeur discrète du nombre de messages actuellement conservés sur le disque n'est pas pertinente. De plus, les choses se compliquent lorsqu'il s'agit d'un groupe de courtiers qui ont tous un sous-ensemble de messages dans un sujet.
Ce n'est pas Java, mais peut être utile
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker>: <port> --topic <topic-name> --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
la source
bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -1 | awk -F ":" '{sum += $3} END {print sum}' 13818663 bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -2 | awk -F ":" '{sum += $3} END {print sum}' 12434609
Et puis la différence renvoie les messages en attente réels dans le sujet? Ai-je raison?J'utilise en fait cela pour évaluer mon POC. L'élément que vous souhaitez utiliser ConsumerOffsetChecker. Vous pouvez l'exécuter en utilisant le script bash comme ci-dessous.
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group testgroup
Et ci-dessous est le résultat: comme vous pouvez le voir sur la boîte rouge, 999 est le nombre de messages actuellement dans le sujet.
Mise à jour: ConsumerOffsetChecker est obsolète depuis la 0.10.0, vous souhaiterez peut-être commencer à utiliser ConsumerGroupCommand.
la source
Parfois, l'intérêt est de connaître le nombre de messages dans chaque partition, par exemple, lors du test d'un partitionneur personnalisé.Les étapes suivantes ont été testées pour fonctionner avec Kafka 0.10.2.1-2 de Confluent 3.2. Étant donné un sujet Kafka
kt
et la ligne de commande suivante:$ kafka-run-class kafka.tools.GetOffsetShell \ --broker-list host01:9092,host02:9092,host02:9092 --topic kt
Cela imprime l'exemple de sortie montrant le nombre de messages dans les trois partitions:
kt:2:6138 kt:1:6123 kt:0:6137
Le nombre de lignes peut être supérieur ou inférieur en fonction du nombre de partitions pour le sujet.
la source
Puisque
ConsumerOffsetChecker
n'est plus pris en charge, vous pouvez utiliser cette commande pour vérifier tous les messages de la rubrique:bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \ --group my-group \ --bootstrap-server localhost:9092 \ --describe
Où
LAG
est le nombre de messages dans la partition de rubrique:Vous pouvez également essayer d'utiliser kafkacat . Il s'agit d'un projet open source qui peut vous aider à lire les messages d'une rubrique et d'une partition et à les imprimer sur stdout. Voici un exemple qui lit les 10 derniers messages du
sample-kafka-topic
sujet, puis quittez:kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e
la source
Utilisez https://prestodb.io/docs/current/connector/kafka-tutorial.html
Un moteur super SQL, fourni par Facebook, qui se connecte sur plusieurs sources de données (Cassandra, Kafka, JMX, Redis ...).
PrestoDB fonctionne en tant que serveur avec des workers optionnels (il existe un mode autonome sans workers supplémentaires), puis vous utilisez un petit exécutable JAR (appelé presto CLI) pour effectuer des requêtes.
Une fois que vous avez bien configuré le serveur Presto, vous pouvez utiliser le SQL traditionnel:
SELECT count(*) FROM TOPIC_NAME;
la source
Commande Apache Kafka pour obtenir des messages non gérés sur toutes les partitions d'un sujet:
kafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_group
Impressions:
Group Topic Pid Offset logSize Lag Owner test_group test 0 11051 11053 2 none test_group test 1 10810 10812 2 none test_group test 2 11027 11028 1 none
La colonne 6 contient les messages non traités. Additionnez-les comme ceci:
kafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} END {print sum}'
awk lit les lignes, saute la ligne d'en-tête et ajoute la 6ème colonne et à la fin imprime la somme.
Tirages
5
la source
Pour obtenir tous les messages stockés pour le sujet, vous pouvez rechercher le consommateur au début et à la fin du flux pour chaque partition et additionner les résultats
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream() .map(p -> new TopicPartition(topic, p.partition())) .collect(Collectors.toList()); consumer.assign(partitions); consumer.seekToEnd(Collections.emptySet()); Map<TopicPartition, Long> endPartitions = partitions.stream() .collect(Collectors.toMap(Function.identity(), consumer::position)); consumer.seekToBeginning(Collections.emptySet()); System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
la source
Exécutez ce qui suit (en supposant qu'il se
kafka-console-consumer.sh
trouve sur le chemin):kafka-console-consumer.sh --from-beginning \ --bootstrap-server yourbroker:9092 --property print.key=true \ --property print.value=false --property print.partition \ --topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
la source
--new-consumer
car cette option n'est plus disponible (ou apparemment nécessaire)En utilisant le client Java de Kafka 2.11-1.0.0, vous pouvez effectuer les opérations suivantes:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test")); while(true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); // after each message, query the number of messages of the topic Set<TopicPartition> partitions = consumer.assignment(); Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions); for(TopicPartition partition : offsets.keySet()) { System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition)); } } }
La sortie est quelque chose comme ceci:
offset = 10, key = null, value = un partition test is at 13 offset = 11, key = null, value = deux partition test is at 13 offset = 12, key = null, value = trois partition test is at 13
la source
seekToEnd(..)
et desseekToBeginning(..)
méthodes qui changent l'état duconsumer
.Dans les versions les plus récentes de Kafka Manager, il existe une colonne intitulée Summed Recent Offsets .
la source
J'avais cette même question et voici comment je le fais, d'un KafkaConsumer, à Kotlin:
Code très approximatif, car je viens de faire fonctionner cela, mais en gros, vous voulez soustraire le décalage de début du sujet du décalage de fin et ce sera le nombre de messages actuel pour le sujet.
Vous ne pouvez pas vous fier uniquement au décalage de fin à cause d'autres configurations (politique de nettoyage, rétention-ms, etc.) qui peuvent finir par provoquer la suppression d'anciens messages de votre rubrique. Les décalages "avancent" uniquement, c'est donc le décalage initial qui se rapproche du décalage de fin (ou éventuellement de la même valeur, si le sujet ne contient aucun message pour le moment).
Fondamentalement, le décalage de fin représente le nombre total de messages qui sont passés par cette rubrique et la différence entre les deux représente le nombre de messages que la rubrique contient actuellement.
la source
Extraits de documents Kafka
Dépréciations dans 0.9.0.0
Le kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) est obsolète. À l'avenir, veuillez utiliser kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) pour cette fonctionnalité.
J'utilise le courtier Kafka avec SSL activé pour le serveur et le client. Ci-dessous la commande que j'utilise
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x
où / tmp / ssl_config est comme ci-dessous
la source
Si vous avez accès à l'interface JMX du serveur, les décalages de début et de fin sont présents à:
(vous devez remplacer
TOPICNAME
&PARTITIONNUMBER
). Gardez à l'esprit que vous devez vérifier chacune des répliques d'une partition donnée, ou vous devez savoir lequel des courtiers est le leader pour une partition donnée (et cela peut changer avec le temps).Vous pouvez également utiliser les méthodes Kafka Consumer
beginningOffsets
etendOffsets
.la source
Je n'ai pas essayé cela moi - même, mais cela semble logique.
Vous pouvez également utiliser
kafka.tools.ConsumerOffsetChecker
( source ).la source
Le moyen le plus simple que j'ai trouvé est d'utiliser l'API REST Kafdrop
/topic/topicName
et de spécifier la clé:"Accept"
/ value:"application/json"
header afin de récupérer une réponse JSON.Ceci est documenté ici .
la source
Vous pouvez utiliser kafkatool . Veuillez vérifier ce lien -> http://www.kafkatool.com/download.html
la source