Enregistrement d'événements haute fréquence dans une base de données contrainte de limite de connexion

13

Nous avons une situation où je dois faire face à un afflux massif d'événements arrivant sur notre serveur, à environ 1000 événements par seconde, en moyenne (le pic pourrait être ~ 2000).

Le problème

Notre système est hébergé sur Heroku et utilise une base de données Heroku Postgres relativement chère , qui permet un maximum de 500 connexions DB. Nous utilisons le pool de connexions pour se connecter du serveur à la base de données.

Les événements arrivent plus rapidement que le pool de connexions DB ne peut gérer

Le problème que nous avons est que les événements arrivent plus rapidement que le pool de connexions ne peut les gérer. Au moment où une connexion a terminé l'aller-retour du réseau du serveur à la base de données, afin qu'elle puisse être renvoyée dans le pool, plus d' névénements supplémentaires arrivent.

Finalement, les événements s'empilent, attendant d'être enregistrés et parce qu'il n'y a pas de connexions disponibles dans le pool, ils expirent et l'ensemble du système est rendu non opérationnel.

Nous avons résolu l'urgence en émettant les événements haute fréquence incriminés à un rythme plus lent de la part des clients, mais nous voulons toujours savoir comment gérer ces scénarios dans le cas où nous aurions besoin de gérer ces événements haute fréquence.

Contraintes

D'autres clients pourraient vouloir lire les événements simultanément

D'autres clients demandent continuellement de lire tous les événements avec une clé particulière, même s'ils ne sont pas encore enregistrés dans la base de données.

Un client peut interroger GET api/v1/events?clientId=1et obtenir tous les événements envoyés par le client 1, même si ces événements ne sont pas encore enregistrés dans la base de données.

Existe-t-il des exemples «en classe» sur la façon de gérer cela?

Solutions possibles

Mettre les événements en file d'attente sur notre serveur

Nous pourrions mettre les événements en file d'attente sur le serveur (la file d'attente ayant une concurrence maximale de 400 afin que le pool de connexions ne s'épuise pas).

C'est une mauvaise idée car:

  • Il consommera de la mémoire disponible sur le serveur. Les événements mis en file d'attente empilés consomment d'énormes quantités de RAM.
  • Nos serveurs redémarrent toutes les 24 heures . Il s'agit d'une limite stricte imposée par Heroku. Le serveur peut redémarrer pendant la mise en file d'attente des événements, ce qui nous fait perdre les événements mis en file d'attente.
  • Il introduit l'état sur le serveur, ce qui nuit à l'évolutivité. Si nous avons une configuration multi-serveurs et qu'un client veut lire tous les événements mis en file d'attente + enregistrés, nous ne saurons pas sur quel serveur les événements mis en file d'attente sont en direct.

Utiliser une file d'attente de messages distincte

Je suppose que nous pourrions utiliser une file d'attente de messages (comme RabbitMQ ?), Où nous y pompons les messages et à l'autre extrémité, il y a un autre serveur qui ne s'occupe que de la sauvegarde des événements sur la base de données.

Je ne sais pas si les files d'attente de messages permettent d'interroger les événements mis en file d'attente (qui n'ont pas encore été enregistrés), donc si un autre client veut lire les messages d'un autre client, je peux simplement obtenir les messages enregistrés de la base de données et les messages en attente de la file d'attente et les concaténer ensemble afin que je puisse les renvoyer au client de demande de lecture.

Utilisez plusieurs bases de données, chacune enregistrant une partie des messages avec un serveur central de coordinateur de base de données pour les gérer

Une autre solution que nous avons envisagée consiste à utiliser plusieurs bases de données, avec un "coordinateur DB / équilibreur de charge" central. À la réception d'un événement, ce coordinateur choisirait l'une des bases de données dans lesquelles écrire le message. Cela devrait nous permettre d'utiliser plusieurs bases de données Heroku, augmentant ainsi la limite de connexion à 500 x nombre de bases de données.

Lors d'une requête de lecture, ce coordinateur peut émettre des SELECTrequêtes vers chaque base de données, fusionner tous les résultats et les renvoyer au client qui a demandé la lecture.

C'est une mauvaise idée car:

  • Cette idée ressemble à ... ahem .. une ingénierie excessive? Serait aussi un cauchemar à gérer (sauvegardes etc.). C'est compliqué à construire et à entretenir et à moins que ce ne soit absolument nécessaire, cela ressemble à une violation de KISS .
  • Il sacrifie la cohérence . Faire des transactions sur plusieurs bases de données est un no-go si nous allons avec cette idée.
Nik Kyriakides
la source
3
Où est votre goulot d'étranglement? Vous mentionnez votre pool de connexions, mais cela n'influence que le parallélisme, pas la vitesse par insert. Si vous avez 500 connexions et par exemple 2000QPS, cela devrait fonctionner correctement si chaque requête se termine dans les 250 ms, ce qui est très long. Pourquoi est-ce au-dessus de 15 ms? Notez également qu'en utilisant un PaaS, vous renoncez à d'importantes opportunités d'optimisation, telles que la mise à l'échelle du matériel de la base de données ou l'utilisation de réplicas en lecture pour réduire la charge sur la base de données principale. Heroku n'en vaut pas la peine sauf si le déploiement est votre plus gros problème.
amon
@amon Le goulot d'étranglement est en effet le pool de connexions. J'ai exécuté ANALYZEles requêtes elles-mêmes et elles ne posent aucun problème. J'ai également construit un prototype pour tester l'hypothèse du pool de connexions et vérifié que c'est bien le problème. La base de données et le serveur lui-même vivent sur des machines différentes d'où la latence. De plus, nous ne voulons pas abandonner Heroku à moins que cela ne soit absolument nécessaire, ne pas s'inquiéter des déploiements est un énorme avantage pour nous.
Nik Kyriakides
1
Cela étant dit, je comprends qu'il existe des micro-optimisations qui pourraient m'aider à résoudre le problème actuel . Je me demande s'il existe une solution architecturale évolutive à mon problème.
Nik Kyriakides
3
Comment avez-vous vérifié exactement que le pool de connexions est le problème? @amon a raison dans ses calculs. Essayez d'émettre select nullsur 500 connexions. Je parie que vous constaterez que le pool de connexions n'est pas le problème.
usr
1
Si sélectionner null est problématique, vous avez probablement raison. Bien que ce soit intéressant où tout ce temps est passé. Aucun réseau n'est aussi lent.
usr

Réponses:

9

Flux d'entrée

Il n'est pas clair si vos 1000 événements / seconde représentent des pics ou s'il s'agit d'une charge continue:

  • s'il s'agit d'un pic, vous pouvez utiliser une file d'attente de messages comme tampon pour répartir la charge sur le serveur de base de données sur une période plus longue;
  • s'il s'agit d'une charge constante, la file d'attente de messages seule ne suffit pas, car le serveur de base de données ne pourra jamais rattraper son retard. Ensuite, vous devez penser à une base de données distribuée.

Solution proposée

Intuitivement, dans les deux cas, je choisirais un flux d'événements basé sur Kafka :

  • Tous les événements sont systématiquement publiés sur un sujet kafka
  • Un consommateur souscrirait aux événements et les enregistrerait dans la base de données.
  • Un processeur de requêtes gérera les requêtes des clients et interrogera la base de données.

Ceci est hautement évolutif à tous les niveaux:

  • Si le serveur DB est le goulot d'étranglement, ajoutez simplement plusieurs consommateurs. Chacun peut s'abonner à la rubrique et écrire sur un autre serveur de base de données. Toutefois, si la distribution se produit de manière aléatoire sur les serveurs de base de données, le processeur de requêtes ne pourra pas prédire le serveur de base de données à prendre et devra interroger plusieurs serveurs de base de données. Cela pourrait entraîner un nouveau goulot d'étranglement du côté des requêtes.
  • Le schéma de distribution de la base de données pourrait donc être anticipé en organisant le flux d'événements en plusieurs rubriques (par exemple, en utilisant des groupes de clés ou de propriétés, pour partitionner la base de données selon une logique prévisible).
  • Si un serveur de messages n'est pas suffisant pour gérer un flot croissant d'événements d'entrée, vous pouvez ajouter des partitions kafka pour distribuer les rubriques kafka sur plusieurs serveurs physiques.

Offrir aux clients des événements non encore écrits dans la base de données

Vous souhaitez que vos clients puissent également accéder à des informations encore dans le canal et non encore écrites dans la base de données. C'est un peu plus délicat.

Option 1: utiliser un cache pour compléter les requêtes db

Je n'ai pas analysé en profondeur, mais la première idée qui me vient à l'esprit serait de faire du (des) processeur (s) de requête un ou des consommateurs des sujets kafka, mais dans un groupe de consommateurs kafka différent . Le processeur de requêtes recevrait alors tous les messages que le rédacteur de base de données recevra, mais indépendamment. Il pourrait ensuite les conserver dans un cache local. Les requêtes s'exécuteraient alors sur DB + cache (+ élimination des doublons).

Le design ressemblerait alors à:

entrez la description de l'image ici

L'évolutivité de cette couche de requête pourrait être obtenue en ajoutant plus de processeurs de requête (chacun dans son propre groupe de consommateurs).

Option 2: concevoir une double API

Une meilleure approche à mon humble avis serait d'offrir une double API (utiliser le mécanisme du groupe de consommateurs séparé):

  • une API de requête pour accéder aux événements dans la base de données et / ou faire des analyses
  • une API de streaming qui transmet simplement les messages directement à partir du sujet

L'avantage est que vous laissez le client décider de ce qui est intéressant. Cela pourrait éviter que vous fusionniez systématiquement les données de base de données avec des données fraîchement encaissées, lorsque le client ne s'intéresse qu'aux nouveaux événements entrants. Si la fusion délicate entre les événements frais et archivés est vraiment nécessaire, le client devra l'organiser.

Variantes

J'ai proposé kafka car il est conçu pour des volumes très élevés avec des messages persistants afin que vous puissiez redémarrer les serveurs si besoin.

Vous pouvez créer une architecture similaire avec RabbitMQ. Cependant, si vous avez besoin de files d'attente persistantes, cela peut réduire les performances . Aussi, pour autant que je sache, la seule façon d'obtenir la consommation parallèle des mêmes messages par plusieurs lecteurs (par exemple, écrivain + cache) avec RabbitMQ est de cloner les files d'attente . Une évolutivité plus élevée pourrait donc avoir un prix plus élevé.

Christophe
la source
Stellaire; Qu'entendez-vous par a distributed database (for example using a specialization of the server by group of keys)? Aussi pourquoi Kafka au lieu de RabbitMQ? Y a-t-il une raison particulière de choisir l'un plutôt que l'autre?
Nik Kyriakides
@NicholasKyriakides Merci! 1) Je pensais simplement à plusieurs serveurs de bases de données indépendants mais avec un schéma de partitionnement clair (clé, géographie, etc.) qui pouvait être utilisé pour distribuer efficacement les commandes. 2) Intuitivement , peut-être parce que Kafka est conçu pour un débit très élevé et que les messages persistants doivent redémarrer vos serveurs?). Je ne suis pas sûr que RabbitMQ soit aussi flexible pour les scénarios distribués, et les files d'attente persistantes diminuent les performances
Christophe
Pour 1) C'est donc assez similaire à mon Use multiple databasesidée, mais vous dites que je ne devrais pas simplement distribuer au hasard (ou à tour de rôle) les messages à chacune des bases de données. Droite?
Nik Kyriakides
Oui. Ma première pensée serait de ne pas opter pour une distribution aléatoire car cela pourrait augmenter la charge de traitement pour les requêtes (c'est-à-dire la requête des deux DB multiples la plupart du temps). Vous pouvez également envisager des moteurs de base de données distribués (egIgnite?). Mais faire un choix éclairé nécessiterait une bonne compréhension des modèles d'utilisation de la base de données (quoi d'autre est dans la base de données, à quelle fréquence est-elle interrogée, quel type de requêtes, y a-t-il des contraintes transactionnelles au-delà des événements individuels, etc.).
Christophe
3
Je veux juste dire que même si le kafka peut donner un débit très élevé, il dépasse probablement les besoins de la plupart des gens. J'ai trouvé que traiter avec kafka et son API était une grosse erreur pour nous. RabbitMQ n'est pas en reste et il a l'interface que vous attendez d'un MQ
imel96
11

Je suppose que vous devez explorer plus attentivement une approche que vous avez rejetée

  • Mettre les événements en file d'attente sur notre serveur

Ma suggestion serait de commencer à lire les différents articles publiés sur l' architecture LMAX . Ils ont réussi à faire fonctionner le traitement par lots à haut volume pour leur cas d'utilisation, et il peut être possible de faire ressembler vos compromis aux leurs.

En outre, vous voudrez peut-être voir si vous pouvez supprimer les lectures - idéalement, vous souhaitez pouvoir les mettre à l'échelle indépendamment des écritures. Cela peut signifier la recherche dans CQRS (ségrégation de responsabilité de requête de commande).

Le serveur peut redémarrer pendant la mise en file d'attente des événements, ce qui nous fait perdre les événements mis en file d'attente.

Dans un système distribué, je pense que vous pouvez être assez confiant que les messages vont se perdre. Vous pouvez être en mesure d'atténuer une partie de l'impact de cela en étant judicieux sur vos barrières de séquence (par exemple - en vous assurant que l'écriture sur le stockage durable se produit - avant que l'événement ne soit partagé en dehors du système).

  • Utilisez plusieurs bases de données, chacune enregistrant une partie des messages avec un serveur central de coordinateur de base de données pour les gérer

Peut-être - je serais plus susceptible d'examiner les limites de votre entreprise pour voir s'il existe des endroits naturels pour partager les données.

Dans certains cas, la perte de données est un compromis acceptable?

Eh bien, je suppose qu'il pourrait y en avoir, mais ce n'est pas là que j'allais. Le fait est que la conception aurait dû intégrer la robustesse requise pour progresser face à la perte de messages.

À quoi cela ressemble souvent, c'est un modèle basé sur l'extraction avec des notifications. Le fournisseur écrit les messages dans un magasin durable commandé. Le consommateur tire les messages du magasin et suit sa propre ligne des hautes eaux. Les notifications push sont utilisées comme un dispositif de réduction de la latence - mais si la notification est perdue, le message est toujours récupéré (éventuellement) car le consommateur tire sur un horaire régulier (la différence étant que si la notification est reçue, la traction se produit plus tôt ).

Voir Messagerie fiable sans transactions distribuées, par Udi Dahan (déjà référencé par Andy ) et Polyglot Data par Greg Young.

VoiceOfUnreason
la source
In a distributed system, I think you can be pretty confident that messages are going to get lost. Vraiment? Dans certains cas, la perte de données est un compromis acceptable? J'avais l'impression que perdre des données = échouer.
Nik Kyriakides
1
@NicholasKyriakides, ce n'est généralement pas acceptable, donc OP a suggéré la possibilité d'écrire dans un magasin durable avant d'émettre l'événement. Consultez cet article et cette vidéo d'Udi Dahan où il aborde le problème plus en détail.
Andy
6

Si je comprends bien, le flux actuel est:

  1. Recevoir et événement (je suppose via HTTP?)
  2. Demandez une connexion à la piscine.
  3. Insérez l'événement dans la base de données
  4. Libérez la connexion au pool.

Si c'est le cas, je pense que le premier changement à la conception serait d'arrêter d'avoir votre code de manipulation même des connexions de retour au pool à chaque événement. Au lieu de cela, créez un pool de threads / processus d'insertion qui est de 1 à 1 avec le nombre de connexions DB. Ceux-ci détiendront chacun une connexion DB dédiée.

À l'aide d'une sorte de file d'attente simultanée, ces threads extraient les messages de la file d'attente simultanée et les insèrent. En théorie, ils n'ont jamais besoin de renvoyer la connexion au pool ou d'en demander une nouvelle, mais vous devrez peut-être intégrer la gestion au cas où la connexion se détériorerait. Il peut être plus simple de tuer le thread / processus et d'en démarrer un nouveau.

Cela devrait éliminer efficacement la surcharge du pool de connexions. Vous devrez bien sûr être en mesure de pousser au moins 1000 événements / connexions par seconde sur chaque connexion. Vous voudrez peut-être essayer différents nombres de connexions, car avoir 500 connexions travaillant sur les mêmes tables pourrait créer des conflits sur la base de données, mais c'est une toute autre question. Une autre chose à considérer est l'utilisation d'inserts par lots, c'est-à-dire que chaque thread tire un certain nombre de messages et les pousse en une seule fois. Évitez également d'avoir plusieurs connexions pour tenter de mettre à jour les mêmes lignes.

JimmyJames
la source
5

Hypothèses

Je vais supposer que la charge que vous décrivez est constante, car c'est le scénario le plus difficile à résoudre.

Je vais également supposer que vous disposez d'un moyen d'exécuter des charges de travail déclenchées et de longue durée en dehors de votre processus d'application Web.

Solution

En supposant que vous avez correctement identifié votre goulot d'étranglement - latence entre votre processus et la base de données Postgres - c'est le principal problème à résoudre. La solution doit tenir compte de votre contrainte de cohérence avec les autres clients souhaitant lire les événements dès que possible après leur réception.

Pour résoudre le problème de latence, vous devez travailler de manière à minimiser la quantité de latence encourue par événement à stocker. C'est la chose clé que vous devez réaliser si vous ne voulez pas ou ne pouvez pas changer de matériel . Étant donné que vous êtes sur des services PaaS et que vous n'avez aucun contrôle sur le matériel ou le réseau, la seule façon de réduire la latence par événement sera une sorte d'écriture par lots d'événements.

Vous devrez stocker localement une file d'attente d'événements qui sera vidée et écrite périodiquement sur votre base de données, soit une fois qu'elle atteint une taille donnée, soit après un laps de temps écoulé. Un processus devra surveiller cette file d'attente pour déclencher le vidage vers le magasin. Il devrait y avoir beaucoup d'exemples sur la façon de gérer une file d'attente simultanée qui est vidée périodiquement dans la langue de votre choix - Voici un exemple en C # , du récepteur de traitement par lots périodique de la populaire bibliothèque de journalisation Serilog.

Cette réponse SO décrit le moyen le plus rapide de vider les données dans Postgres - même si cela nécessiterait que votre traitement par lots stocke la file d'attente sur le disque, et il y a probablement un problème à y résoudre lorsque votre disque disparaît lors du redémarrage dans Heroku.

Contrainte

Une autre réponse a déjà mentionné CQRS , et c'est la bonne approche à résoudre pour la contrainte. Vous souhaitez hydrater les modèles de lecture au fur et à mesure que chaque événement est traité - un modèle Mediator peut aider à encapsuler un événement et à le distribuer à plusieurs gestionnaires en cours de traitement. Ainsi, un gestionnaire peut ajouter à votre modèle de lecture l'événement qui est en mémoire que les clients peuvent interroger, et un autre gestionnaire peut être responsable de la mise en file d'attente de l'événement pour son éventuelle écriture par lots.

Le principal avantage du CQRS est que vous dissociez vos modèles conceptuels de lecture et d'écriture - ce qui est une façon élégante de dire que vous écrivez dans un modèle et que vous lisez à partir d'un autre modèle totalement différent. Pour bénéficier des avantages de l'évolutivité de CQRS, vous devez généralement vous assurer que chaque modèle est stocké séparément de manière optimale pour ses modèles d'utilisation. Dans ce cas, nous pouvons utiliser un modèle de lecture agrégé - par exemple, un cache Redis, ou simplement en mémoire - pour garantir que nos lectures sont rapides et cohérentes, tandis que nous utilisons toujours notre base de données transactionnelle pour écrire nos données.

Andrew Best
la source
3

Les événements arrivent plus rapidement que le pool de connexions DB ne peut gérer

Il s'agit d'un problème si chaque processus avait besoin d'une connexion à la base de données. Le système doit être conçu de manière à disposer d'un pool de travailleurs où chaque travailleur n'a besoin que d'une connexion à la base de données et chaque travailleur peut traiter plusieurs événements.

La file d'attente de messages peut être utilisée avec cette conception, vous avez besoin d'un ou de plusieurs producteurs de messages qui poussent les événements vers la file d'attente de messages et les travailleurs (consommateurs) traitent les messages de la file d'attente.

D'autres clients pourraient vouloir lire les événements simultanément

Cette contrainte n'est possible que si les événements stockés dans la base de données sans aucun traitement (événements bruts). Si les événements sont traités avant d'être stockés dans la base de données, alors la seule façon d'obtenir les événements est à partir de la base de données.

Si les clients veulent simplement interroger les événements bruts, je suggère d'utiliser un moteur de recherche comme Elastic Search. Vous obtiendrez même gratuitement l'API de requête / recherche.

Étant donné qu'il semble important d'interroger les événements avant qu'ils ne soient enregistrés dans la base de données, une solution simple comme Elastic Search devrait fonctionner. Vous stockez simplement tous les événements et ne dupliquez pas les mêmes données en les copiant dans la base de données.

La mise à l'échelle de la recherche élastique est facile, mais même avec une configuration de base, elle est assez performante.

Lorsque vous avez besoin d'un traitement, votre processus peut obtenir les événements d'ES, les traiter et les stocker dans la base de données. Je ne sais pas quel est le niveau de performance dont vous avez besoin de ce traitement, mais ce serait complètement distinct de l'interrogation des événements à partir d'ES. Vous ne devriez pas avoir de problème de connexion de toute façon, car vous pouvez avoir un nombre fixe de travailleurs et chacun avec une connexion à la base de données.

imel96
la source
2

1 k ou 2 k événements (5 Ko) par seconde, ce n'est pas tant pour une base de données si elle a un schéma et un moteur de stockage appropriés. Comme suggéré par @eddyce, un maître avec un ou plusieurs esclaves peut séparer les requêtes de lecture des validations d'écriture. L'utilisation de moins de connexions DB vous donnera un meilleur débit global.

D'autres clients pourraient vouloir lire les événements simultanément

Pour ces demandes, ils devraient également lire à partir de la base de données principale car il y aurait un retard de réplication vers les esclaves de lecture.

J'ai utilisé (Percona) MySQL avec le moteur TokuDB pour des écritures à très haut volume. Il existe également un moteur MyRocks basé sur LSMtrees qui est bon pour les charges d'écriture. Pour ces deux moteurs et probablement aussi PostgreSQL, il existe des paramètres pour l'isolement des transactions ainsi que le comportement de synchronisation de validation qui peuvent augmenter considérablement la capacité d'écriture. Dans le passé, nous avons accepté jusqu'à 1 seconde de données perdues qui ont été signalées au client db comme validées. Dans d'autres cas, il y avait des SSD avec batterie pour éviter les pertes.

Amazon RDS Aurora dans la version MySQL aurait un débit d'écriture 6 fois plus élevé avec une réplication à coût nul (semblable aux esclaves partageant un système de fichiers avec le maître). La saveur Aurora PostgreSQL dispose également d'un mécanisme de réplication avancé différent.

karmakaze
la source
TBH toute base de données bien administrée sur un matériel suffisant devrait être en mesure de faire face à cette charge. Le problème d'OP ne semble pas être la performance de la base de données mais la latence de connexion; je suppose que Heroku en tant que fournisseur PaaS leur vend une instance Postgres dans une autre région AWS.
amon
1

Je laisserais tomber Heroku tous ensemble, c'est-à-dire que je laisserais tomber une approche centralisée: plusieurs écritures qui atteignent le pic de la connexion maximale au pool sont l'une des principales raisons pour lesquelles les clusters db ont été inventés, principalement parce que vous ne chargez pas l'écriture db (s) avec des demandes de lecture qui peuvent être effectuées par d'autres db du cluster, j'essaierais avec une topologie maître-esclave, en plus - comme quelqu'un d'autre l'a déjà mentionné, avoir vos propres installations db permettrait de régler l'ensemble pour vous assurer que le temps de propagation des requêtes sera correctement géré.

Bonne chance

Edoardo
la source