2009-08-25 9 views
8

Ich benutze SQLAlchemy mit einem Postgres-Backend, um ein Bulk-Insert-or-update durchzuführen. Um zu versuchen, die Leistung zu verbessern, versuche ich, nur einmal alle tausend Zeilen oder so festschreiben:Wie führe ich eine Masseneinfügung oder -aktualisierung mit SQLAlchemy effizient durch?

trans = engine.begin() 
    for i, rec in enumerate(records): 
    if i % 1000 == 0: 
     trans.commit() 
     trans = engine.begin() 
    try: 
     inserter.execute(...) 
    except sa.exceptions.SQLError: 
     my_table.update(...).execute() 
trans.commit() 

Allerdings funktioniert dies nicht. Es scheint, dass wenn die INSERT fehlschlägt, Dinge in einem seltsamen Zustand bleiben, der das UPDATE verhindert. Wird die Transaktion automatisch zurückgesetzt? Wenn ja, kann dies gestoppt werden? Ich möchte nicht, dass meine gesamte Transaktion im Falle eines Problems zurückgesetzt wird. Deshalb versuche ich, die Ausnahme überhaupt zu erfassen.

Die Fehlermeldung, die ich bekomme, BTW, ist "sqlalchemy.exc.InternalError: (InternalError) aktuelle Transaktion wird abgebrochen, Befehle bis zum Ende des Transaktionsblocks ignoriert", und es geschieht auf dem update().) Anruf.

Antwort

5

Sie treffen ein seltsames Postgresql-spezifisches Verhalten: Wenn ein Fehler in einer Transaktion auftritt, wird die gesamte Transaktion rückgängig gemacht. Ich halte das für einen Postgres Design Bug; Es bedarf einiger SQL-Kontorsionen, um in einigen Fällen zu umgehen.

Eine Problemumgehung besteht darin, das UPDATE zuerst durchzuführen. Ermitteln, ob eine Zeile tatsächlich geändert wurde, indem Sie auf cursor.rowcount schauen; Wenn es keine Zeilen geändert hat, war es nicht vorhanden, also das INSERT. (Dies wird schneller, wenn Sie mehr aktualisieren häufig als Sie einfügen, natürlich.)

Eine andere Lösung ist Savepoints zu verwenden:

SAVEPOINT a; 
INSERT INTO ....; 
-- on error: 
ROLLBACK TO SAVEPOINT a; 
UPDATE ...; 
-- on success: 
RELEASE SAVEPOINT a; 

Dies hat ein ernstes Problem für die Produktion hochwertigen Code: Sie müssen Ermitteln Sie den Fehler genau. Vermutlich erwarten Sie eine eindeutige Constraint-Prüfung, aber Sie treffen möglicherweise etwas Unerwartetes und es ist nahezu unmöglich, den erwarteten Fehler zuverlässig vom unerwarteten zu unterscheiden. Wenn dies den Fehlerzustand falsch trifft, führt dies zu obskuren Problemen, bei denen nichts aktualisiert oder eingefügt wird und kein Fehler angezeigt wird. Sei sehr vorsichtig damit. Sie können den Fehlerfall eingrenzen, indem Sie sich den Fehlercode von Postgresql ansehen, um sicherzustellen, dass es sich um den Fehlertyp handelt, den Sie erwarten, aber das potenzielle Problem ist immer noch vorhanden.

Schließlich, wenn Sie wirklich Batch-Insert-oder-Update tun möchten, möchten Sie tatsächlich viele von ihnen in ein paar Befehle, nicht ein Element pro Befehl. Dies erfordert komplexere SQL: SELECT verschachtelt innerhalb eines INSERT, Filterung der richtigen Elemente zum Einfügen und Aktualisieren.

+1

"Wenn in einer Transaktion ein Fehler auftritt, wird die gesamte Transaktion rückgängig gemacht. Dies ist ein Postgres-Designfehler." - Ist das nicht der Punkt der Transaktionen? Aus [Wikipedia] (http: //en.wikipedia.org/wiki/Database_transaction): "Transaktionen bieten einen Alles-oder-Nichts-Vorschlag, der besagt, dass jede Arbeitseinheit, die in einer Datenbank ausgeführt wird, entweder vollständig ausgeführt werden muss oder überhaupt keinen Effekt hat." – spiffytech

+0

@ Spiffytech Gute Antwort. Das hat mich wirklich LOL gemacht. –

4

Dieser Fehler stammt von PostgreSQL. PostgreSQL erlaubt Ihnen nicht, Befehle in derselben Transaktion auszuführen, wenn ein Befehl einen Fehler erzeugt. Um dies zu beheben, können Sie verschachtelte Transaktionen (implementiert mit SQL-Sicherungspunkten) über conn.begin_nested() verwenden. Hier ist etwas, das funktionieren könnte. Ich machte den Code mit expliziten Verbindungen, faktorisierte den Chunking-Teil und ließ den Code den Kontextmanager verwenden, um Transaktionen korrekt zu verwalten.

from itertools import chain, islice 
def chunked(seq, chunksize): 
    """Yields items from an iterator in chunks.""" 
    it = iter(seq) 
    while True: 
     yield chain([it.next()], islice(it, chunksize-1)) 

conn = engine.commit() 
for chunk in chunked(records, 1000): 
    with conn.begin(): 
     for rec in chunk: 
      try: 
       with conn.begin_nested(): 
        conn.execute(inserter, ...) 
      except sa.exceptions.SQLError: 
       conn.execute(my_table.update(...)) 

Dies wird immer noch nicht stellaren Leistung, obwohl verschachtelten Transaktion Overhead. Wenn Sie eine bessere Leistung wünschen, versuchen Sie, mit einer Select-Abfrage zu erkennen, welche Zeilen Fehler verursachen, und verwenden Sie ausführbare Unterstützung (Ausführen kann eine Liste von Dicts aufnehmen, wenn alle Inserts dieselben Spalten verwenden). Wenn Sie mit gleichzeitigen Aktualisierungen arbeiten müssen, müssen Sie die Fehlerbehandlung entweder durch wiederholtes oder wiederholtes Einfügen von Einfügungen durchführen.