J'envoie des messages String à Kafka V. 0.8 avec l'API Java Producer. Si la taille du message est d'environ 15 Mo, j'obtiens un MessageSizeTooLargeException
. J'ai essayé de régler message.max.bytes
à 40 Mo, mais j'obtiens toujours l'exception. Les petits messages fonctionnaient sans problème.
(L'exception apparaît dans le producteur, je n'ai pas de consommateur dans cette application.)
Que puis-je faire pour me débarrasser de cette exception?
Mon exemple de configuration de producteur
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
Journal des erreurs:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
java
apache-kafka
Sonson123
la source
la source
Réponses:
Vous devez ajuster trois (ou quatre) propriétés:
fetch.message.max.bytes
- cela déterminera la plus grande taille d'un message pouvant être récupérée par le consommateur.replica.fetch.max.bytes
- cela permettra aux répliques dans les courtiers d'envoyer des messages dans le cluster et de s'assurer que les messages sont correctement répliqués. S'il est trop petit, le message ne sera jamais répliqué et par conséquent, le consommateur ne verra jamais le message car le message ne sera jamais validé (entièrement répliqué).message.max.bytes
- c'est la plus grande taille du message qui peut être reçue par le courtier d'un producteur.max.message.bytes
- c'est la plus grande taille du message que le courtier autorisera à être ajouté au sujet. Cette taille est validée en pré-compression. (Par défaut, celui du courtiermessage.max.bytes
.)J'ai découvert à la dure le numéro 2 - vous n'obtenez AUCUNE exception, message ou avertissement de Kafka, alors assurez-vous d'en tenir compte lorsque vous envoyez de gros messages.
la source
message.max.bytes
dans le code source. Mais je dois définir ces valeurs dans la configuration du serveur Kafkaconfig/server.properties
. Désormais, les messages plus volumineux fonctionnent également :).fetch.message.max.bytes
mémoire pour CHAQUE partition. Cela signifie que si vous utilisez un grand nombre pourfetch.message.max.bytes
combiné avec un grand nombre de partitions, cela consommera beaucoup de mémoire. En fait, comme le processus de réplication entre les courtiers est également un consommateur spécialisé, cela consommera également de la mémoire sur les courtiers.max.message.bytes
configuration par sujet qui peut être inférieure à celle du courtiermessage.max.bytes
./.*fetch.*bytes/
ne semblent pas être des limites strictes: "Ce n'est pas un maximum absolu, si [...] plus grand que cette valeur, le lot d'enregistrement sera être renvoyé pour garantir que des progrès peuvent être réalisés. "Modifications mineures requises pour Kafka 0.10 et le nouveau consommateur par rapport à la réponse de rire_man :
message.max.bytes
etreplica.fetch.max.bytes
.message.max.bytes
doit être égal ou inférieur (*) àreplica.fetch.max.bytes
.max.request.size
pour envoyer le message plus large.max.partition.fetch.bytes
pour recevoir des messages plus volumineux.(*) Lisez les commentaires pour en savoir plus sur
message.max.bytes
<=replica.fetch.max.bytes
la source
message.max.bytes
doit être plus petit quereplica.fetch.max.bytes
?replica.fetch.max.bytes
devrait être strictement plus grandmessage.max.bytes
. Un employé de Confluent a confirmé plus tôt dans la journée ce que je soupçonnais: que les deux quantités peuvent, en fait, être égales.message.max.bytes<replica.fetch.max.bytes
oumessage.max.bytes=replica.fetch.max.bytes
@Kostas?Vous devez remplacer les propriétés suivantes:
Configurations du courtier ($ KAFKA_HOME / config / server.properties)
Consumer Configs ($ KAFKA_HOME / config / consumer.properties)
Cette étape n'a pas fonctionné pour moi. Je l'ajoute à l'application grand public et cela fonctionnait bien
Redémarrez le serveur.
consultez cette documentation pour plus d'informations: http://kafka.apache.org/08/configuration.html
la source
L'idée est d'avoir une taille égale de message envoyé du producteur Kafka au courtier Kafka, puis reçu par Kafka Consumer ie
Producteur Kafka -> Kafka Broker -> Kafka Consumer
Supposons que si l'exigence est d'envoyer 15 Mo de message, le producteur , le courtier et le consommateur , tous les trois, doivent être synchronisés.
Kafka Producer envoie 15 Mo -> Kafka Broker autorise / stocke 15 Mo -> Kafka Consumer reçoit 15 Mo
Le paramètre doit donc être:
a) sur le courtier:
b) sur le consommateur:
la source
Un élément clé à retenir que cet
message.max.bytes
attribut doit être synchronisé avec lafetch.message.max.bytes
propriété du consommateur . la taille de l'extraction doit être au moins aussi grande que la taille maximale du message sinon il pourrait y avoir une situation où les producteurs peuvent envoyer des messages plus grands que le consommateur ne peut consommer / récupérer. Cela pourrait valoir la peine d'y jeter un coup d'œil.Quelle version de Kafka utilisez-vous? Fournissez également plus de détails sur la trace que vous obtenez. y a-t-il quelque chose comme ...
payload size of xxxx larger than 1000000
dans le journal?la source
La réponse de @laughing_man est assez précise. Mais quand même, je voulais donner une recommandation que j'ai apprise de l'expert Kafka Stephane Maarek de Quora.
Kafka n'est pas destiné à gérer des messages volumineux.
Votre API doit utiliser le stockage dans le cloud (Ex AWS S3) et simplement transmettre à Kafka ou à tout courtier de messages une référence de S3. Vous devez trouver un endroit pour conserver vos données, peut-être que c'est un lecteur réseau, peut-être que c'est n'importe quoi, mais cela ne devrait pas être un courtier de messages.
Maintenant, si vous ne voulez pas utiliser la solution ci-dessus
La taille maximale du message est de 1 Mo (le paramètre de vos courtiers s'appelle
message.max.bytes
) Apache Kafka . Si vous en aviez vraiment besoin, vous pouvez augmenter cette taille et vous assurer d'augmenter les tampons réseau pour vos producteurs et consommateurs.Et si vous vous souciez vraiment de diviser votre message, assurez-vous que chaque message partagé a exactement la même clé afin qu'il soit poussé vers la même partition, et le contenu de votre message doit signaler un «identifiant de partie» afin que votre consommateur puisse entièrement reconstruire le message .
Vous pouvez également explorer la compression, si votre message est basé sur du texte (compression gzip, snappy, lz4), ce qui peut réduire la taille des données, mais pas par magie.
Encore une fois, vous devez utiliser un système externe pour stocker ces données et simplement pousser une référence externe à Kafka. C'est une architecture très courante, que vous devriez adopter et largement acceptée.
Gardez cela à l'esprit que Kafka ne fonctionne mieux que si les messages sont énormes en quantité mais pas en taille.
Source: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka
la source
Pour les personnes utilisant landoop kafka: vous pouvez transmettre les valeurs de configuration dans les variables d'environnement comme:
Et si vous utilisez rdkafka, passez le message.max.bytes dans la configuration du producteur comme:
De même, pour le consommateur,
la source