2015-11-07 14 views
40

Jimmy Boagard beschreibt eine McDonalds Fast-Food-Kette here es von oben Artikeln gestohlen zu einer scatter gather pattern.Wie eine Sage mit einem Scatter/Gather Muster In Masstransit 3.0

Workflow-Bild im Vergleich zu implementieren: enter image description here

Erstimplementierung Gedanken:

Um eine gemeinsame Schnittstelle für alle Arten von FoodOrdered Veranstaltungen zu haben, die alle Lebensmittelstationen bekommen würden und dann jede Lebensmittelstation wäre in der Lage zu konsumieren/schaffen seine jeweiligen Posten und veröffentlichen Sie ein gemeinsames Ereignis. Bsp .: Pommes und Burger-Station bekommt eine Nachricht bezüglich einer Bestellung von Fries, die Pommes-Station konsumiert die Bestellung kündigt ein ItemDoneEvent an, auf das die Saga wartet.

Anfängliche Bedenken:

Da die Saga nicht über die Art der Nahrung nur die Tatsache abgeschlossen ist egal, dass alle Lebensmittel abgeschlossen ist dies scheint eine OK Lösung. Jedoch nach Lesen Warnungen here in Bezug auf die gemeinsame Nutzung von Warteschlangen und merkt, dass Consumer.Conditional filtering has been removed with MassTransit 3.0 Es fühlt sich an, als ob das Framework sagt "Bad Things (TM) wird passieren" mit dieser Art von Ansatz. Aber ich bin nicht sicher, wie sonst Sie es ohne das Errichten einer Nachrichtenanforderung und -antwort und des Korrelierens des Ereignisses für jedes Nahrungsmittel in der Küche tun würden. Ex: FriesOrdered, BurgerOrdered FriesCooked, BurgerCooked. Das wäre sehr mühsam, wenn du das für jeden Gegenstand in der Küche machen müsstest?

Angesichts der oben genannten Bedenken - wie würde eine gute Saga Beispiel für diese Art von Workflow aussehen?

+4

Ich kann über das Wochenende eine Wende machen und eine Probe auf das MT-Repository stellen. –

+1

Chris, hast du jemals eine Chance bekommen, dabei zu schwingen? Ich bin derzeit auf der Suche nach einem ähnlichen Problem –

+2

Warum können Sie nicht die Liste der bestellten Lebensmittel in Saga-Instanz und entfernen Sie Elemente aus der Liste oder als "fertig" in der Liste Wert Objekte, wenn Sie eine generische "FoodReady" mit erhalten bestimmte 'FoodType' in der Nachricht? Wenn du schließlich herausfindest, dass die Liste leer ist, kannst du die Saga abschließen. –

Antwort

0

Das Problem mit dem Zurücksetzen abgeschlossener Ereignisse in die Saga ist, dass es Konflikte auf einer freigegebenen Ressource (d. H. Dem Saga-Status) erzeugt.

Jim hat einen weiteren Beitrag, der nach dem von Ihnen referenzierten Post kam, der das Problem und die Lösung umreißt. Natürlich spricht er speziell über NServiceBus, aber das Problem und die Konzepte sind die gleichen.

https://lostechies.com/jimmybogard/2014/02/27/reducing-nservicebus-saga-load/

einen externen Speicher erstellen. Legen Sie einen Datensatz für jedes Arbeitselement ein. Lassen Sie jeden Mitarbeiter seine Arbeit selbst erledigen, während die Saga effektiv mit verzögerter Nachrichtenübermittlung abfragt, um zu sehen, ob alle Arbeit erledigt ist.

Dann machen Sie noch Scatter-Gather, aber der "Aggregator" wurde durch das Prozess-Manager-Muster ersetzt, um Konflikte zu reduzieren.

0

Ich kam in ähnliches Problem - müssen einige Dutzend Befehle (alle die gleiche Schnittstelle, IMyRequest) veröffentlichen und alle warten.

Eigentlich initiiert mein Befehl andere Saga, die IMyRequestDone am Ende der Verarbeitung veröffentlichen, ohne Kennzeichnung abgeschlossen. (Sie müssen sie irgendwann zu einem späteren Zeitpunkt vervollständigen.) Anstatt also die Anzahl der abgeschlossenen verschachtelten Sagen in der Eltern-Saga zu speichern, frage ich einfach den Status der Kind-Saga-Instanzen ab.

prüfen bei jeder MyRequestDone Nachricht:

Schedule(() => FailSagaOnRequestsTimeout, x => x.CheckToken, x => 
{ 
    // timeout for all requests 
    x.Delay = TimeSpan.FromMinutes(10); 
    x.Received = e => e.CorrelateById(context => context.Message.CorrelationId); 
}); 


During(Active, 
    When(Xxx) 
     .ThenAsync(async context => 
     { 
      await context.Publish(context => new MyRequestCommand(context.Instance, "foo")); 
      await context.Publish(context => new MyRequestCommand(context.Instance, "bar")); 

      context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow + FailSagaOnRequestsTimeout.Delay; 
      context.Instance.WaitingMyResponsesCount = 2; 
     }) 
     .TransitionTo(WaitingMyResponses) 
     .Schedule(FailSagaOnRequestsTimeout, context => new FailSagaCommand(context.Instance)) 
    ); 

During(WaitingMyResponses, 
    When(MyRequestDone) 
     .Then(context => 
     { 
      if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow) 
       throw new TimeoutException(); 
     }) 
     .If(context => 
     { 
      var db = serviceProvider.GetRequiredService<DbContext>(); 
      var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList(); 
      var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount && 
       requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing)); // assume 3 states of request - Processing, Done and Failed 
      return allDone; 
     }, x => x 
      .Unschedule(FailSagaOnRequestsTimeout) 
      .TransitionTo(Active)) 
     ) 
     .Catch<TimeoutException>(x => x.TransitionTo(Failed)) 
); 

During(WaitingMyResponses, 
    When(FailSagaOnRequestsTimeout.Received) 
     .TransitionTo(Failed) 

Überprüfen Sie regelmäßig, dass alle getan Anfragen (durch "Reduzierung NServiceBus Saga load"):

Schedule(() => CheckAllRequestsDone, x => x.CheckToken, x => 
{ 
    // check interval 
    x.Delay = TimeSpan.FromSeconds(15); 
    x.Received = e => e.CorrelateById(context => context.Message.CorrelationId); 
}); 

During(Active, 
    When(Xxx) 
     .ThenAsync(async context => 
     { 
      await context.Publish(context => new MyRequestCommand(context.Instance, "foo")); 
      await context.Publish(context => new MyRequestCommand(context.Instance, "bar")); 

      context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow.AddMinutes(10); 
      context.Instance.WaitingMyResponsesCount = 2; 
     }) 
     .TransitionTo(WaitingMyResponses) 
     .Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance)) 
    ); 

During(WaitingMyResponses, 
    When(CheckAllRequestsDone.Recieved) 
     .Then(context => 
     { 
      var db = serviceProvider.GetRequiredService<DbContext>(); 
      var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList(); 
      var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount && 
       requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing)); 
      if (!allDone)   
      { 
       if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow + CheckAllRequestsDone.Delay)    
        throw new TimeoutException(); 
       throw new NotAllDoneException(); 
      } 
     }) 
     .TransitionTo(Active) 
     .Catch<NotAllDoneException>(x => x.Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance))) 
     .Catch<TimeoutException>(x => x.TransitionTo(Failed)); 
1

Könnten Sie nicht "einfach" das Objekt übergeben in der Warteschlange als Ereignisparameter? Wenn der Saga-Listener ein "Order completed" -Ereignis erhält, würde es das Objekt enthalten, das im Ereignis abgeschlossen ist?

ich vorstellen, dass es über eine generische Methode an die Warteschlange gesendet werden, wo das Objekt IFoodOrdered implementieren muss

Dann können Sie auf dem eine virtuelle Methode implementieren, die die Saga verwenden kann, das „generische“, was zu tun, wenn es abgeholt, und Sie müssen nur Überladungen für diese speziellen Elemente implementieren, die etwas Besonderes erfordern passieren?