Table de files d'attente FIFO pour plusieurs travailleurs dans SQL Server

15

J'essayais de répondre à la question de stackoverflow suivante:

Après avoir affiché une réponse quelque peu naïve, je pensais que je mettrais mon argent là où ma bouche était et fait tester le scénario que je voulais dire, pour être sûr que je ne l'envoie sur une OP de chasse aux oies sauvages. Eh bien, cela s'est avéré être beaucoup plus difficile que je ne le pensais (pas de surprise pour personne, j'en suis sûr).

Voici ce que j'ai essayé et pensé:

  • J'ai d'abord essayé une mise à jour TOP 1 avec un ORDER BY dans une table dérivée, en utilisant ROWLOCK, READPAST. Cela a entraîné des blocages et a également traité des articles hors service. Il doit être aussi proche de FIFO que possible, à l'exception des erreurs qui nécessitent d'essayer de traiter la même ligne plusieurs fois.

  • J'ai ensuite essayé sélectionner la prochaine QueueID désirée dans une variable, en utilisant diverses combinaisons de READPAST, UPDLOCK, HOLDLOCKet ROWLOCKde conserver exclusivement la ligne de mise à jour par cette session. Toutes les variantes que j'ai essayées ont souffert des mêmes problèmes qu'auparavant ainsi que, pour certaines combinaisons avec READPAST, se plaignant:

    Vous pouvez uniquement spécifier le verrou READPAST dans les niveaux d'isolement READ COMMITTED ou REPEATABLE READ.

    Cela a été source de confusion parce qu'il était READ COMMITTED. J'ai déjà rencontré cela auparavant et c'est frustrant.

  • Depuis que j'ai commencé à écrire cette question, Remus Rusani a posté une nouvelle réponse à la question. J'ai lu son article lié et je vois qu'il utilise des lectures destructrices, car il a dit dans sa réponse qu'il "n'était pas possible de garder les verrous pendant la durée des appels Web". Après avoir lu ce que dit son article concernant les points chauds et les pages nécessitant un verrouillage pour effectuer une mise à jour ou une suppression, je crains que même si je pouvais trouver les verrous corrects pour faire ce que je cherchais, il ne serait pas évolutif et pourrait pas gérer une concurrence massive.

Pour l'instant, je ne sais pas où aller. Est-il vrai que le maintien des verrous pendant le traitement de la ligne ne peut pas être réalisé (même s'il ne prend pas en charge des tps élevés ou une concurrence massive)? Qu'est-ce que je rate?

Dans l'espoir que des personnes plus intelligentes que moi et des personnes plus expérimentées que moi puissent vous aider, voici le script de test que j'utilisais. Il est revenu à la méthode TOP 1 UPDATE mais j'ai laissé l'autre méthode dedans, commentée, au cas où vous voudriez l'explorer aussi.

Collez chacun de ces éléments dans une session distincte, exécutez la session 1, puis rapidement toutes les autres. Dans environ 50 secondes, le test sera terminé. Regardez les messages de chaque session pour voir quel travail il a fait (ou comment il a échoué). La première session affichera un ensemble de lignes avec un instantané pris une fois par seconde détaillant les verrous présents et les éléments de file d'attente en cours de traitement. Cela fonctionne parfois, et d'autres fois ne fonctionne pas du tout.

Session 1

/* Session 1: Setup and control - Run this session first, then immediately run all other sessions */
IF Object_ID('dbo.Queue', 'U') IS NULL
   CREATE TABLE dbo.Queue (
      QueueID int identity(1,1) NOT NULL,
      StatusID int NOT NULL,
      QueuedDate datetime CONSTRAINT DF_Queue_QueuedDate DEFAULT (GetDate()),
      CONSTRAINT PK_Queue PRIMARY KEY CLUSTERED (QueuedDate, QueueID)
   );

IF Object_ID('dbo.QueueHistory', 'U') IS NULL
   CREATE TABLE dbo.QueueHistory (
      HistoryDate datetime NOT NULL,
      QueueID int NOT NULL
   );

IF Object_ID('dbo.LockHistory', 'U') IS NULL
   CREATE TABLE dbo.LockHistory (
      HistoryDate datetime NOT NULL,
      ResourceType varchar(100),
      RequestMode varchar(100),
      RequestStatus varchar(100),
      ResourceDescription varchar(200),
      ResourceAssociatedEntityID varchar(200)
   );

IF Object_ID('dbo.StartTime', 'U') IS NULL
   CREATE TABLE dbo.StartTime (
      StartTime datetime NOT NULL
   );

SET NOCOUNT ON;

IF (SELECT Count(*) FROM dbo.Queue) < 10000 BEGIN
   TRUNCATE TABLE dbo.Queue;

   WITH A (N) AS (SELECT 1 UNION ALL SELECT 1 UNION ALL SELECT 1 UNION ALL SELECT 1),
   B (N) AS (SELECT 1 FROM A Z, A I, A P),
   C (N) AS (SELECT Row_Number() OVER (ORDER BY (SELECT 1)) FROM B O, B W)
   INSERT dbo.Queue (StatusID, QueuedDate)
   SELECT 1, DateAdd(millisecond, C.N * 3, GetDate() - '00:05:00')
   FROM C
   WHERE C.N <= 10000;
END;

TRUNCATE TABLE dbo.StartTime;
INSERT dbo.StartTime SELECT GetDate() + '00:00:15'; -- or however long it takes you to go run the other sessions
GO
TRUNCATE TABLE dbo.QueueHistory;
SET NOCOUNT ON;

DECLARE
   @Time varchar(8),
   @Now datetime;
SELECT @Time = Convert(varchar(8), StartTime, 114)
FROM dbo.StartTime;
WAITFOR TIME @Time;

DECLARE @i int,
@QueueID int;
SET @i = 1;
WHILE @i <= 33 BEGIN
   SET @Now  = GetDate();
   INSERT dbo.QueueHistory
   SELECT
      @Now,
      QueueID
   FROM
      dbo.Queue Q WITH (NOLOCK)
   WHERE
      Q.StatusID <> 1;

   INSERT dbo.LockHistory
   SELECT
      @Now,
      L.resource_type,
      L.request_mode,
      L.request_status,
      L.resource_description,
      L.resource_associated_entity_id
   FROM
      sys.dm_tran_current_transaction T
      INNER JOIN sys.dm_tran_locks L
         ON L.request_owner_id = T.transaction_id;
   WAITFOR DELAY '00:00:01';
   SET @i = @i + 1;
END;

WITH Cols AS (
   SELECT *, Row_Number() OVER (PARTITION BY HistoryDate ORDER BY QueueID) Col
   FROM dbo.QueueHistory
), P AS (
   SELECT *
   FROM
      Cols
      PIVOT (Max(QueueID) FOR Col IN ([1], [2], [3], [4], [5], [6], [7], [8])) P
)
SELECT L.*, P.[1], P.[2], P.[3], P.[4], P.[5], P.[6], P.[7], P.[8]
FROM
   dbo.LockHistory L
   FULL JOIN P
      ON L.HistoryDate = P.HistoryDate

/* Clean up afterward
DROP TABLE dbo.StartTime;
DROP TABLE dbo.LockHistory;
DROP TABLE dbo.QueueHistory;
DROP TABLE dbo.Queue;
*/

Session 2

/* Session 2: Simulate an application instance holding a row locked for a long period, and eventually abandoning it. */
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET NOCOUNT ON;
SET XACT_ABORT ON;

DECLARE
   @QueueID int,
   @Time varchar(8);
SELECT @Time = Convert(varchar(8), StartTime + '0:00:01', 114)
FROM dbo.StartTime;
WAITFOR TIME @Time;
BEGIN TRAN;

--SET @QueueID = (
--   SELECT TOP 1 QueueID
--   FROM dbo.Queue WITH (READPAST, UPDLOCK)
--   WHERE StatusID = 1 -- ready
--   ORDER BY QueuedDate, QueueID
--);

--UPDATE dbo.Queue
--SET StatusID = 2 -- in process
----OUTPUT Inserted.*
--WHERE QueueID = @QueueID;

SET @QueueID = NULL;
UPDATE Q
SET Q.StatusID = 1, @QueueID = Q.QueueID
FROM (
   SELECT TOP 1 *
   FROM dbo.Queue WITH (ROWLOCK, READPAST)
   WHERE StatusID = 1
   ORDER BY QueuedDate, QueueID
) Q

PRINT @QueueID;

WAITFOR DELAY '00:00:20'; -- Release it partway through the test

ROLLBACK TRAN; -- Simulate client disconnecting

Session 3

/* Session 3: Run a near-continuous series of "failed" queue processing. */
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET XACT_ABORT ON;
SET NOCOUNT ON;
DECLARE
   @QueueID int,
   @EndDate datetime,
   @NextDate datetime,
   @Time varchar(8);

SELECT
   @EndDate = StartTime + '0:00:33',
   @Time = Convert(varchar(8), StartTime, 114)
FROM dbo.StartTime;

WAITFOR TIME @Time;

WHILE GetDate() < @EndDate BEGIN
   BEGIN TRAN;

   --SET @QueueID = (
   --   SELECT TOP 1 QueueID
   --   FROM dbo.Queue WITH (READPAST, UPDLOCK)
   --   WHERE StatusID = 1 -- ready
   --   ORDER BY QueuedDate, QueueID
   --);

   --UPDATE dbo.Queue
   --SET StatusID = 2 -- in process
   ----OUTPUT Inserted.*
   --WHERE QueueID = @QueueID;

   SET @QueueID = NULL;
   UPDATE Q
   SET Q.StatusID = 1, @QueueID = Q.QueueID
   FROM (
      SELECT TOP 1 *
      FROM dbo.Queue WITH (ROWLOCK, READPAST)
      WHERE StatusID = 1
      ORDER BY QueuedDate, QueueID
   ) Q

   PRINT @QueueID;

   SET @NextDate = GetDate() + '00:00:00.015';
   WHILE GetDate() < @NextDate SET NOCOUNT ON;
   ROLLBACK TRAN;
END

Session 4 et plus - autant que vous le souhaitez

/* Session 4: "Process" the queue normally, one every second for 30 seconds. */
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET XACT_ABORT ON;
SET NOCOUNT ON;

DECLARE @Time varchar(8);
SELECT @Time = Convert(varchar(8), StartTime, 114)
FROM dbo.StartTime;
WAITFOR TIME @Time;

DECLARE @i int,
@QueueID int;
SET @i = 1;
WHILE @i <= 30 BEGIN
   BEGIN TRAN;

   --SET @QueueID = (
   --   SELECT TOP 1 QueueID
   --   FROM dbo.Queue WITH (READPAST, UPDLOCK)
   --   WHERE StatusID = 1 -- ready
   --   ORDER BY QueuedDate, QueueID
   --);

   --UPDATE dbo.Queue
   --SET StatusID = 2 -- in process
   --WHERE QueueID = @QueueID;

   SET @QueueID = NULL;
   UPDATE Q
   SET Q.StatusID = 1, @QueueID = Q.QueueID
   FROM (
      SELECT TOP 1 *
      FROM dbo.Queue WITH (ROWLOCK, READPAST)
      WHERE StatusID = 1
      ORDER BY QueuedDate, QueueID
   ) Q

   PRINT @QueueID;
   WAITFOR DELAY '00:00:01'
   SET @i = @i + 1;
   DELETE dbo.Queue
   WHERE QueueID = @QueueID;   
   COMMIT TRAN;
END
ErikE
la source
2
Les files d'attente décrites dans l'article lié peuvent évoluer à des centaines ou à des milliers d'opérations par seconde. Les problèmes de conflit de points chauds ne sont pertinents qu'à plus grande échelle. Il existe des stratégies d'atténuation connues qui peuvent atteindre un débit plus élevé sur un système haut de gamme, allant jusqu'à des dizaines de milliers par seconde, mais ces atténuations nécessitent une évaluation minutieuse et sont déployées sous la supervision de SQLCAT .
Remus Rusanu
Une ride intéressante est qu'avec READPAST, UPDLOCK, ROWLOCKmon script pour capturer des données dans la table QueueHistory, cela ne fait rien. Je me demande si c'est parce que le StatusID n'est pas engagé? Son utilisation WITH (NOLOCK)devrait donc théoriquement fonctionner ... et cela fonctionnait avant! Je ne sais pas pourquoi cela ne fonctionne pas maintenant, mais c'est probablement une autre expérience d'apprentissage.
ErikE
Pourriez-vous réduire votre code au plus petit exemple qui présente le blocage et d'autres problèmes que vous essayez de résoudre?
Nick Chammas
@ Nick, je vais essayer de réduire le code. À propos de vos autres commentaires, il existe une colonne d'identité qui fait partie de l'index clusterisé et triée par après la date. Je suis tout à fait disposé à entretenir une "lecture destructrice" (SUPPRIMER avec SORTIE) mais l'une des exigences demandées était, dans le cas d'une instance d'application échouant, que la ligne revienne au traitement automatiquement. Ma question ici est donc de savoir si cela est possible.
ErikE
Essayez l'approche de lecture destructive et placez les éléments retirés de la file d'attente dans une table distincte d'où ils peuvent être remis en file d'attente si nécessaire. Si cela résout le problème, vous pouvez investir dans le bon déroulement de ce processus de remise en file d'attente.
Nick Chammas

Réponses:

10

Vous avez besoin exactement de 3 astuces de verrouillage

  • READPAST
  • UPDLOCK
  • DAME DE NAGE

J'ai déjà répondu à cela sur SO: /programming/939831/sql-server-process-queue-race-condition/940001#940001

Comme le dit Remus, l'utilisation de Service Broker est plus agréable, mais ces conseils fonctionnent

Votre erreur sur le niveau d'isolement signifie généralement la réplication ou NOLOCK est impliqué.

gbn
la source
L'utilisation de ces conseils sur mon script comme indiqué ci-dessus génère des blocages et des processus hors service. ( UPDATE SET ... FROM (SELECT TOP 1 ... FROM ... ORDER BY ...)) Cela signifie-t-il que mon modèle UPDATE avec maintien d'un verrou ne peut pas fonctionner? En outre, le moment où vous combinez READPASTavec HOLDLOCKvous obtenez l'erreur. Il n'y a pas de réplication sur ce serveur et le niveau d'isolement est READ COMMITTED.
ErikE
2
@ErikE - La façon dont la table est structurée est tout aussi importante que la façon dont vous interrogez la table. La table que vous utilisez comme file d'attente doit être regroupée dans l'ordre de retrait de sorte que l'élément suivant à retirer de la file d'attente soit sans ambiguïté . C'est critique. En parcourant votre code ci-dessus, je ne vois aucun index cluster défini.
Nick Chammas
@Nick qui a un sens parfaitement éminent et je ne sais pas pourquoi je n'y ai pas pensé. J'ai ajouté la contrainte PK appropriée (et mis à jour mon script ci-dessus), et j'ai toujours des blocages. Cependant, les articles étaient désormais traités dans le bon ordre, à l'exception du traitement répété pour les articles bloqués.
ErikE
@ErikE - 1. Votre file d'attente ne doit contenir que des éléments en file d'attente. La mise en file d'attente et l'élément doivent signifier leur suppression de la table de file d'attente. Je constate que vous mettez plutôt à jour le StatusIDpour retirer un élément de la file d'attente. Est-ce exact? 2. Votre ordre de retrait doit être sans ambiguïté. Si vous placez des éléments en file d'attente GETDATE(), alors à des volumes élevés, il est très probable que plusieurs éléments seront également éligibles pour la file d'attente en même temps. Cela entraînera des blocages. Je suggère d'ajouter un IDENTITYà l'index cluster pour garantir un ordre de file d'attente sans ambiguïté.
Nick Chammas
1

Le serveur SQL fonctionne très bien pour stocker des données relationnelles. Quant à une file d'attente, ce n'est pas si génial. Voir cet article écrit pour MySQL mais il peut également s'appliquer ici. https://blog.engineyard.com/2011/5-subtle-ways-youre-using-mysql-as-a-queue-and-why-itll-bite-you

Eric Humphrey - lotsahelp
la source
Merci Eric. Dans ma réponse d'origine à la question, je proposais d'utiliser SQL Server Service Broker parce que je sais pertinemment que la méthode de la table en tant que file d'attente n'est pas vraiment la raison pour laquelle la base de données a été conçue. Mais je pense que ce n'est plus une bonne recommandation car SB est vraiment juste pour les messages. Les propriétés ACID des données placées dans la base de données en font un conteneur très intéressant à essayer d'utiliser (ab). Pouvez-vous suggérer un autre produit à faible coût qui fonctionnera bien comme file d'attente générique? Et peut être sauvegardé, etc. etc.?
ErikE
8
L'article est coupable d'une erreur connue dans le traitement des files d'attente: combinez l'état et les événements dans une seule table (en fait, si vous regardez les commentaires de l'article, vous verrez que je m'y suis opposé il y a quelque temps). Le symptôme typique de ce problème est le champ «traité / traitement». La combinaison de l'état avec les événements (c'est-à-dire faire de la table d'état la «file d'attente») entraîne une augmentation de la «file d'attente» à des tailles énormes (puisque la table d'état est la file d'attente). Séparer les événements dans une vraie file d'attente conduit à une file d'attente qui «se vide» (devient vide) et cela se comporte beaucoup mieux.
Remus Rusanu
L'article ne suggère-t-il pas exactement cela: la table de file d'attente ne contient que des éléments prêts à fonctionner.?
ErikE
2
@ErikE: vous faites référence à ce paragraphe, non? il est également très facile d'éviter le syndrome d'une grande table. Créez simplement une table distincte pour les nouveaux e-mails, et lorsque vous avez terminé de les traiter, INSÉREZ-les dans un stockage à long terme, puis supprimez-les de la table de file d'attente. Le tableau des nouveaux e-mails restera généralement très petit et les opérations seront rapides . Ma querelle avec cela est que cela est donné comme solution de contournement pour la question des «grosses files d'attente». Cette recommandation aurait dû être dans l'ouverture de l'article, est une question fondamentale .
Remus Rusanu
Si vous commencez à penser dans une séparation claire de l'état par rapport à l'événement, vous démarrez vdown un chemin beaucoup plus facile. Même la recommandation ci-dessus se transformerait en insérer de nouveaux e-mails dans la emailstable et dans la new_emailsfile d'attente. Le traitement interroge la new_emailsfile d'attente et met à jour l'état de la emailstable . Cela évite également le problème de l'état «gras» se déplaçant dans les files d'attente. Si nous parlons de traitement distribué et de vraies files d'attente, avec la communication (par exemple SSB), alors les choses deviennent plus compliquées car l'état partagé est problématique dans les systèmes distirbuted.
Remus Rusanu