0

Ich habe mehrere Threads, die Daten verarbeiten und in eine Warteschlange stellen, und einen einzelnen Thread, der Daten aus einer Warteschlange aufnimmt und dann in einer Datenbank speichert.Wie sqlite-Verbindung im Daemon-Thread zu schließen?

denke ich, die folgenden wird ein Speicherleck verursachen:

class DBThread(threading.Thread): 
    def __init__(self, myqueue): 
     threading.Thread.__init__(self) 
     self.myqueue = myqueue 

    def run(self): 
     conn = sqlite3.connect("test.db") 
     c = conn.cursor() 

     while True: 
      data = myqueue.get() 
      if data: 
       c.execute("INSERT INTO test (data) VALUES (?)", (data,)) 
       conn.commit() 

      self.myqueue.task_done() 

     #conn.close() <--- never reaches this point 

q = Queue.Queue() 

# Create other threads 
.... 

# Create DB thread 
t = DBThread(q) 
t.setDaemon(True) 
t.start() 

q.join() 

ich nicht die conn.close() in der while-Schleife setzen kann, weil ich denke, dass die Verbindung auf der ersten Schleife wird geschlossen. Ich kann es nicht in die if data:-Anweisung schreiben, weil dann keine Daten gespeichert werden, die später in die Warteschlange gestellt werden.

Wo schließe ich die db Verbindung? Wenn ich es nicht schließe, führt das nicht zu einem Speicherleck?

Antwort

0

Wenn Sie einen Sentinel-Wert verwenden können, der in Ihren normalen Daten nicht angezeigt wird, z. None, können Sie den Faden Signal an die Datenbankverbindung in einer finally Klausel zu stoppen und schließen:

import threading 
import Queue 
import sqlite3 

class DBThread(threading.Thread): 
    def __init__(self, myqueue, db_path): 
     threading.Thread.__init__(self) 
     self.myqueue = myqueue 
     self.db_path = db_path 

    def run(self): 
     conn = sqlite3.connect(self.db_path) 

     try: 
      while True: 
       data = self.myqueue.get()  
       if data is None: # check for sentinel value 
        break 

       with conn: 
        conn.execute("INSERT INTO test (data) VALUES (?)", (data,)) 
       self.myqueue.task_done() 
     finally: 
      conn.close() 


q = Queue.Queue() 
for i in range(100): 
    q.put(str(i)) 

conn = sqlite3.connect('test.db') 
conn.execute('create table if not exists test (data text)') 
conn.close() 

t = DBThread(q, 'test.db') 
t.start() 

q.join() 
q.put(None) # tell database thread to terminate 

Wenn Sie keinen Sentinel-Wert verwenden, können Sie einen Flag zur Klasse hinzufügen können, die in der while Schleife überprüft wird. Fügen Sie der Thread-Klasse, die das Flag festlegt, auch eine stop()-Methode hinzu. Sie werden Queue.get() verwenden müssen, eine non-blocking:

class DBThread(threading.Thread): 
    def __init__(self, myqueue, db_path): 
     threading.Thread.__init__(self) 
     self.myqueue = myqueue 
     self.db_path = db_path 
     self._terminate = False 

    def terminate(self): 
     self._terminate = True 

    def run(self): 
     conn = sqlite3.connect(self.db_path) 

     try: 
      while not self._terminate: 
       try: 
        data = self.myqueue.get(timeout=1) 
       except Queue.Empty: 
        continue 

       with conn: 
        conn.execute("INSERT INTO test (data) VALUES (?)", (data,)) 
       self.myqueue.task_done() 
     finally: 
      conn.close() 

.... 
q.join() 
t.terminate() # tell database thread to terminate 

Schließlich ist es wert zu erwähnen, dass Ihr Programm beenden könnte, wenn der db-Thread die Warteschlange zu entwässern verwaltet, das heißt, wenn q.join() zurückkehrt. Dies liegt daran, dass der DB-Thread ein Daemon-Thread ist und nicht verhindert, dass der Hauptthread beendet wird. Sie müssen sicherstellen, dass Ihre Worker-Threads genügend Daten erzeugen, um den db-Thread beschäftigt zu halten, andernfalls wird q.join() zurückgegeben und der Haupt-Thread wird beendet.

+0

Was bedeutet 'mit conn:' in diesem Fall? Normalerweise räumt es Ressourcen auf (in diesem Fall die Verbindung), was wir an dieser Stelle nicht wollen? – Caramiriel

+1

@Caramiriel: Das ist richtig, wir wollen an diesem Punkt nicht aufräumen. Es tut jedoch nicht, was Sie denken; Es implementiert ein automatisches Commit/Rollback der Transaktion. Siehe [Verwenden der Verbindung als Kontextmanager] (https://docs.python.org/2/library/sqlite3.html#using-the-connection-as-a-context-manager). – mhawke