2016-05-11 2 views
2

Ich baue eine Websocket Server-Anwendung in Python 3. Ich bin mit dieser Implementierung: https://websockets.readthedocs.io/Python 3 Threaded WebSockets Server

Grundsätzlich ich mehrere Client verwalten möchten. Auch ich will Daten aus 2 verschiedenen Thread senden (eine für GPS + eine für IMU) GPS-Thread aktualisiert 1 Hz, während IMU Thread aktualisieren bei 25Hz ist

Mein Problem ist in MSGWorker.sendData Methode: sobald Ich entzippe die Zeile @ asyncio.coroutine und ergebe von websocket.send ('{"GPS": "% s"}'% data) die ganze Methode macht nichts (kein Ausdruck ("Send data: foo") im Terminal)

Allerdings mit diesen beiden Zeilen kommentiert mein Code funktioniert wie ich erwarte, außer dass ich nichts durch die Websocket senden.

Aber natürlich ist mein Ziel, Daten durch die WebSocket zu senden, ich verstehe einfach nicht, warum es nicht funktioniert? Irgendeine Idee ?

server.py

#!/usr/bin/env python3 
import signal, sys 
sys.path.append('.') 
import time 
import websockets 
import asyncio 
import threading 

connected = set() 
stopFlag = False 



class GPSWorker (threading.Thread): 
    def __init__(self): 
    threading.Thread.__init__(self) 
    self.data = 0 
    self.lastData = 0 
    self.inc = 0 

    # Simulate GPS data 
    def run(self): 
    while not stopFlag: 
     self.data = self.inc 
     self.inc += 1 
     time.sleep(1) 

    def get(self): 
    if self.lastData is not self.data: 
     self.lastData = self.data 
     return self.data 



class IMUWorker (threading.Thread): 
    def __init__(self): 
    threading.Thread.__init__(self) 
    self.data = 0 
    self.lastData = 0 
    self.inc = 0 

    # Simulate IMU data 
    def run(self): 
    while not stopFlag: 
     self.data = self.inc 
     self.inc += 1 
     time.sleep(0.04) 

    def get(self): 
    if self.lastData is not self.data: 
     self.lastData = self.data 
     return self.data 



class MSGWorker (threading.Thread): 
    def __init__(self): 
    threading.Thread.__init__(self) 

    def run(self): 
    while not stopFlag: 
     data = gpsWorker.get() 
     if data: 
     self.sendData('{"GPS": "%s"}' % data)   

     data = imuWorker.get() 
     if data: 
     self.sendData('{"IMU": "%s"}' % data) 

     time.sleep(0.04) 

    #@asyncio.coroutine 
    def sendData(self, data): 
    for websocket in connected.copy(): 
     print("Sending data: %s" % data) 
     #yield from websocket.send('{"GPS": "%s"}' % data) 



@asyncio.coroutine 
def handler(websocket, path): 
    global connected 
    connected.add(websocket) 
    #TODO: handle client disconnection 
    # i.e connected.remove(websocket) 



if __name__ == "__main__": 
    print('aeroPi server') 
    gpsWorker = GPSWorker() 
    imuWorker = IMUWorker() 
    msgWorker = MSGWorker() 

    try: 
    gpsWorker.start() 
    imuWorker.start() 
    msgWorker.start() 

    start_server = websockets.serve(handler, 'localhost', 7700) 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(start_server) 
    loop.run_forever() 

    except KeyboardInterrupt: 
    stopFlag = True 
    loop.close() 
    print("Exiting program...") 

client.html

<!doctype html> 
<html> 
<head> 
    <meta charset="UTF-8" /> 
</head> 
<body> 
</body> 
</html> 
<script type="text/javascript"> 
    var ws = new WebSocket("ws://localhost:7700", 'json'); 
    ws.onmessage = function (e) { 
    var data = JSON.parse(e.data); 
    console.log(data); 
    }; 
</script> 

Vielen Dank für Ihre Hilfe

Antwort

4

Endlich habe ich es! Es benötigt Python 3.5.1 (während meine Distribution nur 3.4.3 bietet) und etwas Hilfe von Aymeric, dem Autor der websockets-Bibliothek (danke an ihn).

#!/usr/bin/env python3 
import signal, sys 
sys.path.append('.') 
import time 
import websockets 
import asyncio 
import threading 


stopFlag = False 



class GPSWorker (threading.Thread): 
    def __init__(self): 
    threading.Thread.__init__(self) 
    self.data = 0 
    self.lastData = 0 
    self.inc = 0 

    # Simulate GPS data 
    def run(self): 
    while not stopFlag: 
     self.data = self.inc 
     self.inc += 1 
     time.sleep(1) 

    def get(self): 
    if self.lastData is not self.data: 
     self.lastData = self.data 
     return self.data 



class IMUWorker (threading.Thread): 
    def __init__(self): 
    threading.Thread.__init__(self) 
    self.data = 0 
    self.lastData = 0 
    self.inc = 0 

    # Simulate IMU data 
    def run(self): 
    while not stopFlag: 
     self.data = self.inc 
     self.inc += 1 
     time.sleep(0.04) 

    def get(self): 
    if self.lastData is not self.data: 
     self.lastData = self.data 
     return self.data 



class MSGWorker (threading.Thread): 
    def __init__(self): 
    threading.Thread.__init__(self) 
    self.connected = set() 

    def run(self): 
    while not stopFlag: 
     data = gpsWorker.get() 
     if data: 
     self.sendData('{"GPS": "%s"}' % data) 

     data = imuWorker.get() 
     if data: 
     self.sendData('{"IMU": "%s"}' % data) 

     time.sleep(0.04) 

    async def handler(self, websocket, path): 
    self.connected.add(websocket) 
    try: 
     await websocket.recv() 
    except websockets.exceptions.ConnectionClosed: 
     pass 
    finally: 
     self.connected.remove(websocket) 

    def sendData(self, data): 
    for websocket in self.connected.copy(): 
     print("Sending data: %s" % data) 
     coro = websocket.send(data) 
     future = asyncio.run_coroutine_threadsafe(coro, loop) 



if __name__ == "__main__": 
    print('aeroPi server') 
    gpsWorker = GPSWorker() 
    imuWorker = IMUWorker() 
    msgWorker = MSGWorker() 

    try: 
    gpsWorker.start() 
    imuWorker.start() 
    msgWorker.start() 

    ws_server = websockets.serve(msgWorker.handler, '0.0.0.0', 7700) 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(ws_server) 
    loop.run_forever() 
    except KeyboardInterrupt: 
    stopFlag = True 
    #TODO: close ws server and loop correctely 
    print("Exiting program...") 

Grüße, Clément