2016-07-07 9 views
2

Ich versuche, eine Reihe von Aufgaben in eine Warteschlange zu stellen und sie asynchron mit Azure Service Fabric auszuführen. Ich verwende derzeit die CloudMessageQueue mit Worker-Rollen. Ich versuche, zu Service Fabric zu migrieren. Von den Arbeiter Rollen, hier mein Code:Azure Service Fabric Nachrichtenwarteschlange

private void ExecuteTask() 
    { 
     CloudQueueMessage message = null; 

     if (queue == null) 
     { 
      jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.Error, String.Format("Queue for WorkerRole2 is null. Exiting."))); 
      return; 
     } 

     try 
     { 
      message = queue.GetMessage(); 
      if (message != null) 
      { 
       JMATask task = GetTask(message.AsString); 
       string msg = (message == null) ? string.Empty : message.AsString; 
       //jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.JMA, String.Format("Executing task {0}", msg))); 
       queue.DeleteMessage(message); 
       PerformTask(task); 
      } 
     } 
     catch (Exception ex) 
     { 
      string msg = (message == null) ? string.Empty : message.AsString; 
      jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.Error, String.Format("Message {0} Error removing message from queue {1}", msg, ex.ToString()))); 
     } 
    } 

Ich habe einige Fragen:

  1. Wie führe ich die Aufgabe Methode asynchron durchführen? Ich möchte ungefähr 30 - 40 Aufgaben gleichzeitig ausführen.
  2. Ich habe eine Liste von JMATask. Wie füge ich die Liste zu einer Warteschlange hinzu?
  3. Muss die Liste zu einer Warteschlange hinzugefügt werden?

    namespace Stateful1 
    { 
        public class JMATask 
        { 
        public string Name { get; set; } 
        } 
    
    /// <summary> 
    /// An instance of this class is created for each service replica by the Service Fabric runtime. 
    /// </summary> 
    internal sealed class Stateful1 : StatefulService 
    { 
    public Stateful1(StatefulServiceContext context) 
        : base(context) 
    { } 
    
    /// <summary> 
    /// Optional override to create listeners (e.g., HTTP, Service Remoting, WCF, etc.) for this service replica to handle client or user requests. 
    /// </summary> 
    /// <remarks> 
    /// For more information on service communication, see http://aka.ms/servicefabricservicecommunication 
    /// </remarks> 
    /// <returns>A collection of listeners.</returns> 
    protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners() 
    { 
        return new ServiceReplicaListener[0]; 
    } 
    
    /// <summary> 
    /// This is the main entry point for your service replica. 
    /// This method executes when this replica of your service becomes primary and has write status. 
    /// </summary> 
    /// <param name="cancellationToken">Canceled when Service Fabric needs to shut down this service replica.</param> 
    protected override async Task RunAsync(CancellationToken cancellationToken) 
    { 
        // TODO: Replace the following sample code with your own logic 
        //  or remove this RunAsync override if it's not needed in your service. 
    
        IReliableQueue<JMATask> tasks = await this.StateManager.GetOrAddAsync<IReliableQueue<JMATask>>("JMATasks"); 
        //var myDictionary = await this.StateManager.GetOrAddAsync<IReliableDictionary<string, long>>("myDictionary"); 
    
        while (true) 
        { 
         cancellationToken.ThrowIfCancellationRequested(); 
    
         using (var tx = this.StateManager.CreateTransaction()) 
         { 
          var result = await tasks.TryDequeueAsync(tx); 
    
          //how do I execute this method async? 
          PerformTask(result.Value); 
    
          //Create list of JMA Tasks to queue up 
          await tasks.EnqueueAsync(tx, new JMATask()); 
    
          //var result = await myDictionary.TryGetValueAsync(tx, "Counter"); 
    
          //ServiceEventSource.Current.ServiceMessage(this, "Current Counter Value: {0}", 
          // result.HasValue ? result.Value.ToString() : "Value does not exist."); 
    
          //await myDictionary.AddOrUpdateAsync(tx, "Counter", 0, (key, value) => ++value); 
    
          // If an exception is thrown before calling CommitAsync, the transaction aborts, all changes are 
          // discarded, and nothing is saved to the secondary replicas. 
          await tx.CommitAsync(); 
         } 
    
         await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken); 
        } 
    } 
    
    private async void PerformTask(JMATask task) 
    { 
        //execute task 
    } 
    

    }

+0

Kämpfen mit ähnlichen Problem und das war hilfreich, danke. Neugierig, was Sie gemacht haben und wie es für Sie funktioniert hat. – kenchilada

+1

Ich habe eine andere Technologie verwendet: https://msdn.microsoft.com/en-us/library/dn568104.aspx –

Antwort

1

Die RunAsync Methode sollte diese Zeile Code nicht haben: await tasks.EnqueueAsync(tx, new JMATask());

Aufgaben erstellen Liste der JMA die Warteschlange sollte eine andere Methode in der Stateful-Dienst, der wie folgt aussieht:

public async Task AddJMATaskAsync(JMATask task) 
    { 
     var tasksQueue = await StateManager.GetOrAddAsync<IReliableQueue<JMATask>>("JMATasks"); 
     using (var tx = StateManager.CreateTransaction()) 
     { 
      try 
      { 
       await tasksQueue.EnqueueAsync(tx, request); 
       await tx.CommitAsync(); 
      } 
      catch (Exception ex) 
      { 
       tx.Abort(); 
      } 
     } 
    } 

und dann PerformTask Methode kann einen Aufruf einer staatenlos Micro enthalten:

public async Task PerformTask (JMATask task) 
    { 
     //1. resolve stateless microservice URI 
     // statelessSvc 

     //2. call method of the stateless microservice 
     // statelessSvc.PerformTask(task); 
    } 

Also im Grunde die Stateful-Dienst wird nur queu e und entnimmt die Aufgaben. Die Ausführung der eigentlichen Aufgabe kann von einem Microservice durchgeführt werden, der für alle Knoten im Cluster verfügbar ist.

1

Sie eine Liste von Aufgaben erstellen und tun dann Task.WhenAll (Tasklist) erwarten;

Das ist wahrscheinlich die einfachste direkte Antwort.

Jedoch - wenn jede Aufgabe etwas anders ist, haben Sie in Betracht gezogen, individuelle Mikrodienste für jede Aufgabe zu erstellen?

+0

Ja, jede Aufgabe ist anders. Wie erstelle ich individuelle Mikrodienste? –

+0

Idealerweise sollte für jede Aufgabe ein Dienst (statusbehaftet, zustandslos) erstellt werden. Es ist schwer, genau zu antworten, ohne ein tieferes Verständnis zu haben, aber das wäre meine Bauchreaktion. Jeder Dienst sollte eine einfache Verantwortung haben und wirklich gut darin sein, das zu tun, was er tun soll. –

+0

Ich habe meinen Beitrag aktualisiert. Die Aufgaben verschieben Daten von einem System zu einem anderen. Es gibt mehrere hundert verschiedene Aufgaben, aber viele von ihnen führen dieselbe Art von Aktion aus. –