2010-05-10 11 views
6

Ich möchte eine Nachricht an einen RabbitMQ-Server senden und dann auf eine Antwortnachricht warten (in einer "Antwort" -Warteschlange). Natürlich möchte ich nicht für immer warten, falls die Anwendung, die diese Nachrichten verarbeitet, nicht funktioniert - es muss ein Timeout sein. Es klingt wie eine sehr einfache Aufgabe, aber ich kann keinen Weg finden, dies zu tun. Ich habe jetzt dieses Problem sowohl mit py-amqplib als auch mit der RabbitMQ .NET client.Warte auf eine einzelne RabbitMQ-Nachricht mit einer Zeitüberschreitung

Die beste Lösung, die ich bisher bekommen habe ist mit basic_get mit sleep in-between, abzufragen, aber das ist ziemlich hässlich:

def _wait_for_message_with_timeout(channel, queue_name, timeout): 
    slept = 0 
    sleep_interval = 0.1 

    while slept < timeout: 
     reply = channel.basic_get(queue_name) 
     if reply is not None: 
      return reply 

     time.sleep(sleep_interval) 
     slept += sleep_interval 

    raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout) 

Sicher ist etwas besserer Weg gibt?

Antwort

8

Ich habe gerade Timeout-Unterstützung für amqplib in carrot hinzugefügt.

Dies ist eine Unterklasse von amqplib.client0_8.Connection:

http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97

wait_multi ist eine Version von channel.wait Lage von Kanälen auf eine beliebige Zahl zu erhalten.

Ich denke, das könnte zu einem bestimmten Zeitpunkt stromaufwärts zusammengeführt werden.

+1

Jetzt nenne ich das eine "große Antwort": "Es ist behoben"! Akzeptieren - in der Hoffnung, dass es * in amqplib zusammengeführt wird. – EMP

+0

@EMP haha ​​:) lustig :) –

1

Dies scheint die ganze Idee der asynchronen Verarbeitung zu brechen, aber wenn Sie müssen ich denke, der richtige Weg, um es zu tun ist, eine RpcClient zu verwenden.

+0

Während RpcClient sich mir nicht sinnvoll ist, bei dessen Umsetzung suchen enthüllt den Ansatz zu verwenden: Erstellen Sie eine 'QueueingBasicConsumer' und Warte auf seine Warteschlange, die eine Zeitüberschreitung unterstützt. Dies ist in .NET nicht so komplex, wie ich befürchtet habe. – EMP

2

Es ist ein Beispiel hereqpid mit einem msg = q.get(timeout=1) verwenden, das tun sollten, was Sie wollen. Tut mir leid, ich weiß nicht, welche anderen AMQP-Client-Bibliotheken Timeouts implementieren (und insbesondere kenne ich die beiden spezifischen, die du erwähnt hast).

+0

Betrachtet man die Quelle von qpid, so scheint sie genau den gleichen Ansatz wie der .NET-Client zu verwenden: 'basic_consume' mit einer Warteschlange und Warten auf die Warteschlange mit einem Timeout. Sieht so aus, als müsste ich das tun. – EMP

8

Hier ist, was ich am Ende in der .NET-Client zu tun:

protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs) 
{ 
    var consumer = new QueueingBasicConsumer(Channel); 
    var tag = Channel.BasicConsume(queueName, true, null, consumer); 
    try 
    { 
     object result; 
     if (!consumer.Queue.Dequeue(timeoutMs, out result)) 
      throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs/1000.0)); 

     return ((BasicDeliverEventArgs)result).Body; 
    } 
    finally 
    { 
     Channel.BasicCancel(tag); 
    } 
} 

Leider kann ich das gleiche mit py-amqplib nicht tun, weil seine basic_consume Methode den Rückruf nicht nennen, es sei denn Sie channel.wait() und channel.wait() nennen unterstützt keine Timeouts! Diese dumme Einschränkung (auf die ich immer wieder zu sprechen komme) bedeutet, dass dein Thread für immer eingefroren ist, wenn du nie wieder eine Nachricht erhältst.

1

Mit Rabbit können Sie jetzt Timeout-Ereignisse hinzufügen. Wickeln Sie den Code einfach in einem Versuch zu fangen und dann Ausnahmen im TimeOut werfen und Trennen Lader:

try{ 
    using (IModel channel = rabbitConnection.connection.CreateModel()) 
    { 
     client = new SimpleRpcClient(channel, "", "", queue); 
     client.TimeoutMilliseconds = 5000; // 5 sec. defaults to infinity 
     client.TimedOut += RpcTimedOutHandler; 
     client.Disconnected += RpcDisconnectedHandler; 
     byte[] replyMessageBytes = client.Call(message); 
     return replyMessageBytes; 
    } 
} 
catch (Exception){ 
    //Handle timeout and disconnect here 
} 
private void RpcDisconnectedHandler(object sender, EventArgs e) 
{ 
    throw new Exception("RPC disconnect exception occured."); 
} 

private void RpcTimedOutHandler(object sender, EventArgs e) 
{ 
    throw new Exception("RPC timeout exception occured."); 
}