2016-03-30 23 views
1

Hallo ich einen Rebus mit zwei Warteschlangen haben will, dann ist dies, wie ich es Config jetztWie kann ich einen Rebus mit zwei Warteschlangen haben?

container.Register<BuiltinHandlerActivator>(() => 
     { 
      var activator = new BuiltinHandlerActivator();    
      var rebusConnection = configuration["Rebus:ConnectionString"]; 
      activator.Register(() => ActivatorUtilities.CreateInstance<CampaignsHandler>(container)); 
      activator.Register(() => ActivatorUtilities.CreateInstance<MessageHandler>(container)); 
      activator.Register(() => ActivatorUtilities.CreateInstance<DeliveryStatusHandler>(container)); 
      Log.Logger = container.GetInstance<ILogger>(); 
      Configure.With(activator) 
       .Transport(t => t.UseSqlServer(rebusConnection, "RebusQueue", "BackgroundJobs")) 
       .Logging(l => l.Serilog(Log.Logger)) 
       .Routing(r => 
       { 
        var typeBasedRouting = r.TypeBased(); 
        typeBasedRouting.MapAssemblyOf<MessageSent>("BackgroundJobs");      

       }) 
       .Options(o => o.SetNumberOfWorkers(6)) 
       .Options(o => o.SetMaxParallelism(6)) 
       .Options(b => b.SimpleRetryStrategy(maxDeliveryAttempts: 1)) 
       .Start();    
      activator.Bus.Subscribe<MessageSent>().Wait(); 

      return activator; 
     }, Lifestyle.Singleton); 

meine Message verarbeitet nachrichts Objekte und macht sie obj mit Statusinfomationen und meinen DeliveryStatusHandler verarbeitet messagesent obj messagesent zu aktualisieren meine Datenbank. Dieses Problem ist, ich habe nur eine Warteschlange (eine Datenbanktabelle "RebusQueue"). Also erst nachdem alle sendMessage-Objekte fertig sind, beginnt die Datenbank zu aktualisieren.

Ich möchte meine MessageSent obj direkt nach meiner sendMessage Objekte verarbeiten verarbeitet.so ich nehme an, ich sollte zwei Warteschlangen (zwei Tabelle) haben? aber wie konfiguriert man den Rebus?

Ich schaute auf die Frage Multiple input queues in one rebus process stehen wir vor den gleichen Problemen?

das ist, was ich tue in Messagehandler.cs

class MessageHandler : IHandleMessages<SendMessage> 
{ 
    private readonly IBus _bus; 
    private MessagingRuntime _messagingRuntime; 
    private IRepository _repository; 
    private ISnapshotRepository _snapshotRepository; 
    private readonly ITemplateEngineProvider _templateEngineProvider; 
    private IUrlShortener _urlShortener; 
    private ILogger _logger; 

    public MessageHandler(MessagingRuntime messagingRuntime, IRepository repository, ISnapshotRepository snapshotRepository, ITemplateEngineProvider templateEngineProvider, IUrlShortener urlShortener, IBus bus, ILogger logger) 
    { 
     _messagingRuntime = messagingRuntime; 
     _repository = repository; 
     _snapshotRepository = snapshotRepository; 
     _templateEngineProvider = templateEngineProvider; 
     _urlShortener = urlShortener; 
     _bus = bus; 
     _logger = logger; 
    } 

    public async Task Handle(SendMessage message) 
    { 
     var template = _snapshotRepository.Query<Template>(message.DateCreated).Where(x => x.Id == message.TemplateId).FirstOrDefault(); 
     var subscriber = _snapshotRepository.Query<Subscriber>(message.DateCreated).Where(x => x.Id == message.SubscriberId).FirstOrDefault(); 
     var templateEngine = GetTemplateEngine(message.CampaignId, message.Tags); 


     var @event = new MessageSent 
     { 
      Id = SequentialGuid.Instance.NewGuid(), 
      DeliveryStatusId = message.DeliveryStatusId, 
      SubscriberId = subscriber.Id, 
      DateCreated = message.DateCreated     
     }; 

     try 
     { 
      _messagingRuntime.ProcessSendRequest(new[] { subscriber }, templateEngine, template); 
      @event.IsDeliverySuccessful = true; 
     } 
     catch (MessagingException ex) 
     { 
      _logger.Error(ex.ToString()); 
      @event.IsDeliverySuccessful = false; 
     } 

     await _bus.Publish(@event); 
    } 
} 

dies den zweiten Handler ist die

class DeliveryStatusHandler : IHandleMessages<MessageSent> 
{ 
    private ILogger _logger; 
    private IRepository _repository; 
    private IRepository2 _repository2; 
    private ISnapshotRepository _snapshotRepository; 

    public DeliveryStatusHandler(IRepository repository, IRepository2 repository2,ISnapshotRepository snapshotRepository, ILogger logger) 
    { 
     _repository = repository; 
     _repository2 = repository2; 
     _snapshotRepository = snapshotRepository; 
     _logger = logger; 
    } 

    public Task Handle(MessageSent @event) 
    { 
     var deliveryStatus = _repository2.Find<DeliveryStatus>(@event.DeliveryStatusId); 
     if (deliveryStatus == null) 
     { 
      _logger.Error("Delivery Status does not exist"); 
      return Task.FromResult<object>(null); 
     } 

     var deliveryStatusItem = _repository2.Find<DeliveryStatusItem>(@event.Id); 
     var subscriber = _snapshotRepository.Query<Subscriber>(@event.DateCreated).Where(x => x.Id == @event.SubscriberId).FirstOrDefault(); 
     if (deliveryStatusItem == null) 
     { 
      deliveryStatusItem = new DeliveryStatusItem(); 
      deliveryStatusItem.Id = @event.Id; 
      deliveryStatusItem.Email = subscriber.Email; 
      deliveryStatusItem.PhoneNumber = subscriber.PhoneNumber; 
      deliveryStatusItem.Name = subscriber.Name; 

     } 
     deliveryStatusItem.DeliveryStatusId = @event.DeliveryStatusId; 
     deliveryStatusItem.IsDeliverySuccessful = @event.IsDeliverySuccessful; 
     _repository2.Save<DeliveryStatusItem>(deliveryStatusItem); 
     return Task.FromResult<object>(null); 
    } 
} 

Antwort

2

Wenn ich Sie werden (die Datenbank aktualisieren), würde ich zwei Rebus Instanzen starten Jede hat ihre eigene Eingabewarteschlange (es ist vollkommen in Ordnung, dies im selben Prozess zu tun, wenn Sie sie nicht unabhängig voneinander aktualisieren müssen).

Die beiden Fällen wäre:

  1. Der erste mit dem Handler Sie mich
  2. zeigte
  3. die zweite, die es zu MessageSent und Griff

Auf diese Weise entsprechend Sie können abonniert haben, Konfigurieren Sie einfach die Anzahl der Threads und die Parallelitätseinstellungen für jede Instanz, sodass Sie die Verarbeitung der Nachrichten drosseln/skalieren können.

+0

Gibt es auf Ihrer Website ein Beispielprojekt mit zwei Rebus-Instanzen? Prost –

+0

ein Rebus kann nur eine Warteschlange haben, ist es richtig? –

+0

https://github.com/rebus-org/Rebus/wiki/Rebus-configuration-section muss ich die app.config in meinem Projekt ändern? –