2010-12-08 4 views
1

implementiert hallo ich arbeite an entwickeln einen Rpc-Bediener, der auf verdreht basiert wird, um einige microcontrollers zu dienen, die rpc Anruf zum verdrehten jsonrpc Bediener bilden. Aber die Anwendung erfordert auch, dass Server Informationen zu jedem Mikro zu jeder Zeit senden, so ist die Frage, wie eine gute Praxis sein könnte zu verhindern, dass die Antwort von einem Remote-jsonrpc Aufruf von einem Mikro mit einem Server jsonrpc Anfrage verwechselt werden, für die gemacht wird Ein Benutzer. Die Konsequenz, die ich jetzt habe, ist, dass Mikros schlechte Informationen erhalten, weil sie nicht wissen, ob die vom Socket kommende Zeichenfolge netstring/json ihre Antwort von einer vorherigen Anforderung ist oder eine neue Anforderung vom Server ist.Wie man einen Zweiweg jsonrpc + verdrehten Bediener/Klient

Hier ist mein Code:

from twisted.internet import reactor 
from txjsonrpc.netstring import jsonrpc 
import weakref 

creds = {'user1':'pass1','user2':'pass2','user3':'pass3'} 

class arduinoRPC(jsonrpc.JSONRPC): 
    def connectionMade(self): 
     pass 

    def jsonrpc_identify(self,username,password,mac): 
     """ Each client must be authenticated just after to be connected calling this rpc """ 
     if creds.has_key(username): 
      if creds[username] == password: 
       authenticated = True 
      else: 
       authenticated = False 
     else: 
      authenticated = False 

     if authenticated: 
      self.factory.clients.append(self) 
      self.factory.references[mac] = weakref.ref(self) 
      return {'results':'Authenticated as %s'%username,'error':None} 
     else: 
      self.transport.loseConnection() 

    def jsonrpc_sync_acq(self,data,f): 
     """Save into django table data acquired from sensors and send ack to gateway""" 
     if not (self in self.factory.clients): 
      self.transport.loseConnection() 
     print f 
     return {'results':'synced %s records'%len(data),'error':'null'} 

    def connectionLost(self, reason): 
     """ mac address is searched and all reference to self.factory.clientes are erased """ 
     for mac in self.factory.references.keys(): 
      if self.factory.references[mac]() == self: 
       print 'Connection closed - Mac address: %s'%mac 
       del self.factory.references[mac] 
       self.factory.clients.remove(self) 


class rpcfactory(jsonrpc.RPCFactory): 
    protocol = arduinoRPC 
    def __init__(self, maxLength=1024): 
     self.maxLength = maxLength 
     self.subHandlers = {} 
     self.clients = [] 
     self.references = {} 

""" Asynchronous remote calling to micros, simulating random calling from server """ 
import threading,time,random,netstring,json 
class asyncGatewayCalls(threading.Thread): 
    def __init__(self,rpcfactory): 
     threading.Thread.__init__(self) 
     self.rpcfactory = rpcfactory 
     """identifiers of each micro/client connected""" 
     self.remoteMacList = ['12:23:23:23:23:23:23','167:67:67:67:67:67:67','90:90:90:90:90:90:90'] 
    def run(self): 
     while True: 
      time.sleep(10) 
      while True: 
       """ call to any of three potential micros connected """ 
       mac = self.remoteMacList[random.randrange(0,len(self.remoteMacList))] 
       if self.rpcfactory.references.has_key(mac): 
        print 'Calling %s'%mac 
        proto = self.rpcfactory.references[mac]() 
        """ requesting echo from selected micro""" 
        dataToSend = netstring.encode(json.dumps({'method':'echo_from_micro','params':['plop']})) 
        proto.transport.write(dataToSend) 
        break 

factory = rpcfactory(arduinoRPC) 

"""start thread caller""" 
r=asyncGatewayCalls(factory) 
r.start() 

reactor.listenTCP(7080, factory) 
print "Micros remote RPC server started" 
reactor.run() 

Antwort

2

Sie benötigen eine genügend Informationen, um jede Nachricht hinzuzufügen, so dass der Empfänger kann bestimmen, wie sie zu interpretieren. Ihre Anforderungen ähneln denen von AMP. Sie können also entweder AMP verwenden oder die gleiche Struktur wie AMP verwenden, um Ihre Nachrichten zu identifizieren. Speziell:

  • In Anfragen, legen Sie einen bestimmten Schlüssel - zum Beispiel AMP verwendet "_ask", um Anfragen zu identifizieren. Es gibt diesen auch einen eindeutigen Wert, der diese Anforderung für die Lebensdauer der Verbindung weiter identifiziert.
  • Geben Sie in Antworten einen anderen Schlüssel ein, z. B. AMP verwendet hierfür "_answer". Der Wert entspricht dem Wert aus dem Schlüssel "_ask" in der Anforderung, für die die Antwort gilt.
  • Mit einem solchen Ansatz müssen Sie nur nachsehen, ob es einen "_ask" -Schlüssel oder einen "_answer" -Schlüssel gibt, um festzustellen, ob Sie eine neue Anfrage oder eine Antwort auf eine frühere Anfrage erhalten haben .

    Zu einem separaten Thema sollte Ihre asyncGatewayCalls Klasse nicht Thread-basiert sein. Es gibt keinen ersichtlichen Grund dafür, Threads zu verwenden, und dabei werden auch verdrehte APIs missbraucht, die zu undefiniertem Verhalten führen. Die meisten verdrehten APIs können nur in dem Thread verwendet werden, in dem Sie reactor.run aufgerufen haben. Die einzige Ausnahme ist reactor.callFromThread, die Sie verwenden können, um eine Nachricht von jedem anderen Thread an den Reaktor-Thread zu senden. asyncGatewayCalls versucht jedoch, auf einen Transport zu schreiben, was zu Pufferbeschädigungen oder willkürlichen Verzögerungen bei den gesendeten Daten führen wird, oder vielleicht schlimmeren Dingen. Stattdessen können Sie asyncGatewayCalls wie folgt schreiben:

    from twisted.internet.task import LoopingCall 
    
    class asyncGatewayCalls(object): 
        def __init__(self, rpcfactory): 
         self.rpcfactory = rpcfactory 
         self.remoteMacList = [...] 
    
        def run(): 
         self._call = LoopingCall(self._pokeMicro) 
         return self._call.start(10) 
    
        def _pokeMicro(self): 
         while True: 
          mac = self.remoteMacList[...] 
          if mac in self.rpcfactory.references: 
           proto = ... 
           dataToSend = ... 
           proto.transport.write(dataToSend) 
           break 
    
    factory = ... 
    r = asyncGatewayCalls(factory) 
    r.run() 
    
    reactor.listenTCP(7080, factory) 
    reactor.run() 
    

    Dieses Sie eine Single-Threaded-Lösung gibt, die das gleiche Verhalten haben sollte, wie Sie für die ursprüngliche asyncGatewayCalls Klasse bestimmt. Anstatt in einer Schleife in einem Thread zu schlafen, um die Aufrufe zu planen, verwendet es jedoch die Scheduling-APIs des Reaktors (über die LoopingCall-Klasse auf höherer Ebene, die wiederholt aufgerufene Dinge plant), um sicherzustellen, dass alle zehn Sekunden _pokeMicro aufgerufen wird .

    +0

    Ja, Ihr Recht, vor ein paar Stunden machte ich die gleiche Schlussfolgerung nach dem Lesen der Thread-API-Dokumentation (task.LoopingCall). Ich habe getestet und es hat ziemlich gut funktioniert. Vielen Dank für Ihre Hilfe – Jaime

    +1

    Ist das 'id'-Feld von jsonrpc nicht geeignet, um festzustellen, welche Antwort für welche Anfrage ist? –

    +0

    JSON-RPC 2.0-Nachrichten können klar unterschieden werden. Benachrichtigungen - Anfragen ohne ID-Feld, Antworten auch nicht gleich (Ergebnis und Fehler). –