2014-02-21 16 views
6

Ich möchte wissen, wie man mit Amqpphplib verzögert.Wie verzögern? - php-amqplib

benutzen ich diese großen Kaffee-Skript-Tutorial:

https://github.com/jamescarr/rabbitmq-scheduled-delivery

aber es scheint nicht mit PHP-amqplib zu arbeiten.

Die Nachricht läuft ab, wie ich will, aber es scheint, dass "x-dead-letter-exchange" nicht die Arbeit machen. Ich habe die RabbitMQ-Verwaltungskonsole verwendet, und ich sehe die Erstellung und Löschung von Warteschlangen im Live-Modus. Aber meine Nachricht geht nach dem Ablauf in die unmittelbare Warteschlange. Ich benutze RabbitMQ 3.2.3 Version, PHP-amqplib 2.2. * Version.

ist hier mein Code:

Connection-Klasse:

class Connection 
{ 
/** 
* @var $ch 
*/ 
public $ch; 

/** 
* @var $consumer_tag 
*/ 
public $consumer_tag; 

/** 
* @var $exchange 
*/ 
public $exchange; 

/** 
* @var $conn 
*/ 
public $conn; 

public function __construct($host, $port, $user, $password, $vhost) 
{ 

    $this->exchange = 'immediate'; 
    $this->queue = 'right.now.queue'; 
    $this->consumer_tag = 'consumer'; 


    $this->conn = new AMQPConnection($host, $port, $user, $password, $vhost); 
    $this->ch = $this->conn->channel(); 

    $this->ch->exchange_declare($this->exchange, 'direct', false, true, false); 

    $this->ch->queue_declare($this->queue, false, true, false, false, false); 

    $this->ch->queue_bind($this->queue, $this->exchange); 


} 

public function createDelayedQueue ($name, $delay_seconds) { 
    $this->ch->queue_declare($name, false, false, false, true, true, array(
     "x-dead-letter-exchange" => array("S", $this->exchange), 
     "x-message-ttl" => array("I", $delay_seconds*1000), 
     "x-expires" => array("I", $delay_seconds*1000+1000) 
    )); 
} 
} 

Veröffentlichen Code

$name = 'send.later.'.$ts; 
$amqp->createDelayedQueue($name, 2); 
$msg = new AMQPMessage($msg_body, array('content_type' => 'text/plain', 'delivery_mode' => 2)); 
$amqp->ch->basic_publish($msg); 

Consumer Code

$amqp = $this->getContainer()->get('amqp_connexion'); 

    $amqp->ch->basic_consume($amqp->queue, $amqp->consumer_tag, false, false, false, false, function ($msg) { 

     echo $msg->body; 
     echo "\n--------\n"; 
    }); 

    $output->writeln('Listening '.$amqp->queue.'...'); 

    // Loop as long as the channel has callbacks registered 
    while (count($amqp->ch->callbacks)) { 
     $amqp->ch->wait(); 
    } 

Antwort

13

Ich schrieb gerade eine vereinfachte Arbeitsversion für PHP:

/////// simplified /////// 

// include the AMQPlib Classes || use an autoloader 

// queue/exchange names 
$queueRightNow = 'right.now.queue'; 
$exchangeRightNow = 'right.now.exchange'; 
$queueDelayed5sec = 'delayed.five.seconds.queue'; 
$exchangeDelayed5sec = 'delayed.five.seconds.exchange'; 

$delay = 5; // delay in seconds 

// create connection 
$AMQPConnection = new \PhpAmqpLib\Connection\AMQPConnection('localhost',5672,'guest','guest'); 

// create a channel 
$channel = $AMQPConnection->channel(); 

// create the right.now.queue, the exchange for that queue and bind them together 
$channel->queue_declare($queueRightNow); 
$channel->exchange_declare($exchangeRightNow, 'direct'); 
$channel->queue_bind($queueRightNow, $exchangeRightNow); 

// now create the delayed queue and the exchange 
$channel->queue_declare(
     $queueDelayed5sec, 
     false, 
     false, 
     false, 
     true, 
     true, 
     array(
      'x-message-ttl' => array('I', $delay*1000), // delay in seconds to milliseconds 
      "x-expires" => array("I", $delay*1000+1000), 
      'x-dead-letter-exchange' => array('S', $exchangeRightNow) // after message expiration in delay queue, move message to the right.now.queue 
     ) 
); 
$channel->exchange_declare($exchangeDelayed5sec, 'direct'); 
$channel->queue_bind($queueDelayed5sec, $exchangeDelayed5sec); 

// now create a message und publish it to the delayed exchange 
$msg = new \PhpAmqpLib\Message\AMQPMessage(
    time(), 
    array(
     'delivery_mode' => 2 
    ) 
); 
$channel->basic_publish($msg,$exchangeDelayed5sec); 


// consume the delayed message 
$consumeCallback = function(\PhpAmqpLib\Message\AMQPMessage $msg) { 
    $messagePublishedAt = $msg->body; 
    echo 'seconds between publishing and consuming: ' 
     . (time()-$messagePublishedAt) . PHP_EOL; 
}; 
$channel->basic_consume($queueRightNow, '', false, true, false, false, $consumeCallback); 

// start consuming 
while (count($channel->callbacks) > 0) { 
    $channel->wait(); 
} 
+0

großartig! Danke ! – LucasC

0

Wenn Sie amqp interop basierten Transport wählen Sie gar nicht in Details graben müssen. Nur ein paar Dinge zu tun:

Installieren enqueue/amqp-lib (übrigens können Sie andere Transporte basierend auf amqp ext und eine große Hase lib) Transport und enqueue/amqp-tools verwenden.

composer require enqueue/amqp-lib enqueue/amqp-tools 

erstellen AMQP Kontext, eine Verzögerungsstrategie hinzufügen und verzögerten Nachrichten senden:

<?php 
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy; 
use Enqueue\AmqpBunny\AmqpConnectionFactory; 

$context = (new AmqpConnectionFactory('amqp://'))->createContext(); 
$context->setDelayStrategy(new RabbitMqDlxDelayStrategy()) 

$queue = $context->createQueue('foo'); 
$context->declareQueue($queue); 

$message = $context->createMessage('Hello world!'); 

$context->createProducer() 
    ->setDeliveryDelay(5000) // 5 sec 
    ->send($queue, $message) 
; 

By the way, diese nicht diese einzige Strategie zur Verfügung. Es gibt einen basierend auf RabbitMQ Delay Plugin. Es könnte auf die gleiche Weise verwendet werden.