2010-03-17 6 views
6

Ich habe erfolglos versucht, txredis (die nicht blockierende verdrehte API für redis) für eine persistente Nachrichtenwarteschlange zu verwenden. Ich versuche, sie mit einem Scrapy-Projekt einzurichten, an dem ich gerade arbeite. Ich stellte fest, dass der Client zwar nicht blockierte, aber viel langsamer wurde, als es hätte sein können, denn was in der Reaktorschleife ein Ereignis hätte sein sollen, wurde in Tausende von Schritten aufgeteilt.Twisted: Warum führt die Weitergabe eines verzögerten Callbacks an einen verzögerten Thread dazu, dass der Thread plötzlich blockiert?

Also habe ich versucht, redis-py (die reguläre blockierende twisted API) zu verwenden und den Anruf in einen verzögerten Thread zu wickeln. Es funktioniert gut, aber ich möchte eine innere verzögerte durchführen, wenn ich einen Anruf an Redis, wie ich Verbindungspooling in Versuchen, um die Dinge weiter beschleunigen möchten, einrichten möchten.

Unten ist meine Interpretation einiger Codebeispiel aus den verdrehten docs für einen latenten Faden genommen meine Anwendungsfall veranschaulichen:

#!/usr/bin/env python 
from twisted.internet import reactor,threads 
from twisted.internet.task import LoopingCall 
import time 

def main_loop(): 
    print 'doing stuff in main loop.. do not block me!' 


def aBlockingRedisCall(): 
    print 'doing lookup... this may take a while' 
    time.sleep(10) 
    return 'results from redis' 

def result(res): 
    print res 

def main(): 
    lc = LoopingCall(main_loop) 
    lc.start(2) 
    d = threads.deferToThread(aBlockingRedisCall) 
    d.addCallback(result) 
    reactor.run() 

if __name__=='__main__': 
    main() 

Und hier ist meine Veränderung für Connection Pooling, die den Code in dem latenten Faden macht Sperrung:

#!/usr/bin/env python 
from twisted.internet import reactor,defer 
from twisted.internet.task import LoopingCall 
import time 

def main_loop(): 
    print 'doing stuff in main loop.. do not block me!' 

def aBlockingRedisCall(x): 
    if x<5: #all connections are busy, try later 
     print '%s is less than 5, get a redis client later' % x 
     x+=1 
     d = defer.Deferred() 
     d.addCallback(aBlockingRedisCall) 
     reactor.callLater(1.0,d.callback,x) 
     return d 

    else: 
     print 'got a redis client; doing lookup.. this may take a while' 
     time.sleep(10) # this is now blocking.. any ideas? 
     d = defer.Deferred() 
     d.addCallback(gotFinalResult) 
     d.callback(x) 
     return d 

def gotFinalResult(x): 
    return 'final result is %s' % x 

def result(res): 
    print res 

def aBlockingMethod(): 
    print 'going to sleep...' 
    time.sleep(10) 
    print 'woke up' 

def main(): 
    lc = LoopingCall(main_loop) 
    lc.start(2) 


    d = defer.Deferred() 
    d.addCallback(aBlockingRedisCall) 
    d.addCallback(result) 
    reactor.callInThread(d.callback, 1) 
    reactor.run() 

if __name__=='__main__': 
    main() 

Also meine Frage ist, weiß jemand, warum meine Veränderung der latenten Faden bewirkt eine bessere Lösung zu blockieren und/oder kann jemand vorschlagen?

Antwort

12

Nun, wie die twisted docs sagen:

Deferreds den Code nicht machen magisch nicht blockieren

Jedes Mal, wenn Sie verwenden Code blockieren, wie sleep, haben Sie es verschieben zu einem neuen Thread.

#!/usr/bin/env python 
from twisted.internet import reactor,defer, threads 
from twisted.internet.task import LoopingCall 
import time 

def main_loop(): 
    print 'doing stuff in main loop.. do not block me!' 

def aBlockingRedisCall(x): 
    if x<5: #all connections are busy, try later 
     print '%s is less than 5, get a redis client later' % x 
     x+=1 
     d = defer.Deferred() 
     d.addCallback(aBlockingRedisCall) 
     reactor.callLater(1.0,d.callback,x) 
     return d 

    else: 
     print 'got a redis client; doing lookup.. this may take a while' 
     def getstuff(x): 
      time.sleep(3) 
      return "stuff is %s" % x 

     # getstuff is blocking, so you need to push it to a new thread 
     d = threads.deferToThread(getstuff, x) 
     d.addCallback(gotFinalResult) 
     return d 

def gotFinalResult(x): 
    return 'final result is %s' % x 

def result(res): 
    print res 

def aBlockingMethod(): 
    print 'going to sleep...' 
    time.sleep(10) 
    print 'woke up' 

def main(): 
    lc = LoopingCall(main_loop) 
    lc.start(2) 


    d = defer.Deferred() 
    d.addCallback(aBlockingRedisCall) 
    d.addCallback(result) 
    reactor.callInThread(d.callback, 1) 
    reactor.run() 

if __name__=='__main__': 
    main() 

Falls die redis api ist nicht sehr komplex, es könnte natürlicher sein, es neu zu schreiben twisted.web verwenden, anstatt nur die Blockierung api in vielen Threads aufrufen. http://github.com/deldotdr/txRedis

+0

genial Dank! – surtyaar

0

über einen entsprechenden Hinweis, könnten Sie wahrscheinlich speziell für Verdreht, geschaffen wie diese eine Menge unter Verwendung einen Redis-Client gewinnen das neue Protokoll und die Funktionen von Redis 2.x. Sie sollten es definitiv versuchen. Es heißt txredisapi.

Für die persistente Nachrichtenwarteschlange würde ich RestMQ empfehlen. Ein Redis-basiertes Nachrichtenwarteschlangensystem, das auf Zyklon und Txredisapi aufgebaut ist.

http://github.com/gleicon/restmq

Prost

+1

Das OP sagt in der ersten Zeile, dass er versuchte, txRedis zu verwenden. – pr1001

1

gibt es auch einen up-to-date Redis-Client für verdrehten die bereits unterstützt: