Ich verwende RabbitMQ mit C# als Message Broker Framework. In meinem Szenario habe ich viele Verbraucher, die aus verschiedenen Warteschlangen verbrauchen können. mehrere Warteschlangen bilden eine logische Gruppe aus der Perspektive eines Verbrauchers.RabbitMQ, ist es möglich, 1 Nachricht von einem Kanal zu verbrauchen, während es FIFO?
zum Beispiel: Queue1, Queue2, Queue3.
Consumer1 erste Nachricht von [Queue1, Queue3] und nur wenn nicht verfügbar Versuch von Queue2 zu ergreifen, um wollen
Consumer2 erste Nachricht von [Queue2, Queue3] und nur wenn nicht verfügbar Versuch nehmen wollen von Queue1 zu nehmen
Unten ist eine mögliche Einrichtung, jeder Verbraucher kümmert sich um Teilmenge der Warteschlangen. Consumer2 interessiert sich für alle Warteschlangen, in den Prioritäten 1,2, daher der Verbraucher die Prioritäten Logik beibehalten. Beachten Sie, dass dies ein anderes Szenario aus den Arbeitswarteschlangen ist, die hier RabbitMQ tutorials
hier präsentiert wird, ist eine Klasse, die eine Nachricht zurück sollte:
public class MessagesProvider
{
private IConnection _connection;
private IModel _channel;
private readonly IConnectionFactory _connectionFactory;
private ConcurrentDictionary<string, string> _tagsQueueItems;
private EventingBasicConsumer _consumer;
private CancellationTokenSource _cts;
private TaskCompletionSource<BasicGetResult> _tcs;
public MessagesProvider()
{
_connectionFactory = new ConnectionFactory()
{
HostName = "localhost"
};
}
public Task<BasicGetResult> GetMessage(int timeout, IEnumerable<string> queues)
{
_tagsQueueItems = new ConcurrentDictionary<string, string>();
_cts = new CancellationTokenSource();
_connection = _connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
_channel.SingleMessagePerChannel();
_consumer = new EventingBasicConsumer(_channel);
_consumer.Received += OnReceive;
foreach (var queue in queues)
{
var tag = _channel.BasicConsume(queue, false, _consumer);
_tagsQueueItems.AddOrUpdate(queue, tag, (k, o) => queue);
}
return _tcs.Task;
}
private void SetResult(BasicGetResult result)
{
_tcs.SetResult(result);
if (_channel.IsOpen)
_channel.Close();
if (_connection.IsOpen)
_connection.Close();
}
private void OnReceive(object sender, BasicDeliverEventArgs e)
{
_consumer.Received -= OnReceive;
_channel.BasicAck(e.DeliveryTag, false);
var result = new BasicGetResult(e.DeliveryTag, e.Redelivered, e.Exchange, e.RoutingKey, 1,
e.BasicProperties, e.Body);
SetResult(result);
}
}
Kanaldefinition:
public static class ChannelExtension
{
public static void SingleMessagePerChannel(this IModel channel)
{
channel.BasicQos(0,1,true);
}
public static void SingleMessagePerConsumer(this IModel channel)
{
channel.BasicQos(0,1,false);
}
}
Das Problem ist, dass ich mich für eine Warteschlange eins nach dem anderen registriere (in der Methode "GetMessage"), und ich war nicht in der Lage, eine "atomare" Operation zu finden, die die nächste FIFO messa bringt ge aus einer Gruppe von Warteschlangen.
Ich bin auf der Suche nach einer Möglichkeit, dies zu tun: _consumer.getNextMessage();
Ein anderer Ansatz wäre, alle ersten Nachrichten aus der ersten Prioritätsgruppe zu verbrauchen, sie auf der Verbraucherseite zu filtern, um die älteste Nachricht zu finden und keine Bestätigung für die anderen Nachrichten (in derselben Prioritätsgruppe) zu senden. . Dieser Ansatz ist auch problematisch, da es bedeutet, dass zu einem Zeitpunkt, zu dem die Nachrichten abgerufen werden, andere Verbraucher nicht in der Lage sind, mit ihnen umzugehen (bis keine Bestätigung).
Irgendwelche Vorschläge? Vielen Dank.
Scheint, Sie haben das Setup falsch. Könnten Sie bitte erläutern, was Sie zu erreichen versuchen? – cantSleepNow
+1 zu @cantSleepNow - dieses ganze Szenario scheint wie ein schlechtes Design, und was Sie tun möchten, ist nicht wirklich ohne größere Probleme möglich. Welches Problem versuchen Sie zu lösen? –
@ cantSleepNow, @ Derick Bailey, ich stimme Ihnen zu, das Design ist schlecht und ich mag es nicht, Einstellung der Priorität auf der Consumer-Ebene ist falsch IMO aber es ist von den Clients eines derzeit funktionierenden Systems erforderlich.Was ich brauche Das Ziel besteht darin, es Dritten zu ermöglichen, zu definieren, welche Nachrichten für sie wichtig sind. Jeder Verbraucher hat seine eigenen Prioritäten. Es ist ein komplexeres Szenario, als nur eine Last zwischen den Verbrauchern der gleichen Warteschlangen auszugleichen, wofür es eine Lösung aus der Box mit RabbitMQ gibt. Sie können das Diagramm näher betrachten, es erklärt meine Bedürfnisse. – user440850