Meilleur moyen de déplacer les messages hors DLQ dans Amazon SQS?

87

Quelle est la meilleure pratique pour déplacer les messages d'une file d'attente de lettres mortes vers la file d'attente d'origine dans Amazon SQS?

Serait-ce

  1. Obtenir un message de DLQ
  2. Écrire un message dans la file d'attente
  3. Supprimer le message du DLQ

Ou y a-t-il un moyen plus simple?

De plus, AWS aura-t-il éventuellement un outil dans la console pour déplacer les messages hors du DLQ?

Matt Dell
la source
github.com/garryyao/replay-aws-dlq fonctionne plutôt bien
Ulad Kasach
aussi une autre alternative github.com/mercury2269/sqsmover
Sergey

Réponses:

131

Voici un hack rapide. Ce n'est certainement pas la meilleure option ou l'option recommandée.

  1. Définissez la file d'attente SQS principale comme DLQ pour le DLQ réel avec le nombre maximal de réceptions sur 1.
  2. Afficher le contenu dans DLQ (Cela déplacera les messages vers la file d'attente principale car il s'agit du DLQ pour le DLQ réel)
  3. Supprimez le paramètre afin que la file d'attente principale ne soit plus le DLQ du DLQ réel
Rajkumar
la source
12
Oui, ce qui est très bien un hack - mais une option agréable pour une solution rapide si vous savez ce que vous faites et ne pas avoir le temps de résoudre ce #YOLO la bonne façon
Thomas Watson
14
Mais le compte de réception n'est pas remis à 0 lorsque vous faites cela. Faites attention.
Rajdeep Siddhapura
1
La bonne approche consiste à configurer la stratégie Redrive dans SQS avec le nombre maximal de réceptions et elle déplacera automatiquement le message vers DLQ lorsqu'il franchira le nombre de réceptions défini, puis écrira un thread de lecture à lire à partir de DLQ.
Cendres
5
Tu es un génie.
JefClaes
1
J'ai créé un outil CLI pour ce problème il y a quelques mois: github.com/renanvieira/phoenix-letter
MaltMaster
14

Il existe quelques scripts qui le font pour vous:

# install
npm install replay-aws-dlq;

# use
npx replay-aws-dlq [source_queue_url] [dest_queue_url]
# compile: https://github.com/mercury2269/sqsmover#compiling-from-source

# use
sqsmover -s [source_queue_url] -d [dest_queue_url] 
Ulad Kasach
la source
1
C'est la manière la plus simple, contrairement à la réponse acceptée. Exécutez simplement ceci à partir du terminal sur lequel la propriété AWS env vars est définie:npx replay-aws-dlq DL_URI MAIN_URI
Vasyl Boroviak
Remarque typo: dql -> dlq # install npm install replay-aws-dlq;
Lee Oades
Cela a parfaitement fonctionné pour moi (notez que je n'ai essayé que celui basé sur le go). Semblait déplacer les messages par étapes et pas tous à la fois (une bonne chose) et avait même une barre de progression. Mieux que la réponse acceptée IMO.
Yevgeny Ananin
13

Vous n'avez pas besoin de déplacer le message car il comportera de nombreux autres défis tels que les messages en double, les scénarios de récupération, le message perdu, la vérification de la déduplication, etc.

Voici la solution que nous avons mise en place -

Habituellement, nous utilisons le DLQ pour les erreurs transitoires, pas pour les erreurs permanentes. Alors pris ci-dessous l'approche -

  1. Lisez le message de DLQ comme une file d'attente normale

    Avantages
    • Pour éviter le traitement des messages en double
    • Meilleur contrôle sur DLQ - Comme j'ai mis un chèque, à traiter uniquement lorsque la file d'attente normale est complètement traitée.
    • Augmentez le processus en fonction du message sur DLQ
  2. Suivez ensuite le même code que la file d'attente régulière.

  3. Plus fiable en cas d'abandon de la tâche ou lorsque le processus s'est arrêté pendant le traitement (par exemple, instance tuée ou processus arrêté)

    Avantages
    • Réutilisation du code
    • La gestion des erreurs
    • Récupération et relecture des messages
  4. Étendez la visibilité des messages afin qu'aucun autre thread ne les traite.

    Avantage
    • Évitez de traiter le même enregistrement par plusieurs threads.
  5. Supprimez le message uniquement en cas d'erreur permanente ou de réussite.

    Avantage
    • Continuez le traitement jusqu'à ce que nous obtenions une erreur passagère.
Cendre
la source
J'aime beaucoup votre approche! Comment définissez-vous «erreur permanente» dans ce cas?
DMac the Destroyer
Tout ce qui est supérieur au code d'état HTTP> 200 <500 est une erreur permanente
Ash
c'est en effet une bonne approche en production. Cependant, je pense que cet article demande simplement comment re-poster les messages de DLQ dans la file d'attente normale. ce qui est parfois utile si vous savez ce que vous faites.
linehrr
C'est ce que je dis que vous ne devriez pas le faire. Parce que si vous le faites, cela créera plus de problèmes. Nous pouvons déplacer le message comme n'importe quel autre message push, mais nous perdrons les fonctionnalités DLQ telles que le nombre de réception, la visibilité et tout. Il sera traité comme un nouveau message.
Ash
6

Cela ressemble à votre meilleure option. Il est possible que votre processus échoue après l'étape 2. Dans ce cas, vous finirez par copier le message deux fois, mais votre application devrait de toute façon gérer la redistribution des messages (ou ne pas s'en soucier).

Dave
la source
6

ici:

import boto3
import sys
import Queue
import threading

work_queue = Queue.Queue()

sqs = boto3.resource('sqs')

from_q_name = sys.argv[1]
to_q_name = sys.argv[2]
print("From: " + from_q_name + " To: " + to_q_name)

from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName=to_q_name)

def process_queue():
    while True:
        messages = work_queue.get()

        bodies = list()
        for i in range(0, len(messages)):
            bodies.append({'Id': str(i+1), 'MessageBody': messages[i].body})

        to_q.send_messages(Entries=bodies)

        for message in messages:
            print("Coppied " + str(message.body))
            message.delete()

for i in range(10):
     t = threading.Thread(target=process_queue)
     t.daemon = True
     t.start()

while True:
    messages = list()
    for message in from_q.receive_messages(
            MaxNumberOfMessages=10,
            VisibilityTimeout=123,
            WaitTimeSeconds=20):
        messages.append(message)
    work_queue.put(messages)

work_queue.join()
Brian Dilley
la source
Est-ce Python?
carlin.scott
python2 en fait
Kristof Jozsa
4

Il existe un autre moyen d'y parvenir sans écrire une seule ligne de code. Considérez votre nom de file d'attente réel est SQS_Queue et le DLQ correspondant est SQS_DLQ. Suivez maintenant ces étapes:

  1. Définissez SQS_Queue comme dlq de SQS_DLQ. Puisque SQS_DLQ est déjà un dlq de SQS_Queue. Maintenant, les deux agissent comme le dlq de l'autre.
  2. Définissez le nombre de réception maximum de votre SQS_DLQ sur 1.
  3. Lisez maintenant les messages de la console SQS_DLQ. Puisque le nombre de messages reçus est 1, il enverra tout le message à sa propre dlq qui est votre file d'attente SQS_Queue réelle.
Priyanka Agarwal
la source
Cela ira à l'encontre de l'objectif de maintenir un DLQ. DLQ est conçu pour ne pas surcharger votre système lorsque vous observez des pannes afin que vous puissiez le faire plus tard.
Buddha
Cela ira certainement à l'encontre de l'objectif et vous ne serez pas en mesure d'obtenir d'autres avantages tels que l'augmentation, la limitation et le nombre de réception. De plus, vous devez utiliser la file d'attente normale comme file d'attente de traitement et si le nombre de réception de messages atteint «N», il doit passer à DLQ. C'est ce que idéalement, il devrait être configuré.
Cendres
3
En tant que solution unique pour relancer un grand nombre de messages, cela fonctionne comme un charme. Pas une bonne solution à long terme, cependant.
nmio
Oui, cela est extrêmement utile en tant que solution unique pour rediriger les messages (après avoir résolu le problème dans la file d'attente principale). La commande est je sur AWS CLI: aws sqs receive-message --queue-url <url of DLQ> --max-number-of-messages 10. Étant donné que le maximum de messages, vous pouvez lire les majuscules à 10, je suggère d'exécuter la commande dans une boucle comme celle-ci:for i in {1..1000}; do <CMD>; done
Patrick Finnigan
3

J'ai écrit un petit script python pour ce faire, en utilisant boto3 lib:

conf = {
  "sqs-access-key": "",
  "sqs-secret-key": "",
  "reader-sqs-queue": "",
  "writer-sqs-queue": "",
  "message-group-id": ""
}

import boto3
client = boto3.client(
    'sqs',
        aws_access_key_id       = conf.get('sqs-access-key'),
        aws_secret_access_key   = conf.get('sqs-secret-key')
)

while True:
    messages = client.receive_message(QueueUrl=conf['reader-sqs-queue'], MaxNumberOfMessages=10, WaitTimeSeconds=10)

    if 'Messages' in messages:
        for m in messages['Messages']:
            print(m['Body'])
            ret = client.send_message( QueueUrl=conf['writer-sqs-queue'], MessageBody=m['Body'], MessageGroupId=conf['message-group-id'])
            print(ret)
            client.delete_message(QueueUrl=conf['reader-sqs-queue'], ReceiptHandle=m['ReceiptHandle'])
    else:
        print('Queue is currently empty or messages are invisible')
        break

vous pouvez obtenir ce script dans ce lien

ce script peut essentiellement déplacer des messages entre toutes les files d'attente arbitraires. et il prend en charge les files d'attente fifo ainsi que vous pouvez fournir le message_group_idchamp.

linehrr
la source
3

Nous utilisons le script suivant pour rediriger le message de la file d'attente src vers la file d'attente tgt:

nom de fichier: redrive.py

usage: python redrive.py -s {source queue name} -t {target queue name}

'''
This script is used to redrive message in (src) queue to (tgt) queue

The solution is to set the Target Queue as the Source Queue's Dead Letter Queue.
Also set Source Queue's redrive policy, Maximum Receives to 1. 
Also set Source Queue's VisibilityTimeout to 5 seconds (a small period)
Then read data from the Source Queue.

Source Queue's Redrive Policy will copy the message to the Target Queue.
'''
import argparse
import json
import boto3
sqs = boto3.client('sqs')


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', '--src', required=True,
                        help='Name of source SQS')
    parser.add_argument('-t', '--tgt', required=True,
                        help='Name of targeted SQS')

    args = parser.parse_args()
    return args


def verify_queue(queue_name):
    queue_url = sqs.get_queue_url(QueueName=queue_name)
    return True if queue_url.get('QueueUrl') else False


def get_queue_attribute(queue_url):
    queue_attributes = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=['All'])['Attributes']
    print(queue_attributes)

    return queue_attributes


def main():
    args = parse_args()
    for q in [args.src, args.tgt]:
        if not verify_queue(q):
            print(f"Cannot find {q} in AWS SQS")

    src_queue_url = sqs.get_queue_url(QueueName=args.src)['QueueUrl']

    target_queue_url = sqs.get_queue_url(QueueName=args.tgt)['QueueUrl']
    target_queue_attributes = get_queue_attribute(target_queue_url)

    # Set the Source Queue's Redrive policy
    redrive_policy = {
        'deadLetterTargetArn': target_queue_attributes['QueueArn'],
        'maxReceiveCount': '1'
    }
    sqs.set_queue_attributes(
        QueueUrl=src_queue_url,
        Attributes={
            'VisibilityTimeout': '5',
            'RedrivePolicy': json.dumps(redrive_policy)
        }
    )
    get_queue_attribute(src_queue_url)

    # read all messages
    num_received = 0
    while True:
        try:
            resp = sqs.receive_message(
                QueueUrl=src_queue_url,
                MaxNumberOfMessages=10,
                AttributeNames=['All'],
                WaitTimeSeconds=5)

            num_message = len(resp.get('Messages', []))
            if not num_message:
                break

            num_received += num_message
        except Exception:
            break
    print(f"Redrive {num_received} messages")

    # Reset the Source Queue's Redrive policy
    sqs.set_queue_attributes(
        QueueUrl=src_queue_url,
        Attributes={
            'VisibilityTimeout': '30',
            'RedrivePolicy': ''
        }
    )
    get_queue_attribute(src_queue_url)


if __name__ == "__main__":
    main()
menrfa
la source
0

DLQ n'entre en jeu que lorsque le consommateur d'origine ne parvient pas à consommer le message avec succès après plusieurs tentatives. Nous ne voulons pas supprimer le message car nous pensons que nous pouvons toujours faire quelque chose avec lui (peut-être essayer de le traiter à nouveau, de le consigner ou de collecter des statistiques) et nous ne voulons pas continuer à rencontrer ce message encore et encore et arrêter la possibilité de traiter les autres messages derrière celui-ci.

DLQ n'est rien d'autre qu'une autre file d'attente. Ce qui signifie que nous aurions besoin d'écrire un consommateur pour DLQ qui fonctionnerait idéalement moins fréquemment (par rapport à la file d'attente d'origine) qui consommerait de DLQ et produirait le message dans la file d'attente d'origine et le supprimerait de DLQ - si c'est le comportement prévu et nous pensons le consommateur d'origine serait maintenant prêt à le traiter à nouveau. Cela devrait être OK si ce cycle se poursuit pendant un certain temps, car nous avons maintenant également la possibilité d'inspecter manuellement et d'apporter les modifications nécessaires et de déployer une autre version du consommateur d'origine sans perdre le message (dans la période de rétention du message bien sûr - qui est de 4 jours par défaut).

Ce serait bien si AWS fournissait cette capacité prête à l'emploi, mais je ne la vois pas encore - ils laissent cela à l'utilisateur final pour l'utiliser de la manière qu'il juge appropriée.

rd2
la source