Ich arbeite mit Python2.7 mit Futures-Modul installiert.Wie implementiert man Multithreading mit Tornado?
Ich versuche Multithreading in Tornado mit ThreadPoolExecutor zu implementieren.
Hier ist der Code, den ich implementiert habe.
from __future__ import absolute_import
from base_handler import BaseHandler
from tornado import gen
from pyrestful import mediatypes
from pyrestful.rest import get, post, put, delete
from bson.objectid import ObjectId
from spark_map import Map
from concurrent import futures
import tornado
class MapService(BaseHandler):
MapDB = dict()
executor = futures.ProcessPoolExecutor(max_workers=3)
@tornado.web.asynchronous
@gen.coroutine
@post(_path='/map', _type=[str, str])
def postMap(self, inp, out):
db = self.settings['db']
function = lambda (x,y): (x,y[0]*2)
future = yield db.MapInfo.insert({'input': inp, 'output': out, 'input_function': str(function)})
response = {"inserted ID": str(future)}
self.write(response)
m = Map(inp, out, function, appName=str(future))
futuree = self.executor.submit(m.operation())
self.MapDB[str(future)] = {'map_object': m, 'running_process_future_object': futuree}
self.finish()
@tornado.web.asynchronous
@gen.coroutine
@delete(_path='/map/{_id}', _types=[str])
def deleteMap(self, _id):
db = self.settings['db']
future = yield db.MapInfo.find_one({'_id': ObjectId(_id)})
if future is None:
raise AttributeError('No entry exists in the database with the provided ID')
chk = yield db.MapInfo.remove(future)
response = { "Succes": "OK" }
self.write(response)
self.MapDB[_id]['map_object'].stop()
del self.MapDB[_id]
self.finish()
Im obigen Code erhalte ich zwei Eingaben mit der Post-Anfrage in Inp und Out. Dann führe ich eine Operation mit ihnen durch. Diese Operation sollte dauern, bis eine Löschanforderung empfangen wird, um den Prozess zu stoppen und zu entfernen.
Das Problem, mit dem ich konfrontiert bin, ist mit den mehreren Anfragen. Es führt nur die erste Anforderung aus, während andere Anforderungen auf die erste Anforderung warten, wodurch der Haupt-IOLoop blockiert wird.
Also, ich möchte jeden Prozess in einem separaten Thread ausführen. Wie soll ich es umsetzen?
Was macht m.operation()? –
m.operation() ist eine Funktion der benutzerdefinierten Klasse Map, die ich implementiert habe. Es führt im Grunde einige Berechnungen für einige Daten durch. Diese Operation sollte ausgeführt werden, bis eine Löschanforderung für diese bestimmte Operation empfangen wird. – vidhan
Klingt, als ob Sie in m.operation() blockieren und Sie müssen es asynchron umschreiben. Wir müssten jedoch etwas Code für operation() sehen, um sicher zu sein. Oder Sie können einfach einen "Ausdruck" vor und nach dem Aufruf von operation() einfügen. –