Comment envoyer des messages volumineux avec Kafka (plus de 15 Mo)?

118

J'envoie des messages String à Kafka V. 0.8 avec l'API Java Producer. Si la taille du message est d'environ 15 Mo, j'obtiens un MessageSizeTooLargeException. J'ai essayé de régler message.max.bytesà 40 Mo, mais j'obtiens toujours l'exception. Les petits messages fonctionnaient sans problème.

(L'exception apparaît dans le producteur, je n'ai pas de consommateur dans cette application.)

Que puis-je faire pour me débarrasser de cette exception?

Mon exemple de configuration de producteur

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Journal des erreurs:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
Sonson123
la source
5
Mon premier instinct serait de vous demander de diviser cet énorme message en plusieurs plus petits: - / Je suppose que ce n'est pas possible pour une raison quelconque, mais vous voudrez peut-être le reconsidérer néanmoins: des messages énormes signifient généralement qu'il y a un défaut de conception quelque part qui devrait vraiment être corrigé.
Aaron Digulla
1
Merci, mais cela rendrait ma logique beaucoup plus complexe. Pourquoi est-ce une mauvaise idée d'utiliser Kafka pour des messages d'environ 15 Mo? La taille maximale des messages pouvant être utilisée est-elle de 1 Mo? Je n'ai pas trouvé grand-chose sur la limite de taille des messages dans la documentation de Kafka.
Sonson123 le
2
Ceci n'a aucun rapport avec Kafka ou tout autre système de traitement de messages. Mon raisonnement: si quelque chose ne va pas avec votre fichier de 15 Mo, le nettoyage du désordre par la suite est très coûteux. C'est pourquoi je divise généralement les gros fichiers en plusieurs petits travaux (qui peuvent ensuite être exécutés en parallèle également).
Aaron Digulla
avez-vous utilisé une compression? pourriez-vous s'il vous plaît partager quelques détails, c'est un peu difficile de deviner quelque chose d'un seul mot
user2720864

Réponses:

181

Vous devez ajuster trois (ou quatre) propriétés:

  • Côté consommateur: fetch.message.max.bytes- cela déterminera la plus grande taille d'un message pouvant être récupérée par le consommateur.
  • Côté courtier: replica.fetch.max.bytes- cela permettra aux répliques dans les courtiers d'envoyer des messages dans le cluster et de s'assurer que les messages sont correctement répliqués. S'il est trop petit, le message ne sera jamais répliqué et par conséquent, le consommateur ne verra jamais le message car le message ne sera jamais validé (entièrement répliqué).
  • Côté courtier: message.max.bytes- c'est la plus grande taille du message qui peut être reçue par le courtier d'un producteur.
  • Côté courtier (par sujet): max.message.bytes- c'est la plus grande taille du message que le courtier autorisera à être ajouté au sujet. Cette taille est validée en pré-compression. (Par défaut, celui du courtier message.max.bytes.)

J'ai découvert à la dure le numéro 2 - vous n'obtenez AUCUNE exception, message ou avertissement de Kafka, alors assurez-vous d'en tenir compte lorsque vous envoyez de gros messages.

rire_homme
la source
3
Ok, vous et user2720864 aviez raison. Je n'avais mis le message.max.bytesdans le code source. Mais je dois définir ces valeurs dans la configuration du serveur Kafka config/server.properties. Désormais, les messages plus volumineux fonctionnent également :).
Sonson123
3
Y a-t-il des inconvénients connus à définir ces valeurs trop élevées?
Ivan Balashov
7
Oui. Du côté du consommateur, vous allouez de la fetch.message.max.bytesmémoire pour CHAQUE partition. Cela signifie que si vous utilisez un grand nombre pour fetch.message.max.bytescombiné avec un grand nombre de partitions, cela consommera beaucoup de mémoire. En fait, comme le processus de réplication entre les courtiers est également un consommateur spécialisé, cela consommera également de la mémoire sur les courtiers.
rire_man
3
Notez qu'il existe également une max.message.bytesconfiguration par sujet qui peut être inférieure à celle du courtier message.max.bytes.
Peter Davis
1
Selon la doc officielle, les paramètres côté consommateur et ceux concernant la réplication entre courtiers /.*fetch.*bytes/ne semblent pas être des limites strictes: "Ce n'est pas un maximum absolu, si [...] plus grand que cette valeur, le lot d'enregistrement sera être renvoyé pour garantir que des progrès peuvent être réalisés. "
Bluu
56

Modifications mineures requises pour Kafka 0.10 et le nouveau consommateur par rapport à la réponse de rire_man :

  • Courtier: Aucun changement, vous devez encore augmenter les propriétés message.max.byteset replica.fetch.max.bytes. message.max.bytesdoit être égal ou inférieur (*) à replica.fetch.max.bytes.
  • Producteur: Augmentez max.request.sizepour envoyer le message plus large.
  • Consommateur: augmentez max.partition.fetch.bytespour recevoir des messages plus volumineux.

(*) Lisez les commentaires pour en savoir plus sur message.max.bytes<=replica.fetch.max.bytes

Sascha Vetter
la source
2
Savez-vous pourquoi message.max.bytesdoit être plus petit que replica.fetch.max.bytes?
Kostas
2
" replica.fetch.max.bytes (par défaut: 1 Mo) - Taille maximale des données qu'un courtier peut répliquer. Cette taille doit être supérieure à message.max.bytes , sinon un courtier acceptera les messages et ne pourra pas les répliquer. perte potentielle de données. " Source: handling-large-messages-kafka
Sascha Vetter
2
Merci de m'avoir répondu avec un lien. Cela semble également faire écho à ce que suggère le guide Cloudera . Cependant, les deux sont faux - notez qu'ils n'offrent aucune raison technique pour expliquer pourquoi replica.fetch.max.bytes devrait être strictement plus grand message.max.bytes. Un employé de Confluent a confirmé plus tôt dans la journée ce que je soupçonnais: que les deux quantités peuvent, en fait, être égales.
Kostas
2
Y a-t-il des mises à jour concernant message.max.bytes<replica.fetch.max.bytesou message.max.bytes=replica.fetch.max.bytes@Kostas?
Sascha Vetter
2
Oui, ils peuvent être égaux: mail-archive.com/[email protected]/msg25494.html (Ismael travaille pour Confluent)
Kostas
13

Vous devez remplacer les propriétés suivantes:

Configurations du courtier ($ KAFKA_HOME / config / server.properties)

  • replica.fetch.max.bytes
  • message.max.bytes

Consumer Configs ($ KAFKA_HOME / config / consumer.properties)
Cette étape n'a pas fonctionné pour moi. Je l'ajoute à l'application grand public et cela fonctionnait bien

  • fetch.message.max.bytes

Redémarrez le serveur.

consultez cette documentation pour plus d'informations: http://kafka.apache.org/08/configuration.html

user2550587
la source
1
pour le consommateur en ligne de commande, je dois utiliser l'indicateur --fetch-size = <bytes>. Il ne semble pas lire le fichier consumer.properties (kafka 0.8.1). Je recommanderais également d'activer la compression du côté producteur à l'aide de l'option compression.codec.
Ziggy Eunicien
Le commentaire de Ziggy a fonctionné pour moi kafka 0.8.1.1. Je vous remercie!
James
se pourrait-il que fetch.message.max.bytes soit remplacé par max.partition.fetch.bytes dans ConsumerConfig?
s_bei
12

L'idée est d'avoir une taille égale de message envoyé du producteur Kafka au courtier Kafka, puis reçu par Kafka Consumer ie

Producteur Kafka -> Kafka Broker -> Kafka Consumer

Supposons que si l'exigence est d'envoyer 15 Mo de message, le producteur , le courtier et le consommateur , tous les trois, doivent être synchronisés.

Kafka Producer envoie 15 Mo -> Kafka Broker autorise / stocke 15 Mo -> Kafka Consumer reçoit 15 Mo

Le paramètre doit donc être:

a) sur le courtier:

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

b) sur le consommateur:

fetch.message.max.bytes=15728640
Ravi
la source
2
se pourrait-il que fetch.message.max.bytes soit remplacé par max.partition.fetch.bytes dans ConsumerConfig?
s_bei
7

Un élément clé à retenir que cet message.max.bytesattribut doit être synchronisé avec la fetch.message.max.bytespropriété du consommateur . la taille de l'extraction doit être au moins aussi grande que la taille maximale du message sinon il pourrait y avoir une situation où les producteurs peuvent envoyer des messages plus grands que le consommateur ne peut consommer / récupérer. Cela pourrait valoir la peine d'y jeter un coup d'œil.
Quelle version de Kafka utilisez-vous? Fournissez également plus de détails sur la trace que vous obtenez. y a-t-il quelque chose comme ... payload size of xxxx larger than 1000000dans le journal?

utilisateur2720864
la source
1
J'ai mis à jour ma question avec plus d'informations: Kafka Version 2.8.0-0.8.0; maintenant je n'ai besoin que du producteur.
Sonson123
6

La réponse de @laughing_man est assez précise. Mais quand même, je voulais donner une recommandation que j'ai apprise de l'expert Kafka Stephane Maarek de Quora.

Kafka n'est pas destiné à gérer des messages volumineux.

Votre API doit utiliser le stockage dans le cloud (Ex AWS S3) et simplement transmettre à Kafka ou à tout courtier de messages une référence de S3. Vous devez trouver un endroit pour conserver vos données, peut-être que c'est un lecteur réseau, peut-être que c'est n'importe quoi, mais cela ne devrait pas être un courtier de messages.

Maintenant, si vous ne voulez pas utiliser la solution ci-dessus

La taille maximale du message est de 1 Mo (le paramètre de vos courtiers s'appelle message.max.bytes) Apache Kafka . Si vous en aviez vraiment besoin, vous pouvez augmenter cette taille et vous assurer d'augmenter les tampons réseau pour vos producteurs et consommateurs.

Et si vous vous souciez vraiment de diviser votre message, assurez-vous que chaque message partagé a exactement la même clé afin qu'il soit poussé vers la même partition, et le contenu de votre message doit signaler un «identifiant de partie» afin que votre consommateur puisse entièrement reconstruire le message .

Vous pouvez également explorer la compression, si votre message est basé sur du texte (compression gzip, snappy, lz4), ce qui peut réduire la taille des données, mais pas par magie.

Encore une fois, vous devez utiliser un système externe pour stocker ces données et simplement pousser une référence externe à Kafka. C'est une architecture très courante, que vous devriez adopter et largement acceptée.

Gardez cela à l'esprit que Kafka ne fonctionne mieux que si les messages sont énormes en quantité mais pas en taille.

Source: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka

Bhanu Hoysala
la source
4
Vous pouvez noter que "votre" recommandation est une copie presque mot pour mot de la recommandation Quora de Stéphane Maarek sur quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka
Mike
Kafka fonctionne avec des messages volumineux, absolument aucun problème. La page d'introduction sur la page d'accueil de Kafka y fait même référence en tant que système de stockage.
calloc_org
3

Pour les personnes utilisant landoop kafka: vous pouvez transmettre les valeurs de configuration dans les variables d'environnement comme:

docker run -d --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083  -p 9581-9585:9581-9585 -p 9092:9092
 -e KAFKA_TOPIC_MAX_MESSAGE_BYTES=15728640 -e KAFKA_REPLICA_FETCH_MAX_BYTES=15728640  landoop/fast-data-dev:latest `

Et si vous utilisez rdkafka, passez le message.max.bytes dans la configuration du producteur comme:

  const producer = new Kafka.Producer({
        'metadata.broker.list': 'localhost:9092',
        'message.max.bytes': '15728640',
        'dr_cb': true
    });

De même, pour le consommateur,

  const kafkaConf = {
   "group.id": "librd-test",
   "fetch.message.max.bytes":"15728640",
   ... .. }                                                                                                                                                                                                                                                      
informateur
la source