Insertion groupée avec SQLAlchemy ORM

131

Existe-t-il un moyen pour SQLAlchemy de faire une insertion en bloc plutôt que d'insérer chaque objet individuel. c'est à dire,

Faire:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

plutôt que:

INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)

Je viens de convertir du code pour utiliser sqlalchemy plutôt que sql brut et bien qu'il soit maintenant beaucoup plus agréable de travailler avec, il semble être plus lent maintenant (jusqu'à un facteur de 10), je me demande si c'est la raison.

Peut-être pourrais-je améliorer la situation en utilisant des sessions plus efficacement. Pour le moment, je l'ai fait autoCommit=Falseet session.commit()j'ai ajouté quelques trucs. Bien que cela semble rendre les données obsolètes si la base de données est modifiée ailleurs, comme même si je fais une nouvelle requête, je récupère toujours les anciens résultats?

Merci de votre aide!

Nick Holden
la source
1
Cela pourrait aider: stackoverflow.com/questions/270879/…
Sean Vieira
1
Nick, je comprends que c'est un très vieux message. Serait-il possible de mettre à jour le titre avec quelque chose de correct comme "insertion d'enregistrements multiples avec SQLAlchemy ORM". Les instructions d'insertion multi-enregistrements comme celle que vous avez fournie sont assez différentes des opérations de chargement en masse au niveau de la base de données. Les insertions en masse sont destinées à des téléchargements de 1k + de données, généralement à partir de grands ensembles de données et effectués par des gestionnaires d'applications, pas des opérations REST ou du code au niveau de l'application .... Utilisons notre nomenclature correctement.
W4t3randWind
Pour ceux qui tombent sur cette question en cherchant des informations sur les opérations en bloc dans sqlalchemy Core (pas ORM), voir ma réponse à une autre question .
Nickolay

Réponses:

174

SQLAlchemy a introduit cela dans la version 1.0.0:

Opérations en masse - SQLAlchemy Docs

Avec ces opérations, vous pouvez désormais effectuer des insertions ou des mises à jour en masse!

Par exemple, vous pouvez faire:

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

Ici, une insertion en vrac sera faite.

Pierre
la source
30
Vous avez également besoin de s.commit () pour sauvegarder les enregistrements (il m'a fallu un peu de temps pour comprendre celui-ci).
horcle_buzz
3
J'ai essayé cela avec sqlachemy 1.0.11 et il fait toujours 3 instructions d'insertion. Mais c'est beaucoup plus rapide que les opérations orm normales.
zidarsk8
3
bien que cela ne soit pas pertinent pour la question des OP, il convient de mentionner que cela rompt certaines fonctionnalités de l'ORM. docs.sqlalchemy.org/en/rel_1_0/orm/…
dangel
@dangel oui merci d'avoir publié ceci. Bien que le titre d'OP concerne le "chargement en masse", sa question sur les instructions d'insertion multi-enregistrements n'a rien à voir avec la fonction de chargement en masse de sqlalchemy.
W4t3randWind
Par rapport à l'insertion des mêmes données à partir de CSV avec \copyavec psql (du même client vers le même serveur), je constate une énorme différence de performances côté serveur, ce qui entraîne environ 10 fois plus d'insert / s. Apparemment, le chargement en masse en utilisant \copy(ou COPYsur le serveur) en utilisant un emballage pour communiquer de client à serveur est BEAUCOUP mieux que d'utiliser SQL via SQLAlchemy. Plus d' infos: Grande différence d' insertion en bloc de performance PostgreSQL vs ... .
gertvdijk
42

Les documents sqlalchemy ont un aperçu des performances de diverses techniques pouvant être utilisées pour les insertions en masse:

Les ORM ne sont fondamentalement pas destinés aux insertions en masse hautes performances - c'est la raison pour laquelle SQLAlchemy propose le Core en plus de l'ORM en tant que composant de première classe.

Pour le cas d'utilisation d'insertions en masse rapides, le système de génération et d'exécution SQL sur lequel l'ORM construit fait partie du Core. En utilisant ce système directement, nous pouvons produire un INSERT qui est compétitif avec l'utilisation directe de l'API de base de données brute.

Alternativement, SQLAlchemy ORM propose la suite de méthodes Bulk Operations, qui fournit des crochets dans des sous-sections du processus d'unité de travail afin d'émettre des constructions INSERT et UPDATE de niveau Core avec un petit degré d'automatisation ORM.

L'exemple ci-dessous illustre des tests basés sur le temps pour plusieurs méthodes différentes d'insertion de lignes, allant du plus automatisé au moins. Avec cPython 2.7, les runtimes observés:

classics-MacBook-Pro:sqlalchemy classic$ python test.py
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
sqlite3: Total time for 100000 records 0.487842082977 sec

Scénario:

import time
import sqlite3

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,  create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
    global engine
    engine = create_engine(dbname, echo=False)
    DBSession.remove()
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


def test_sqlalchemy_orm(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer()
        customer.name = 'NAME ' + str(i)
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_pk_given(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer(id=i+1, name="NAME " + str(i))
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM pk given: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_bulk_insert(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    n1 = n
    while n1 > 0:
        n1 = n1 - 10000
        DBSession.bulk_insert_mappings(
            Customer,
            [
                dict(name="NAME " + str(i))
                for i in xrange(min(10000, n1))
            ]
        )
    DBSession.commit()
    print(
        "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )
    print(
        "SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def init_sqlite3(dbname):
    conn = sqlite3.connect(dbname)
    c = conn.cursor()
    c.execute("DROP TABLE IF EXISTS customer")
    c.execute(
        "CREATE TABLE customer (id INTEGER NOT NULL, "
        "name VARCHAR(255), PRIMARY KEY(id))")
    conn.commit()
    return conn


def test_sqlite3(n=100000, dbname='sqlite3.db'):
    conn = init_sqlite3(dbname)
    c = conn.cursor()
    t0 = time.time()
    for i in xrange(n):
        row = ('NAME ' + str(i),)
        c.execute("INSERT INTO customer (name) VALUES (?)", row)
    conn.commit()
    print(
        "sqlite3: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " sec")

if __name__ == '__main__':
    test_sqlalchemy_orm(100000)
    test_sqlalchemy_orm_pk_given(100000)
    test_sqlalchemy_orm_bulk_insert(100000)
    test_sqlalchemy_core(100000)
    test_sqlite3(100000)
Grant Humphries
la source
1
Je vous remercie. Vraiment utile et minutieux.
Steve B.
J'ai vu un autre exemple utilisant bindparams. La syntaxe semble succincte, est-ce que c'est bon?
Jay
35

Autant que je sache, il n'y a aucun moyen pour que l'ORM émette des insertions en masse. Je crois que la raison sous-jacente est que SQLAlchemy doit garder une trace de l'identité de chaque objet (c'est-à-dire, les nouvelles clés primaires), et les insertions en masse interfèrent avec cela. Par exemple, en supposant que votre footable contient une idcolonne et est mappée à une Fooclasse:

x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1

Puisque SQLAlchemy a récupéré la valeur pour x.idsans émettre une autre requête, nous pouvons en déduire qu'il a obtenu la valeur directement à partir de l' INSERTinstruction. Si vous n'avez pas besoin d'un accès ultérieur aux objets créés via les mêmes instances, vous pouvez ignorer la couche ORM pour votre insertion:

Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))

SQLAlchemy ne peut pas faire correspondre ces nouvelles lignes avec des objets existants, vous devrez donc les interroger à nouveau pour les opérations suivantes.

En ce qui concerne les données obsolètes, il est utile de se rappeler que la session n'a aucun moyen intégré de savoir quand la base de données est modifiée en dehors de la session. Pour accéder aux données modifiées en externe via des instances existantes, les instances doivent être marquées comme expirées . Cela se produit par défaut sur session.commit(), mais peut être fait manuellement en appelantsession.expire_all() ou session.expire(instance). Un exemple (SQL omis):

x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42

session.commit()expire x, donc la première instruction print ouvre implicitement une nouvelle transaction et ré-interrogex les attributs de. Si vous mettez en commentaire la première instruction d'impression, vous remarquerez que la seconde prend maintenant la valeur correcte, car la nouvelle requête n'est émise qu'après la mise à jour.

Cela a du sens du point de vue de l'isolation transactionnelle - vous ne devez prendre en compte que les modifications externes entre les transactions. Si cela vous cause des problèmes, je vous suggère de clarifier ou de repenser les limites de transaction de votre application au lieu de chercher immédiatement session.expire_all().

Dhaffey
la source
Merci pour votre réponse, je vais essayer. WRT le problème expirant, ce que j'ai vu n'était pas tout à fait le même. J'utilise une session scoped en turbogears. Exécution d'une requête getSession (). Query (Foo) .filter .... all () a retourné des choses différentes en fonction de la demande, n'a pas non plus renvoyé les enregistrements mis à jour qui étaient dans la base de données jusqu'à ce que je l'ai redémarré. J'ai résolu ce problème en faisant un autocommit = True et en ajoutant quelque chose qui .remove () d la session une fois la demande terminée (je suppose que vous êtes censé le faire de toute façon).
Nick Holden
Je suppose que cela a renvoyé des choses différentes en fonction de la demande, car il y avait une session étendue par thread dans le pool et les sessions étaient dans des états différents? Cependant, il semblait un peu étrange que sa n'obtienne pas de nouvelles données après une nouvelle demande. Je suppose que je comprends mal ce que fait l'autocommit = False
Nick Holden
Avec autocommit=False, je pense que vous devriez appeler session.commit()à la fin de la demande (je ne suis pas familier avec TurboGears, alors ignorez cela si cela est géré pour vous au niveau du cadre). En plus de vous assurer que vos modifications ont été apportées à la base de données, cela expirerait tout dans la session. La prochaine transaction ne commencerait pas avant la prochaine utilisation de cette session, donc les futures demandes sur le même thread ne verront pas les données périmées.
dhaffey
10
Style alternatif:session.execute(Foo.__table__.insert(), values)
Joril
6
Notez que les nouvelles versions de sqlalchemy ont des capacités d'insertion en bloc: docs.sqlalchemy.org/en/latest/orm/…
Wayne Werner
18

Je le fais habituellement en utilisant add_all.

from app import session
from models import User

objects = [User(name="u1"), User(name="u2"), User(name="u3")]
session.add_all(objects)
session.commit()
reubano
la source
2
Êtes-vous sûr que cela fonctionne? Cela ne fait pas simplement l'équivalent de .addles insérer à la session une à la fois?
Alec
Ce serait contre-intuitif étant donné le nom de la méthode, les documents ne rentrent pas dans les détails: Add the given collection of instances to this Session.avez-vous des raisons de croire qu'elle ne fait pas d'insertion en masse?
reubano
3
Je ne pense pas que ce soit trop contre-intuitif - cela ajoute en fait tout ce que vous lui demandez. Rien sur l'ajout de toutes les choses à la session ne semble impliquer quelles instructions SQL sous-jacentes sont émises. En regardant la source: github.com/zzzeek/sqlalchemy/blob/ ... il semble en fait juste .addchaque élément individuellement.
Alec
Cela fonctionne bien, comparé à bulk_save_objects(), avec a flush(), nous pouvons obtenir l'ID de l'objet, mais bulk_save_objects()pas (événement avec flush()appelé).
coanor
14

Le support direct a été ajouté à SQLAlchemy à partir de la version 0.8

Selon la documentation , connection.execute(table.insert().values(data))devrait faire l'affaire. (Notez que ce n'est pas la même chose que connection.execute(table.insert(), data)ce qui entraîne de nombreuses insertions de lignes individuelles via un appel à executemany). Sur tout sauf une connexion locale, la différence de performances peut être énorme.

user3805082
la source
10

SQLAlchemy a introduit cela dans la version 1.0.0:

Opérations en masse - SQLAlchemy Docs

Avec ces opérations, vous pouvez désormais effectuer des insertions ou des mises à jour en masse!

Par exemple (si vous voulez la plus faible surcharge pour les INSERT de table simples), vous pouvez utiliser Session.bulk_insert_mappings():

loadme = [(1, 'a'),
          (2, 'b'),
          (3, 'c')]
dicts = [dict(bar=t[0], fly=t[1]) for t in loadme]

s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()

Ou, si vous le souhaitez, ignorez les loadmetuples et écrivez les dictionnaires directement dans dicts(mais je trouve plus facile de laisser toute la verbosité des données et de charger une liste de dictionnaires en boucle).

Juanitogan
la source
7

La réponse de Piere est correcte, mais un problème est que, bulk_save_objectspar défaut, ne renvoie pas les clés primaires des objets, si cela vous concerne. Réglez return_defaultssur Truepour obtenir ce comportement.

La documentation est ici .

foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')]
session.bulk_save_objects(foos, return_defaults=True)
for foo in foos:
    assert foo.id is not None
session.commit()
Matthew Moisen
la source
2
Une prudence doit être prise avec le drapeau. Il insérera un objet à la fois séquentiellement et le gain de performance significatif peut ne pas être là [1]. Dans mon cas, les performances se sont dégradées, ce que je soupçonnais en raison de la surcharge. [1]: docs.sqlalchemy.org/en/13/orm/…
dhfromkorea
6

Toutes les routes mènent à Rome , mais certaines d'entre elles traversent des montagnes, nécessitent des ferries, mais si vous voulez vous y rendre rapidement, prenez simplement l'autoroute.


Dans ce cas, l'autoroute doit utiliser la fonction execute_batch () de psycopg2 . La documentation le dit le mieux:

La mise en œuvre actuelle de executemany()(en utilisant un euphémisme extrêmement charitable) n'est pas particulièrement performante. Ces fonctions peuvent être utilisées pour accélérer l'exécution répétée d'une instruction par rapport à un ensemble de paramètres. En réduisant le nombre d’allers-retours du serveur, les performances peuvent être bien meilleures que l’utilisation executemany().

Dans mon propre test, execute_batch()c'est environ deux fois plus rapide que executemany(), et donne la possibilité de configurer le page_size pour un peaufinage supplémentaire (si vous voulez extraire les derniers 2-3% des performances du pilote).

La même fonctionnalité peut facilement être activée si vous utilisez SQLAlchemy en définissant use_batch_mode=Truecomme paramètre lorsque vous instanciez le moteur aveccreate_engine()

chjortlund
la source
Remarque: psycopg2 execute_valuesest plus rapide que psycopg2 execute_batchlors de l'insertion groupée !
Fierr
5

C'est un moyen:

values = [1, 2, 3]
Foo.__table__.insert().execute([{'bar': x} for x in values])

Cela va insérer comme ceci:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

Référence: La FAQ SQLAlchemy inclut des benchmarks pour diverses méthodes de commit.

Eefret
la source
3

La meilleure réponse que j'ai trouvée jusqu'à présent était dans la documentation de sqlalchemy:

http://docs.sqlalchemy.org/en/latest/faq/performance.html#im-inserting-400-000-rows-with-the-orm-and-it-s-really-slow

Il existe un exemple complet de benchmark de solutions possibles.

Comme indiqué dans la documentation:

bulk_save_objects n'est pas la meilleure solution mais ses performances sont correctes.

Je pense que la deuxième meilleure implémentation en termes de lisibilité était avec SQLAlchemy Core:

def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
            [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )

Le contexte de cette fonction est donné dans l'article de documentation.

lelabo_m
la source