Comment UPSERT (MERGE, INSERT… ON DUPLICATE UPDATE) dans PostgreSQL?

268

Une question très fréquemment posée ici est de savoir comment faire un upsert, ce que MySQL appelle INSERT ... ON DUPLICATE UPDATEet la norme prend en charge dans le cadre de l' MERGEopération.

Étant donné que PostgreSQL ne le prend pas directement en charge (avant la page 9.5), comment procédez-vous? Considérer ce qui suit:

CREATE TABLE testtable (
    id integer PRIMARY KEY,
    somedata text NOT NULL
);

INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');

Maintenant , imaginez que vous voulez « upsert » les tuples (2, 'Joe'), (3, 'Alan'), de sorte que le nouveau contenu de la table serait:

(1, 'fred'),
(2, 'Joe'),    -- Changed value of existing tuple
(3, 'Alan')    -- Added new tuple

C'est de cela que les gens parlent lorsqu'ils discutent d'un upsert. Surtout, toute approche doit être sûre en présence de plusieurs transactions travaillant sur la même table - soit en utilisant un verrouillage explicite, soit en se défendant contre les conditions de concurrence qui en résultent.

Ce sujet est abordé en détail dans Insert, sur la mise à jour en double dans PostgreSQL? , mais il s'agit d'alternatives à la syntaxe MySQL, et elle a augmenté un peu de détails non liés au fil du temps. Je travaille sur des réponses définitives.

Ces techniques sont également utiles pour "insérer s'il n'existe pas, sinon ne rien faire", c'est-à-dire "insérer ... sur une clé en double ignorer".

Craig Ringer
la source
8
@MichaelHampton, le but ici était de créer une version définitive qui ne soit pas confondue par plusieurs réponses obsolètes - et verrouillée, afin que personne ne puisse rien y faire. Je ne suis pas d'accord avec le vote serré.
Craig Ringer
Eh bien, cela deviendrait bientôt obsolète - et verrouillé, donc personne ne pourrait rien y faire.
Michael Hampton
2
@MichaelHampton Si vous êtes inquiet, vous pouvez peut-être signaler celui auquel vous avez lié et demander qu'il soit déverrouillé afin qu'il puisse être nettoyé, alors nous pouvons fusionner cela. Je suis juste malade d'avoir le seul proche évident as-dup for upsert étant un gâchis si confus et mauvais.
Craig Ringer
1
Ce Q&R n'est pas verrouillé!
Michael Hampton

Réponses:

396

9.5 et plus récent:

PostgreSQL 9.5 et support plus récent INSERT ... ON CONFLICT UPDATE(et ON CONFLICT DO NOTHING), ie upsert.

Comparaison avecON DUPLICATE KEY UPDATE .

Explication rapide .

Pour l'utilisation, voir le manuel - en particulier la clause conflict_action dans le diagramme de syntaxe et le texte explicatif .

Contrairement aux solutions pour 9.4 et antérieures présentées ci-dessous, cette fonctionnalité fonctionne avec plusieurs lignes en conflit et ne nécessite pas de verrouillage exclusif ni de boucle de relance.

Le commit ajoutant la fonctionnalité est ici et la discussion autour de son développement est ici .


Si vous êtes sur 9.5 et n'avez pas besoin d'être rétrocompatible, vous pouvez arrêter la lecture maintenant .


9.4 et plus:

PostgreSQL n'a pas de fonction intégrée UPSERT(ou MERGE), et le faire efficacement face à une utilisation simultanée est très difficile.

Cet article décrit le problème en détail utile .

En général, vous devez choisir entre deux options:

  • Opérations d'insertion / mise à jour individuelles dans une boucle de nouvelle tentative; ou
  • Verrouillage de la table et fusion par lots

Boucle de relance de ligne individuelle

L'utilisation de sauts de ligne individuels dans une boucle de nouvelle tentative est l'option raisonnable si vous souhaitez que de nombreuses connexions essaient simultanément d'effectuer des insertions.

La documentation PostgreSQL contient une procédure utile qui vous permettra de le faire en boucle à l'intérieur de la base de données . Il protège contre les mises à jour perdues et les courses d'insertion, contrairement à la plupart des solutions naïves. Cela ne fonctionnera qu'en READ COMMITTEDmode et n'est sûr que si c'est la seule chose que vous faites dans la transaction. La fonction ne fonctionnera pas correctement si les déclencheurs ou les clés uniques secondaires provoquent des violations uniques.

Cette stratégie est très inefficace. Dans la mesure du possible, vous devez mettre le travail en file d'attente et effectuer une mise à jour groupée comme décrit ci-dessous.

De nombreuses tentatives de solutions à ce problème ne prennent pas en compte les annulations, elles entraînent donc des mises à jour incomplètes. Deux transactions se font la course; l'un d'eux a réussi INSERTs; l'autre obtient une erreur de clé en double et fait à la UPDATEplace. Les UPDATEblocs en attente INSERTde restauration ou de validation. Lorsqu'elle est annulée, la UPDATEnouvelle vérification de la condition correspond à zéro ligne, donc même si UPDATEles validations n'ont pas réellement fait l'upert que vous attendiez. Vous devez vérifier le nombre de lignes de résultats et réessayer si nécessaire.

Certaines solutions tentées ne tiennent pas compte non plus des races SELECT. Si vous essayez l'évidence et la simplicité:

-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.

BEGIN;

UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;

-- Remember, this is WRONG. Do NOT COPY IT.

INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);

COMMIT;

puis, lorsque deux fonctionnent en même temps, il existe plusieurs modes de défaillance. L'un est le problème déjà discuté avec une nouvelle vérification de la mise à jour. Un autre est où les deux UPDATEen même temps, correspondant à zéro ligne et continuant. Ensuite, ils font tous les deux le EXISTStest, qui a lieu avant le INSERT. Les deux obtiennent zéro ligne, donc les deux font le INSERT. Un échoue avec une erreur de clé en double.

C'est pourquoi vous avez besoin d'une boucle de réessai. Vous pourriez penser que vous pouvez éviter les erreurs de clé en double ou les mises à jour perdues avec SQL intelligent, mais vous ne pouvez pas. Vous devez vérifier le nombre de lignes ou gérer les erreurs de clé en double (selon l'approche choisie) et réessayer.

Veuillez ne pas lancer votre propre solution pour cela. Comme pour la mise en file d'attente des messages, c'est probablement faux.

Upsert en vrac avec serrure

Parfois, vous souhaitez effectuer une mise à niveau groupée, dans laquelle vous disposez d'un nouvel ensemble de données que vous souhaitez fusionner avec un ancien ensemble de données existant. Ceci est beaucoup plus efficace que les sauts de rangs individuels et devrait être préféré chaque fois que cela est possible.

Dans ce cas, vous suivez généralement le processus suivant:

  • CREATEune TEMPORARYtable

  • COPY ou insérez en masse les nouvelles données dans la table temporaire

  • LOCKla table cible IN EXCLUSIVE MODE. Cela permet à d'autres transactions SELECT, mais sans apporter de modifications à la table.

  • Faites un UPDATE ... FROMdes enregistrements existants en utilisant les valeurs de la table temporaire;

  • Faites une INSERTdes lignes qui n'existent pas déjà dans la table cible;

  • COMMIT, libérant le verrou.

Par exemple, pour l'exemple donné dans la question, en utilisant plusieurs valeurs INSERTpour remplir la table temporaire:

BEGIN;

CREATE TEMPORARY TABLE newvals(id integer, somedata text);

INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');

LOCK TABLE testtable IN EXCLUSIVE MODE;

UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;

INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;

COMMIT;

Lecture connexe

Et alors MERGE?

Le standard SQL MERGEa en fait une sémantique de concurrence d'accès mal définie et ne convient pas pour la mise à jour sans verrouiller d'abord une table.

C'est une instruction OLAP vraiment utile pour la fusion de données, mais ce n'est pas en fait une solution utile pour l'upsert concurrentiel sécurisé. Il y a beaucoup de conseils aux personnes utilisant d'autres SGBD MERGEpour les upserts, mais c'est en fait faux.

Autres DB:

Craig Ringer
la source
Dans l'upert en vrac, est-il possible de supprimer des newvals plutôt que de filtrer INSERT? Par exemple, avec upd AS (UPDATE ... RETURNING newvals.id) SUPPRIMER DE newvals EN UTILISANT upd WHERE newvals.id = upd.id, suivi par un INSERT INTO testtable nu SELECT * FROM newvals? Mon idée avec ceci: au lieu de filtrer deux fois dans INSERT (pour JOIN / WHERE et pour la contrainte unique), réutilisez les résultats du contrôle d'existence de UPDATE, qui sont déjà en RAM, et peuvent être beaucoup plus petits. Cela peut être une victoire si quelques lignes correspondent et / ou newvals est beaucoup plus petit que la table de test.
Gunnlaugur Briem
1
Il y a encore des problèmes non résolus et pour les autres fournisseurs, ce qui fonctionne et ce qui ne fonctionne pas n'est pas clair. 1. La solution de boucle Postgres, comme indiqué, ne fonctionne pas dans le cas de plusieurs clés uniques. 2. La clé on duplicate pour mysql ne fonctionne pas non plus pour plusieurs clés uniques. 3. Les autres solutions pour MySQL, SQL Server et Oracle publiées ci-dessus fonctionnent-elles? Des exceptions sont-elles possibles dans ces cas et devons-nous boucler?
dan b
@danb Il s'agit uniquement de PostgreSQL. Il n'y a pas de solution multi-fournisseurs. La solution pour PostgreSQL ne fonctionne pas pour plusieurs lignes, vous devez malheureusement effectuer une transaction par ligne. Les «solutions» utilisées MERGEpour SQL Server et Oracle sont incorrectes et sujettes aux conditions de concurrence, comme indiqué ci-dessus. Vous aurez besoin d'examiner spécifiquement chaque SGBD pour savoir comment les gérer, je ne peux vraiment offrir que des conseils sur PostgreSQL. La seule façon de faire un upsert multi-lignes en toute sécurité sur PostgreSQL sera de prendre en charge le upsert natif sur le serveur principal.
Craig Ringer
Même pour PostGresQL, la solution ne fonctionne pas dans le cas où une table a plusieurs clés uniques (mise à jour d'une seule ligne). Dans ce cas, vous devez spécifier quelle clé est mise à jour. Il peut y avoir une solution multi-fournisseurs utilisant jdbc par exemple.
dan b
2
Postgres prend désormais en charge UPSERT - git.postgresql.org/gitweb/…
Chris
32

J'essaie de contribuer avec une autre solution pour le problème d'insertion unique avec les versions antérieures à 9.5 de PostgreSQL. L'idée est simplement d'essayer d'effectuer d'abord l'insertion, et dans le cas où l'enregistrement est déjà présent, de le mettre à jour:

do $$
begin 
  insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
  update testtable set somedata = 'Joe' where id = 2;
end $$;

Notez que cette solution ne peut être appliquée que s'il n'y a aucune suppression de lignes de la table .

Je ne connais pas l'efficacité de cette solution, mais elle me semble assez raisonnable.

Renzo
la source
3
Merci, c'est exactement ce que je cherchais. Je ne comprends pas pourquoi c'était si difficile à trouver.
isapir
4
Oui. Cette simplification fonctionne si et seulement s'il n'y a pas de suppressions.
Craig Ringer
@CraigRinger Pouvez-vous expliquer ce qui se passera exactement s'il y avait des suppressions?
turbanoff
@turbanoff L'insertion peut échouer car l'enregistrement est déjà là, puis il est supprimé simultanément, et la mise à jour affecte alors zéro ligne car la ligne a été supprimée.
Craig Ringer
@CraigRinger So. La suppression se produit simultanément . Quels sont outways possibles si cela est fonctionne bien? Si la suppression fonctionne simultanément - alors elle peut être exécutée juste après notre blocage. Ce que j'essaie de dire - si nous avons des suppressions simultanées - alors ce code fonctionne de la même manière que boninsert on update
turbanoff
30

Voici quelques exemples pour insert ... on conflict ...( pg 9.5+ ):

  • Insérer, en cas de conflit - ne rien faire .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict do nothing;`  
  • Insérer, en cas de conflit - mettre à jour , spécifier la cible du conflit via la colonne .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict(id)
    do update set name = 'new_name', size = 3;  
  • Insérer, en cas de conflit - mettre à jour , spécifier la cible du conflit via le nom de la contrainte .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict on constraint dummy_pkey
    do update set name = 'new_name', size = 4;
Eric Wang
la source
grande réponse - question: pourquoi ou dans quelle situation doit-on utiliser la spécification cible via le nom de la colonne ou de la contrainte? Y a-t-il un avantage / inconvénient pour différents cas d'utilisation?
Nathan Benton
1
@NathanBenton Je pense qu'il y a au moins 2 différences: (1) le nom de la colonne est spécifié par le programmeur, tandis que le nom de la contrainte peut être soit spécifié par le programmeur, soit généré par la base de données en fonction des noms de table / colonne. (2) chaque colonne peut avoir plusieurs contraintes. Cela dit, cela dépend de votre cas de choisir lequel utiliser.
Eric Wang
8

SQLAlchemy upsert pour Postgres> = 9.5

Étant donné que le grand message ci-dessus couvre de nombreuses approches SQL différentes pour les versions de Postgres (non seulement non-9.5 comme dans la question), je voudrais ajouter comment le faire dans SQLAlchemy si vous utilisez Postgres 9.5. Au lieu d'implémenter votre propre upsert, vous pouvez également utiliser les fonctions de SQLAlchemy (qui ont été ajoutées dans SQLAlchemy 1.1). Personnellement, je recommanderais de les utiliser si possible. Non seulement pour des raisons de commodité, mais aussi parce qu'il permet à PostgreSQL de gérer toutes les conditions de concurrence qui pourraient se produire.

Publication croisée d'une autre réponse que j'ai donnée hier ( https://stackoverflow.com/a/44395983/2156909 )

SQLAlchemy prend ON CONFLICTdésormais en charge deux méthodes on_conflict_do_update()et on_conflict_do_nothing():

Copie de la documentation:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='[email protected]', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
    )
conn.execute(stmt)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert

PR
la source
4
Python et SQLAlchemy ne sont pas mentionnés dans la question.
Alexander Emelianov,
J'utilise souvent Python dans les solutions que j'écris. Mais je n'ai pas examiné SQLAlchemy (ou je ne le savais pas). Cela semble une option élégante. Je vous remercie. S'il vérifie, je présenterai ceci à mon organisation.
Robert
3
WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2 
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS

Testé sur Postgresql 9.3

aristar
la source
@CraigRinger: pourriez-vous nous en dire plus? le cte n'est-il pas atomique?
parisni
2
@parisni Non. Chaque terme CTE obtient son propre instantané s'il effectue des écritures. De plus, il n'y a aucune sorte de verrouillage de prédicat effectué sur les lignes qui n'ont pas été trouvées afin qu'elles puissent toujours être créées simultanément par une autre session. Si vous utilisiez l' SERIALIZABLEisolement, vous obtiendriez un abandon avec un échec de sérialisation, sinon vous obtiendriez probablement une violation unique. Ne réinventez pas l'upert, la réinvention sera fausse. Utilisez INSERT ... ON CONFLICT .... Si votre PostgreSQL est trop ancien, mettez-le à jour.
Craig Ringer
@CraigRinger INSERT ... ON CLONFLICT ...n'est pas destiné au chargement en masse. De votre message, l' LOCK TABLE testtable IN EXCLUSIVE MODE;intérieur d'un CTE est une solution de contournement pour obtenir des choses atomiques. Non ?
parisni
@parisni Il n'est pas destiné au chargement en masse? Dit qui? postgresql.org/docs/current/sql-insert.html#SQL-ON-CONFLICT . Bien sûr, c'est beaucoup plus lent que le chargement en masse sans comportement de type upsert, mais c'est évident et ce sera le cas, quoi que vous fassiez. C'est bien plus rapide que d'utiliser des sous-transactions, c'est sûr. L'approche la plus rapide consiste à verrouiller la table cible, puis à faire une insert ... where not exists ...ou similaire, bien sûr.
Craig Ringer
1

Depuis que cette question a été fermée, je poste ici pour savoir comment vous le faites en utilisant SQLAlchemy. Par récursivité, il réessaye une insertion en masse ou une mise à jour pour lutter contre les conditions de course et les erreurs de validation.

D'abord les importations

import itertools as it

from functools import partial
from operator import itemgetter

from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

Maintenant, quelques fonctions d'assistance

def chunk(content, chunksize=None):
    """Groups data into chunks each with (at most) `chunksize` items.
    https://stackoverflow.com/a/22919323/408556
    """
    if chunksize:
        i = iter(content)
        generator = (list(it.islice(i, chunksize)) for _ in it.count())
    else:
        generator = iter([content])

    return it.takewhile(bool, generator)


def gen_resources(records):
    """Yields a dictionary if the record's id already exists, a row object 
    otherwise.
    """
    ids = {item[0] for item in session.query(Posts.id)}

    for record in records:
        is_row = hasattr(record, 'to_dict')

        if is_row and record.id in ids:
            # It's a row but the id already exists, so we need to convert it 
            # to a dict that updates the existing record. Since it is duplicate,
            # also yield True
            yield record.to_dict(), True
        elif is_row:
            # It's a row and the id doesn't exist, so no conversion needed. 
            # Since it's not a duplicate, also yield False
            yield record, False
        elif record['id'] in ids:
            # It's a dict and the id already exists, so no conversion needed. 
            # Since it is duplicate, also yield True
            yield record, True
        else:
            # It's a dict and the id doesn't exist, so we need to convert it. 
            # Since it's not a duplicate, also yield False
            yield Posts(**record), False

Et enfin la fonction upsert

def upsert(data, chunksize=None):
    for records in chunk(data, chunksize):
        resources = gen_resources(records)
        sorted_resources = sorted(resources, key=itemgetter(1))

        for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
            items = [g[0] for g in group]

            if dupe:
                _upsert = partial(session.bulk_update_mappings, Posts)
            else:
                _upsert = session.add_all

            try:
                _upsert(items)
                session.commit()
            except IntegrityError:
                # A record was added or deleted after we checked, so retry
                # 
                # modify accordingly by adding additional exceptions, e.g.,
                # except (IntegrityError, ValidationError, ValueError)
                db.session.rollback()
                upsert(items)
            except Exception as e:
                # Some other error occurred so reduce chunksize to isolate the 
                # offending row(s)
                db.session.rollback()
                num_items = len(items)

                if num_items > 1:
                    upsert(items, num_items // 2)
                else:
                    print('Error adding record {}'.format(items[0]))

Voici comment vous l'utilisez

>>> data = [
...     {'id': 1, 'text': 'updated post1'}, 
...     {'id': 5, 'text': 'updated post5'}, 
...     {'id': 1000, 'text': 'new post1000'}]
... 
>>> upsert(data)

L'avantage que cela présente bulk_save_objectsest qu'il peut gérer les relations, la vérification des erreurs, etc. lors de l'insertion (contrairement aux opérations en bloc ).

reubano
la source
Cela me semble également faux. Que se passe-t-il si une session simultanée insère une ligne après avoir collecté votre liste d'ID? Ou en supprime un?
Craig Ringer
bon point @CraigRinger Je fais quelque chose de similaire, mais je n'ai qu'une seule session pour effectuer le travail. Quelle est alors la meilleure façon de gérer plusieurs sessions? Une transaction peut-être?
reubano
Les transactions ne sont pas la solution magique à tous les problèmes de concurrence. Vous pouvez utiliser des SERIALIZABLE transactions et gérer les échecs de sérialisation, mais c'est lent. Vous avez besoin de la gestion des erreurs et d'une boucle de nouvelle tentative. Voir ma réponse et la section «lectures connexes».
Craig Ringer
@CraigRinger gotcha. J'ai en fait implémenté une boucle de nouvelle tentative dans mon propre cas en raison d'autres échecs de validation. Je mettrai à jour cette réponse en conséquence.
reubano