Ich mache ein HTML5-Spiel www.titansoftime.comPHP Pthreads mit Knarre Websocket

Ich bin mit Ratsche als PHP websocket-Server-Lösung. Es funktioniert super! http://socketo.me/docs/push

Ich habe mehrere Standalone-Test mit der PHP Pthreads-Erweiterung gemacht und habe einige sehr aufregende Ergebnisse gesehen. Es funktioniert wirklich und funktioniert gut .. solange Websockets nicht im Mix sind.

Pthreads geben php Multithreading-Fähigkeiten (es funktioniert wirklich und es ist erstaunlich). Diese http://php.net/manual/en/book.pthreads.php

ist, was ich tue:

/src/server.php Dies ist die Datei, die den Daemon startet.


    use Ratchet\Server\IoServer; 
    use Ratchet\WebSocket\WsServer; 
    use MyApp\Pusher; 

    require __DIR__ . '/../vendor/autoload.php'; 

    require_once __DIR__ . '/../mysql.cls.php'; 
    require_once __DIR__ . '/../game.cls.php'; 
    require_once __DIR__ . '/../model.cls.php'; 

    $mysql = new mysql; 
    $game = new game; 

    $loop = React\EventLoop\Factory::create(); 
    $pusher = new MyApp\Pusher(); 

    $loop->addPeriodicTimer(0.50, function() use($pusher){ 

    $webSock = new React\Socket\Server($loop); 

    if ($loop instanceof \React\EventLoop\LibEventLoop) { 
     echo "\n HAS LibEvent"; 

    $webSock->listen(8080, ''); // Binding to means remotes can connect 
    $webServer = new Ratchet\Server\IoServer(
      new Ratchet\Http\HttpServer(
        new Ratchet\WebSocket\WsServer($pusher) 


Das alles funktioniert gut.

/src/MyApp/Pusher.php Diese Klasse überträgt Daten an alle verbundenen Benutzer.

namespace MyApp; 
use Ratchet\ConnectionInterface; 
use Ratchet\MessageComponentInterface; 

class AsyncThread extends \Thread{ 

    public $client; 

    public function __construct($client){ 
     $this->client = $client; 

    public function run(){ 

     // do work on $this->client 
     $user = mysql::assoc('SELECT * from users WHERE connection_id = "'.$this->client->resourceId.'"'); 
     // etc.. 



class Pusher implements MessageComponentInterface{ 

    public static $clients = array(); 

    public static function load(){ 

     $client_count = count(self::$clients); 

     echo "\n\n\n".'Serving to '.$client_count.' clients. '.time(); 

     $start = $istart = microtime(true); 


     $threads = array(); 
     foreach(self::$clients as $key => $client){  


      // This works just fine, the only problem is that if I have lets say 50 simultaneous users, the people near the end of the clients array will have to wait till the other users have been processed. This is not desirable 

      // So I tried this: 
      $threads[$key] = new AsyncThread($client); 

      // At this point the AsyncThread class will throw a fatal error complaining about not being able to serialize a closure. 
      // If I dont set "$this->data = $client;" in the thread constructor no error appears but now I cant use the data. 

      // Also regardless of whether or not I bind the data in the AsyncThread constructor, 
      // the connection disappears if I call "new AsyncThread($client)". I cannot explain this behavior. 



    public function onMessage(ConnectionInterface $from, $msg) { 
     global $game; 
      $data = json_decode($msg); 


        case 'connect': 
         echo "\n".'New connection! ('.$from->resourceId.') '.$from->remoteAddress; 
         self::$clients[] = $from; 

         echo "\nNO TASK CLOSING"; 

       echo "\n NO DATA"; 
      echo "\n NO MSG"; 

    public function closeConnection($conn){ 
     global $game; 
       $connid = $conn->resourceId; 
       $new = array(); 
       foreach(self::$clients as $client){ 
        if($client->resourceId != $connid){ 
         $new[] = $client; 
       self::$clients = $new; 
       $game->query('UPDATE users set connection_id = 0 WHERE connection_id = "'.intval($connid).'" LIMIT 1'); 
       echo "\n".'Connection '.$connid.' has disconnected'; 

    public function onClose(ConnectionInterface $conn) { 
     echo "\nCLIENT DROPPED"; 

    public function onOpen(ConnectionInterface $conn) { 
    public function onError(ConnectionInterface $conn, \Exception $e) { 
     echo "\nCLIENT ERRORED"; 
    public function onSubscribe(ConnectionInterface $conn, $topic) { 
    public function onUnSubscribe(ConnectionInterface $conn, $topic) { 
    public function onCall(ConnectionInterface $conn, $id, $topic, array $params) { 
    public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) { 


Das funktioniert alles gut, solange ich keinen Thread innerhalb der Ereignisschleife erstellen.

Gehe ich das falsch herum oder ist PHP Multithreading und Websockets inkompatibel?


Überprüfung dieses Paket https://github.com/huyanping/react-multi-process


Komponisten erfordern jenner/reagieren-Multi-Prozess- Wie es zu benutzen?

So einfach wie:

$loop = React\EventLoop\Factory::create(); 
$server = stream_socket_server('tcp://'); 
stream_set_blocking($server, 0); 
$loop->addReadStream($server, function ($server) use ($loop) { 
    $conn = stream_socket_accept($server); 
    $data = "pid:" . getmypid() . PHP_EOL; 
    $loop->addWriteStream($conn, function ($conn) use (&$data, $loop) { 
     $written = fwrite($conn, $data); 
     if ($written === strlen($data)) { 
     } else { 
      $data = substr($data, 0, $written); 

// the second param is the sub process count 
$master = new \React\Multi\Master($loop, 20); 

ein Beispiel unter Verwendung jenner/simple_fork wie:

class IoServer { 
    * @param int $count worker process count 
    * Run the application by entering the event loop 
    * @throws \RuntimeException If a loop was not previously specified 
    public function run($count = 1) { 
     if (null === $this->loop) { 
      throw new \RuntimeException("A React Loop was not provided during instantiation"); 

     if($count <= 1){ 
      $loop = $this->loop; 
      $master = new \Jenner\SimpleFork\FixedPool(function() use($loop) { 
      }, $count); 
//   or just 
//   $master = new \React\Multi\Master($this->loop, $count); 
//   $master->start(); 