2014-09-06 13 views
7

Ich entwickle ein HTML5-Spiel: www.titansoftime.com – PHP Pthreads mit Ratchet-WebSocket

Ich verwende Ratchet als PHP-WebSocket-Server-Lösung. Es funktioniert super! http://socketo.me/docs/push

Ich habe mehrere eigenständige Tests mit der PHP-Pthreads-Erweiterung gemacht und einige sehr spannende Ergebnisse gesehen. Sie funktioniert wirklich, und zwar gut – solange keine Websockets im Spiel sind.

Pthreads verleiht PHP Multithreading-Fähigkeiten (es funktioniert wirklich und es ist erstaunlich): http://php.net/manual/en/book.pthreads.php

Das ist, was ich tue:

/src/server.php Dies ist die Datei, die den Daemon startet.

<?php
    session_start();

    use Ratchet\Server\IoServer;
    use Ratchet\WebSocket\WsServer;
    use MyApp\Pusher;

    require __DIR__ . '/../vendor/autoload.php';

    require_once __DIR__ . '/../mysql.cls.php';
    require_once __DIR__ . '/../game.cls.php';
    require_once __DIR__ . '/../model.cls.php';

    // Script-level globals; $game is kept under this exact name because
    // other code may reach it through `global $game`.
    $mysql = new mysql;
    $game = new game;

    $loop = React\EventLoop\Factory::create();
    $pusher = new Pusher();

    // Run the pusher's load() routine every half second.
    $loop->addPeriodicTimer(0.50, function () use ($pusher) {
        $pusher->load();
    });

    $webSock = new React\Socket\Server($loop);

    // Report when the libevent-backed loop implementation was selected.
    if ($loop instanceof \React\EventLoop\LibEventLoop) {
        echo "\n HAS LibEvent";
    }

    // Binding to 0.0.0.0 means remotes can connect.
    $webSock->listen(8080, '0.0.0.0');

    // Build the stack inside-out: WebSocket app -> HTTP layer -> I/O server.
    $wsApp = new WsServer($pusher);
    $httpApp = new Ratchet\Http\HttpServer($wsApp);
    $webServer = new IoServer($httpApp, $webSock);

    $loop->run();

Das alles funktioniert gut.

/src/MyApp/Pusher.php Diese Klasse überträgt Daten an alle verbundenen Benutzer.

<?php 
namespace MyApp; 
use Ratchet\ConnectionInterface; 
use Ratchet\MessageComponentInterface; 

/**
 * Worker thread that looks up the user row for one connection and then
 * sends a JSON payload to that connection.
 *
 * NOTE(review): pthreads serializes non-Threaded objects stored on a Thread
 * property; a connection object that holds closures reportedly fails with a
 * "cannot serialize closure" fatal error. Confirm that $client can actually
 * cross the thread boundary before relying on this class.
 */
class AsyncThread extends \Thread{

    /** @var object Connection handle; run() reads ->resourceId and calls ->send(). */
    public $client;

    /**
     * @param object $client Connection the thread should serve.
     */
    public function __construct($client){
        $this->client = $client;
    }

    /**
     * Thread entry point: query the user bound to the connection, then push
     * a JSON message to it.
     */
    public function run(){

        // do work on $this->client
        // `mysql` is assumed to be the global class (the bootstrap script
        // instantiates it un-namespaced), so it must be fully qualified here:
        // an unqualified class name inside `namespace MyApp` would resolve
        // to MyApp\mysql. The id is cast to int so it can never alter the SQL.
        $user = \mysql::assoc('SELECT * from users WHERE connection_id = "'.intval($this->client->resourceId).'"');
        // etc..
        $this->client->send(json_encode(array('foo'=>'bar')));

    }

}

/**
 * WebSocket message component that keeps a list of registered connections
 * and pushes data to all of them from load().
 *
 * A connection is registered once it sends a JSON message whose "task" is
 * exactly the string "connect"; an empty message, undecodable JSON or any
 * other task closes the connection.
 */
class Pusher implements MessageComponentInterface{

    /** @var ConnectionInterface[] Connections that have sent the "connect" task. */
    public static $clients = array();

    /**
     * Push one update to every registered client.
     *
     * When no client is registered, the database link is pinged and
     * re-established through the global $game object if the ping fails.
     */
    public static function load(){
        // $game lives in the global scope; without this declaration the
        // reconnect below would operate on an undefined local variable.
        global $game;

        $client_count = count(self::$clients);

        echo "\n\n\n".'Serving to '.$client_count.' clients. '.time();

        if(!$client_count){
            // NOTE(review): mysql_ping() is part of the legacy ext/mysql
            // API, which no longer exists in PHP 7+.
            if(!mysql_ping()){
                $game->connect();
            }
        }

        $threads = array();
        foreach(self::$clients as $key => $client){

            // HANDLE CLIENT

            // This works just fine; the only problem is that with, say, 50
            // simultaneous users the clients near the end of the array have
            // to wait until all earlier ones have been processed.
            $client->send(json_encode(array('foo'=>'bar')));

            // So I tried this:
            $threads[$key] = new AsyncThread($client);
            $threads[$key]->start();

            // At this point the AsyncThread class throws a fatal error
            // complaining about not being able to serialize a closure.
            // If "$this->client = $client;" is not set in the thread
            // constructor no error appears, but then the data is unusable.

            // Also, regardless of whether the data is bound in the
            // AsyncThread constructor, the connection disappears as soon as
            // "new AsyncThread($client)" is called. Cause unknown.

        }

    }

    /**
     * Handle an incoming message: register the sender on a "connect" task,
     * otherwise close the connection.
     *
     * @param ConnectionInterface $from Sender of the message.
     * @param string              $msg  Raw message, expected to be JSON.
     */
    public function onMessage(ConnectionInterface $from, $msg) {
        if(!$msg){
            echo "\n NO MSG";
            $this->closeConnection($from);
            return;
        }

        $data = json_decode($msg);
        if(!$data){
            echo "\n NO DATA";
            $this->closeConnection($from);
            return;
        }

        // Valid JSON is not necessarily an object (e.g. "[1]" or "5"), and
        // the task must match strictly: with loose comparison a JSON `true`
        // (or `0` on older PHP versions) would also count as 'connect'.
        $task = (is_object($data) && isset($data->task)) ? $data->task : null;

        if($task === 'connect'){
            echo "\n".'New connection! ('.$from->resourceId.') '.$from->remoteAddress;
            self::$clients[] = $from;
        }else{
            $this->closeConnection($from);
            echo "\nNO TASK CLOSING";
        }
    }

    /**
     * Close a connection, drop it from the client list and reset the
     * connection_id of the matching users row.
     *
     * Does nothing when $conn is empty or has no truthy resourceId.
     *
     * @param ConnectionInterface|null $conn Connection to close.
     */
    public function closeConnection($conn){
        global $game;

        if(!$conn || !$conn->resourceId){
            return;
        }

        $connid = $conn->resourceId;
        $conn->close();

        // Rebuild the list without the closed connection; indexes are
        // renumbered from 0 as a side effect.
        $remaining = array();
        foreach(self::$clients as $client){
            if($client->resourceId != $connid){
                $remaining[] = $client;
            }
        }
        self::$clients = $remaining;

        $game->query('UPDATE users set connection_id = 0 WHERE connection_id = "'.intval($connid).'" LIMIT 1');
        echo "\n".'Connection '.$connid.' has disconnected';
    }

    /**
     * The peer went away: run the regular close/cleanup path.
     */
    public function onClose(ConnectionInterface $conn) {
        echo "\nCLIENT DROPPED";
        $this->closeConnection($conn);
    }

    /**
     * Intentionally empty: clients are registered by the "connect" task in
     * onMessage(), not when the socket opens.
     */
    public function onOpen(ConnectionInterface $conn) {
    }

    /**
     * Any connection error closes and cleans up the connection.
     */
    public function onError(ConnectionInterface $conn, \Exception $e) {
        echo "\nCLIENT ERRORED";
        $this->closeConnection($conn);
    }

    // The remaining handlers are deliberate no-ops.
    public function onSubscribe(ConnectionInterface $conn, $topic) {
    }
    public function onUnSubscribe(ConnectionInterface $conn, $topic) {
    }
    public function onCall(ConnectionInterface $conn, $id, $topic, array $params) {
    }
    public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
    }

}

Das funktioniert alles gut, solange ich keinen Thread innerhalb der Ereignisschleife erstelle.

Gehe ich das falsch an, oder sind PHP-Multithreading und Websockets inkompatibel?

+0

Hobbes, haben Sie Updates zu dieser Frage? –

+0

Ich warte immer noch auf eine Antwort lol. – Hobbes

+1

Ich denke nicht, dass es notwendig ist, Multithreading zu implementieren. Wenn Sie den Quellcode von Ratchet und React durchgesehen haben, werden Sie verstehen, dass dort nicht blockierende Socket-Lesefunktionen verwendet werden. Wenn Sie zusätzlich einen kleinen Leistungsschub möchten, sollten Sie sich Libevent ansehen. –

Antwort

1

Sehen Sie sich dieses Paket an: https://github.com/huyanping/react-multi-process

Installieren

composer require jenner/react-multi-process

Wie benutzt man es?

So einfach wie:

// Minimal TCP server on a React event loop: every accepted connection is
// sent "pid:<process id>" and then closed.
$loop = React\EventLoop\Factory::create();
$server = stream_socket_server('tcp://127.0.0.1:4020');
stream_set_blocking($server, 0);
$loop->addReadStream($server, function ($server) use ($loop) {
    $conn = stream_socket_accept($server);
    if ($conn === false) {
        // Nothing was accepted, so there is no stream to register.
        return;
    }

    $data = "pid:" . getmypid() . PHP_EOL;
    $loop->addWriteStream($conn, function ($conn) use (&$data, $loop) {
        $written = fwrite($conn, $data);
        if ($written === false || $written === strlen($data)) {
            // Finished (or the write failed): unregister the stream while
            // the resource is still valid, then close it.
            $loop->removeStream($conn);
            fclose($conn);
        } else {
            // Partial write: keep only the bytes that have NOT been sent
            // yet, so the next writable event continues where this stopped.
            $data = substr($data, $written);
        }
    });
});

// the second param is the sub process count
$master = new \React\Multi\Master($loop, 20);
$master->start();

Ein Beispiel unter Verwendung von jenner/simple_fork:

class IoServer {
    /**
     * Run the application by entering the event loop.
     *
     * With $count <= 1 the loop runs in the current process. Otherwise the
     * loop is handed to a \Jenner\SimpleFork\FixedPool created with $count
     * (presumably one worker process per unit; confirm against the
     * library).
     *
     * @param int $count worker process count
     * @throws \RuntimeException If a loop was not previously specified
     */
    public function run($count = 1) {
        if (null === $this->loop) {
            throw new \RuntimeException("A React Loop was not provided during instantiation");
        }

        if ($count <= 1) {
            $this->loop->run();
            return;
        }

        // Hand the loop to the closure explicitly; the callback must not
        // depend on $this being bound inside the pool's worker.
        $loop = $this->loop;
        $master = new \Jenner\SimpleFork\FixedPool(function() use($loop) {
            $loop->run();
        }, $count);
        $master->start();
        $master->keep(true);
        // or just
        // $master = new \React\Multi\Master($this->loop, $count);
        // $master->start();
    }
}