2016-03-31 16 views
0

Ich verwende RabbitMQ mit C# als Message Broker Framework. In meinem Szenario habe ich viele Verbraucher, die aus verschiedenen Warteschlangen verbrauchen können. mehrere Warteschlangen bilden eine logische Gruppe aus der Perspektive eines Verbrauchers.RabbitMQ, ist es möglich, 1 Nachricht von einem Kanal zu verbrauchen, während es FIFO?

zum Beispiel: Queue1, Queue2, Queue3.

Consumer1 erste Nachricht von [Queue1, Queue3] und nur wenn nicht verfügbar Versuch von Queue2 zu ergreifen, um wollen

Consumer2 erste Nachricht von [Queue2, Queue3] und nur wenn nicht verfügbar Versuch nehmen wollen von Queue1 zu nehmen

Unten ist eine mögliche Einrichtung, jeder Verbraucher kümmert sich um Teilmenge der Warteschlangen. Consumer2 interessiert sich für alle Warteschlangen, in den Prioritäten 1,2, daher der Verbraucher die Prioritäten Logik beibehalten. Beachten Sie, dass dies ein anderes Szenario aus den Arbeitswarteschlangen ist, die hier RabbitMQ tutorials enter image description here

hier präsentiert wird, ist eine Klasse, die eine Nachricht zurück sollte:

public class MessagesProvider 
{ 

    private IConnection _connection; 
    private IModel _channel; 
    private readonly IConnectionFactory _connectionFactory; 
    private ConcurrentDictionary<string, string> _tagsQueueItems; 
    private EventingBasicConsumer _consumer; 
    private CancellationTokenSource _cts; 
    private TaskCompletionSource<BasicGetResult> _tcs; 

    public MessagesProvider() 
    { 
     _connectionFactory = new ConnectionFactory() 
     { 
      HostName = "localhost" 
     }; 

    } 

    public Task<BasicGetResult> GetMessage(int timeout, IEnumerable<string> queues) 
    { 
     _tagsQueueItems = new ConcurrentDictionary<string, string>(); 
     _cts = new CancellationTokenSource(); 
     _connection = _connectionFactory.CreateConnection(); 
     _channel = _connection.CreateModel(); 
     _channel.SingleMessagePerChannel(); 
     _consumer = new EventingBasicConsumer(_channel); 
     _consumer.Received += OnReceive; 

     foreach (var queue in queues) 
     { 
      var tag = _channel.BasicConsume(queue, false, _consumer); 
      _tagsQueueItems.AddOrUpdate(queue, tag, (k, o) => queue); 
     } 

     return _tcs.Task; 
    } 

    private void SetResult(BasicGetResult result) 
    { 
     _tcs.SetResult(result); 
     if (_channel.IsOpen) 
      _channel.Close(); 
     if (_connection.IsOpen) 
      _connection.Close(); 
    } 

    private void OnReceive(object sender, BasicDeliverEventArgs e) 
    { 
     _consumer.Received -= OnReceive; 
     _channel.BasicAck(e.DeliveryTag, false); 
     var result = new BasicGetResult(e.DeliveryTag, e.Redelivered, e.Exchange, e.RoutingKey, 1, 
      e.BasicProperties, e.Body); 

     SetResult(result); 

    } 
} 

Kanaldefinition:

public static class ChannelExtension 
{ 
    public static void SingleMessagePerChannel(this IModel channel) 
    { 
     channel.BasicQos(0,1,true); 
    } 

    public static void SingleMessagePerConsumer(this IModel channel) 
    { 
     channel.BasicQos(0,1,false); 
    } 
} 

Das Problem ist, dass ich mich für eine Warteschlange eins nach dem anderen registriere (in der Methode "GetMessage"), und ich war nicht in der Lage, eine "atomare" Operation zu finden, die die nächste FIFO messa bringt ge aus einer Gruppe von Warteschlangen.

Ich bin auf der Suche nach einer Möglichkeit, dies zu tun: _consumer.getNextMessage();

Ein anderer Ansatz wäre, alle ersten Nachrichten aus der ersten Prioritätsgruppe zu verbrauchen, sie auf der Verbraucherseite zu filtern, um die älteste Nachricht zu finden und keine Bestätigung für die anderen Nachrichten (in derselben Prioritätsgruppe) zu senden. . Dieser Ansatz ist auch problematisch, da es bedeutet, dass zu einem Zeitpunkt, zu dem die Nachrichten abgerufen werden, andere Verbraucher nicht in der Lage sind, mit ihnen umzugehen (bis keine Bestätigung).

Irgendwelche Vorschläge? Vielen Dank.

+1

Scheint, Sie haben das Setup falsch. Könnten Sie bitte erläutern, was Sie zu erreichen versuchen? – cantSleepNow

+0

+1 zu @cantSleepNow - dieses ganze Szenario scheint wie ein schlechtes Design, und was Sie tun möchten, ist nicht wirklich ohne größere Probleme möglich. Welches Problem versuchen Sie zu lösen? –

+0

@ cantSleepNow, @ Derick Bailey, ich stimme Ihnen zu, das Design ist schlecht und ich mag es nicht, Einstellung der Priorität auf der Consumer-Ebene ist falsch IMO aber es ist von den Clients eines derzeit funktionierenden Systems erforderlich.Was ich brauche Das Ziel besteht darin, es Dritten zu ermöglichen, zu definieren, welche Nachrichten für sie wichtig sind. Jeder Verbraucher hat seine eigenen Prioritäten. Es ist ein komplexeres Szenario, als nur eine Last zwischen den Verbrauchern der gleichen Warteschlangen auszugleichen, wofür es eine Lösung aus der Box mit RabbitMQ gibt. Sie können das Diagramm näher betrachten, es erklärt meine Bedürfnisse. – user440850

Antwort

0

Ich glaube nicht, dass Sie Warteschlangen und Verbraucher in der Art verwenden, wie sie entworfen wurden. Stellen Sie sich eine Warteschlange als einen Haufen Arbeit vor. Jeder Eimer Arbeit kann N Verbraucher haben, die die Arbeit machen.

Jeder Verbraucher sollte so konzipiert sein, dass er mit einer einzelnen Warteschlange arbeitet und andere Warteschlangen oder andere Verbraucher nicht kennt oder sich darum kümmert.

Wenn Sie versuchen, eine "hohe Priorität" Warteschlange zu erreichen, dann schlage ich Ihnen einen einzigen Austausch mit 2 verschiedenen Warteschlangen:

  • Regular Priority Queue
  • High Priority Queue

Dann werden Nachrichten standardmäßig in die reguläre Prioritätswarteschlange eingereiht. Wenn eine Nachricht mit hoher Priorität ankommt, können Sie die Kopfzeile der Nachricht so festlegen, dass sie in die Warteschlange mit hoher Priorität verschoben wird.

Wir verwenden diese heute gleichen Ansatz eine direkte Warteschlange verwenden und Hinzufügen eines Headers, wenn wir eine Nachricht gehen, in die Warteschlange mit hoher Priorität werden soll.

+0

Glücklicherweise war ich in der Lage, die Stack-Inhaber zu überzeugen, dass der RabbitMQ-Hammer nicht für das Problem geeignet ist, so änderten sie das Problem, vereinfachten die Anforderungen, entfernten sie die Komplexität der verschiedenen Consumer aus den verschiedenen logischen Gruppen. Jetzt konsumieren alle Konsumenten von den gleichen Gruppen, die zu einer einzigen Warteschlange (pro Gruppe) geroutet werden. Vielen Dank. – user440850