2016-04-07 2 views
0

Ich fange an, RabbitMQ zu verwenden, und nachdem ich die Tutorials verfolgt habe, versuche ich nun, es so zu machen, wie ich es brauche und ich stehe in Schwierigkeiten. Die Einrichtung, die ich habe, ist, dass ich in der Lage sein muss, einen RPC zuerst zu machen, und dann basierend auf der Antwort von dem Client eine weitere Nachricht an eine Arbeitswarteschlange senden wird (oder nicht) wird, auf die ich keine Antwort brauche der Kunde). Leider scheinen meine Bemühungen, diese Zusammenarbeit in Gang zu bringen, nicht so zu funktionieren, wie ich es möchte. Auf der Serverseite, habe ich so etwas wie diese (ich viele Variationen alle mit den gleichen Problemen versucht habe):Arbeitswarteschlange und RPC zusammen arbeiten

var factory = new ConnectionFactory() { HostName = "localhost" }; 
connection = factory.CreateConnection(); 
channel = connection.CreateModel(); 
channel.ExchangeDeclare(exchange: "jobs", type: "direct", durable: true); 

// I started with a named queue, not sure if that's better or worse for this 
var queueName = channel.QueueDeclare().QueueName; 

channel.QueueBind(queue: queueName, 
    exchange: "jobs", 
    routingKey: "saveJob_queue"); 

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); 

var consumer = new EventingBasicConsumer(channel); 
consumer.Received += (model, ea) => 
{ 
    // save stuff that was sent with the saveJob_queue routingKey 
} 

channel.BasicConsume(queue: queueName, 
    noAck: false, 
    consumer: consumer); 

// set up channel for RPC 
// Not sure if this has to have another channel, but it wasn't working on the same channel either 
rpcChannel = connection.CreateModel(); 
var rpcQueueName = rpcChannel.QueueDeclare().QueueName; 

rpcChannel.QueueBind(queue: rpcQueueName, 
    exchange: "jobs", 
    routingKey: "rpc_CheckJob_queue"); 

var rpcConsumer = new EventingBasicConsumer(rpcChannel); 

rpcConsumer.Received += (model, ea) => 
{ 
    // do my remote call and send back a response 
} 

Das Problem, das ich habe ist, dass eine Nachricht an den noch jobs Austausch mit dem Routing-Schlüssel rpc_CheckJob_queue gesendet beendet das Auslösen des Recieved Ereignisses auf dem ersten Kanal trotz der Tatsache, dass es nur saveJob_queue Routen erhalten sollte. Ich könnte ea.RoutingKey in diesem Handler überprüfen und diese Nachrichten einfach ignorieren, aber ich verstehe nicht, wie und warum sie dort überhaupt landen?

Was wäre der richtige Weg, um eine Verbindung einzurichten, so dass sowohl Arbeitswarteschlangenmeldungen als auch RPC-Nachrichten empfangen und korrekt verarbeitet werden können?

+0

Vermissen Sie etwas wie 'rpcChannel .BasicConsume (rpcQueueName: queuename,' Auch wenn Sie Ihren Code für das Senden der Nachrichten veröffentlichen konnte – cantSleepNow

Antwort

0

Also habe ich aufgegeben und beschlossen, nur im Received Ereignis zu filtern. Ich denke, das Problem ist, dass RabbitMQ hat nur eine Received Ereignis auf einem Kanal, aber nicht auf eine Warteschlange. So wird das Received Ereignis in beide Richtungen getroffen. So, jetzt habe ich das:

channel.QueueDeclare(queue: queueName, 
     durable: true, 
     exclusive: false, 
     autoDelete: false, 
     arguments: null); 

channel.QueueDeclare(queue: rpcQueueName, 
     durable: false, 
     exclusive: false, 
     autoDelete: false, 
     arguments: null); 

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); 

var consumer = new EventingBasicConsumer(channel); 
consumer.Received += (model, ea) => 
{ 
    switch (ea.RoutingKey) 
    { 
     case queueName: 
      SaveJob(ea); 
      break; 
     case rpcQueueName: 
      CheckJob(ea); 
      break; 
    } 
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 
}; 

channel.BasicConsume(queue: queueName, 
    noAck: false, 
    consumer: consumer); 

channel.BasicConsume(queue: rpcQueueName, 
        noAck: false, 
        consumer: consumer); 

Ich bin offen für bessere Vorschläge, da dies scheint ein bisschen aus.

So Senden ist nur:

var properties = channel.CreateBasicProperties(); 
properties.Persistent = true; 

channel.BasicPublish(exchange: "", 
        routingKey: queueName, 
        basicProperties: properties, 
        body: body); 

für die reguläre Arbeit Job und:

var corrId = Guid.NewGuid().ToString(); 
var props = channel.CreateBasicProperties(); 
props.ReplyTo = replyQueueName; 
props.CorrelationId = corrId; 

var messageBytes = Encoding.UTF8.GetBytes(msg); 
channel.BasicPublish(exchange: "", 
        routingKey: rpcQueueName, 
        basicProperties: props, 
        body: messageBytes); 

while (true) 
{ 
    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 
    if (ea.BasicProperties.CorrelationId == corrId) 
    { 
     return ea.Body != null && ea.Body.Any() ? BitConverter.ToInt32(ea.Body,0) : (int?)null; 
    } 
} 

Für RPC.

0

Da Sie keinen Namen für Ihre Warteschlange angeben, vermute ich, dass Sie die gleiche Warteschlange zweimal erhalten. Ich denke, was passiert, ist im Wesentlichen das.

Jobs -> saveJob_queue -> SomeSystemQueue
Jobs -> rpc_CheckJob_queue -> SomeSystemQueue

Versuchen zwei separate Warteschlange Namen Kommissionierung, und dann wieder laufen Code. Anstatt also diese:

var queueName = channel.QueueDeclare().QueueName; 

channel.QueueBind(queue: queueName, 
    exchange: "jobs", 
    routingKey: "saveJob_queue"); 

haben:

var name = "Queue A"; 
channel.QueueDeclare(name); 
channel.QueueBind(queue: queueName, 
     exchange: "jobs", 
     routingKey: "saveJob_queue"); 

Dann sonst Ihre zweite Warteschlange etwas benennen und das versuchen.

+0

Ich habe ursprünglich die beiden Warteschlangen (anders benannt haben, natürlich), aber hatte das gleiche –

+0

OK Vielleicht möchten Sie zurückschalten, nur um das komplett zu verwerfen Könnten Sie auch Ihre sendende Logik posten? So dass jeder sehen kann, welchen Routing-Schlüssel Sie adressieren und so weiter. –