2010-01-18 12 views
6

Ich habe eine große XML-Datendatei (> 160M) zu verarbeiten, und es scheint wie SAX/Expat/Pulldom Parsing ist der Weg zu gehen. Ich würde gerne einen Thread haben, der sich durch die Knoten bewegt und Knoten zur Verarbeitung in eine Warteschlange schiebt, und dann ziehen andere Arbeiter-Threads den nächsten verfügbaren Knoten aus der Warteschlange und verarbeiten ihn.Wie kann ich XML in Python asynchron verarbeiten?

Ich habe folgendes (es sollte Schlösser haben, ich weiß - es wird, später)

import sys, time 
import xml.parsers.expat 
import threading 

q = [] 

def start_handler(name, attrs): 
    q.append(name) 

def do_expat(): 
    p = xml.parsers.expat.ParserCreate() 
    p.StartElementHandler = start_handler 
    p.buffer_text = True 
    print("opening {0}".format(sys.argv[1])) 
    with open(sys.argv[1]) as f: 
     print("file is open") 
     p.ParseFile(f) 
     print("parsing complete") 


t = threading.Thread(group=None, target=do_expat) 
t.start() 

while True: 
    print(q) 
    time.sleep(1) 

Das Problem ist, dass der Körper des while Block nur einmal aufgerufen wird, und dann kann ich nicht sogar Ctrl-C unterbricht es. Bei kleineren Dateien ist die Ausgabe wie erwartet, aber das scheint anzuzeigen, dass der Handler nur aufgerufen wird, wenn das Dokument vollständig geparst ist, was den Zweck eines SAX-Parsers zu besiegen scheint.

Ich bin sicher, es ist meine eigene Ignoranz, aber ich sehe nicht, wo ich den Fehler mache.

PS: Ich habe auch versucht start_handler so zu ändern:

def start_handler(name, attrs): 
    def app(): 
     q.append(name) 
    u = threading.Thread(group=None, target=app) 
    u.start() 

keine Liebe, though.

Antwort

7

ParseFile, wie Sie bemerkt haben, nur "schluckt" alles - nicht gut für die inkrementelle Parsing Sie tun möchten! So füttern Sie einfach die Datei an den Parser ein wenig zu einem Zeitpunkt, um sicherzustellen, bedingt Kontrolle zu anderen Threads zu ergeben, wie Sie gehen - zB:

while True: 
    data = f.read(BUFSIZE) 
    if not data: 
    p.Parse('', True) 
    break 
    p.Parse(data, False) 
    time.sleep(0.0) 

der time.sleep(0.0) Aufruf ist der Weg der Python zu sagen: „Ausbeute an andere Threads, wenn welche bereit sind und warten "; Die Parse Methode ist dokumentiert here.

Der zweite Punkt ist, vergessen Schlösser für diese Verwendung! - Verwenden Sie stattdessen Queue.Queue, es ist eigentlich Threadsafe und fast immer die beste und einfachste Möglichkeit, mehrere Threads in Python zu koordinieren. Machen Sie einfach eine Queue Instanz q, q.put(name) darauf, und haben Threads Block auf q.get() gearbeitet warten auf etwas mehr Arbeit zu tun - es ist so einfach!

(Es gibt mehrere Hilfsstrategien, die Sie verwenden können, um die Beendigung von Worker-Threads zu koordinieren, wenn es für sie keine Arbeit mehr gibt, aber die einfachsten, fehlenden speziellen Anforderungen sind, sie nur zu Daemon-Threads zu machen beenden, wenn der Haupt-Thread - siehe the docs).

+0

Sind für die Warteschlangenvorschläge ausgewählt, aber sind Sie sicher, dass ParseFile alles auf einmal verschlingt? Es ruft zurück in die Python-Handler, um die Tags zu behandeln, das ist der ganze Zweck des SAX-Parsens ... oder sagst du, dass das nicht ausreicht, um einen Threadwechsel in Python auszulösen? –

+1

Wenn Sie SAX möchten, können Sie xml.sax verwenden, siehe http://docs.python.org/library/xml.sax.html?highlight=sax#module-xml.sax; Das OP verwendet nicht SAX, sondern eher xml.parsers.expat, eine Schnittstelle mit niedrigerer Abstraktion, die ** keine ** inkrementelle Strategie vorschreibt (sie unterstützt es, aber nicht _implementiert es, so dass sie der Python-Code-Ebene entspricht) auswählen und auswählen). –

+0

Die Wahl des Expats war etwas willkürlich, ich konnte keine gute Erklärung für den Unterschied zwischen Expat und Saxophon finden. Das Sax-Modul funktioniert genauso gut - vielleicht sogar besser, weil es so asynchron zu sein scheint, wie ich es brauchte. Ich entschied mich, die Methode "füttere es ein Stück nach dem anderen" zu verwenden, da es mir die Möglichkeit gibt, die Saiten, die ich füttere, zu sterilisieren, bevor der Parser zu ihnen gelangt. Sehr hilfreiche Antwort, danke. – decitrig

1

Das einzige, was ich sehe, ist falsch ist, dass Sie gleichzeitig auf q von verschiedenen Threads zugreifen - keine Verriegelung wie Sie tatsächlich schreiben. Das verlangt Ärger - und Sie bekommen wahrscheinlich Ärger in Form des Python-Interpreters, der Sie blockiert. :)

Try-Verriegelung, es ist wirklich nicht sehr schwierig:

import sys, time 
import xml.parsers.expat 
import threading 

q = [] 
q_lock = threading.Lock() <--- 

def start_handler(name, attrs): 
    q_lock.acquire() <--- 
    q.append(name) 
    q_lock.release() <--- 

def do_expat(): 
    p = xml.parsers.expat.ParserCreate() 
    p.StartElementHandler = start_handler 
    p.buffer_text = True 
    print("opening {0}".format(sys.argv[1])) 
    with open(sys.argv[1]) as f: 
     print("file is open") 
     p.ParseFile(f) 
     print("parsing complete") 


t = threading.Thread(group=None, target=do_expat) 
t.start() 

while True: 
    q_lock.acquire() <--- 
    print(q) 
    q_lock.release() <--- 
    time.sleep(1) 

Sie sehen, es war wirklich einfach, wir gerade ein Sperrvariable unser Objekt zu schützen, und acquire, die jedes Mal sperren, bevor wir verwenden das Objekt und loslassen jedes Mal, nachdem wir unsere Aufgabe auf dem Objekt abgeschlossen haben. Auf diese Weise stellen wir sicher, dass sich q.append(name) niemals mit print(q) überschneidet.


(Bei neueren Versionen von Python gibt es auch eine „mit ....“ Syntax, die Sie nicht freigeben Sperren oder Schließen von Dateien oder andere Bereinigungen man häufig vergisst. Hilft)

7

Ich bin nicht sicher über dieses Problem. Ich vermute, der Aufruf von ParseFile blockiert und nur der Parsing-Thread wird wegen der GIL ausgeführt. Ein Weg um dies zu tun wäre, stattdessen multiprocessing zu verwenden. Es ist sowieso so konzipiert, dass es mit Warteschlangen funktioniert.

Sie machen eine Process und Sie können es eine Queue passieren:

import sys, time 
import xml.parsers.expat 
import multiprocessing 
import Queue 

def do_expat(q): 
    p = xml.parsers.expat.ParserCreate() 

    def start_handler(name, attrs): 
     q.put(name) 

    p.StartElementHandler = start_handler 
    p.buffer_text = True 
    print("opening {0}".format(sys.argv[1])) 
    with open(sys.argv[1]) as f: 
     print("file is open") 
     p.ParseFile(f) 
     print("parsing complete") 

if __name__ == '__main__': 
    q = multiprocessing.Queue() 
    process = multiprocessing.Process(target=do_expat, args=(q,)) 
    process.start() 

    elements = [] 
    while True: 
     while True: 
      try: 
       elements.append(q.get_nowait()) 
      except Queue.Empty: 
       break 

     print elements 
     time.sleep(1) 

Ich habe eine Elementeliste enthalten ist, nur Ihre Original-Skript zu replizieren. Ihre endgültige Lösung wird wahrscheinlich get_nowait und eine Pool oder etwas ähnliches verwenden.

+1

Ja, das ist ein guter Weg, um zu gehen - wie Sie sagten, Sie würden sowieso Warteschlangen verwenden wollen. –

+0

Ich habe diesen Code ausprobiert; Es vermeidet das Lockup, aber ParseFile scheint immer noch nichts auszugeben, bis es die gesamte Eingabe gelesen hat. – decitrig

0

Ich weiß nicht viel über die Implementierung, aber wenn der Parse C-Code ist, der bis zum Abschluss ausgeführt wird, werden andere Python-Threads nicht ausgeführt. Wenn der Parser wieder zu Python-Code aufruft, kann die GIL freigegeben werden, damit andere Threads ausgeführt werden können, aber ich bin mir nicht sicher. Vielleicht möchten Sie diese Details überprüfen.