2012-04-09 19 views
16

Quer gebucht http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9Schein BufferBlock.Post/Receive/ReceiveAsync Rennen/bug

Ich weiß ... Ich bin nicht wirklich TplDataflow auf ihr maximales Potenzial verwenden. ATM Ich verwende einfach als eine sichere Warteschlange für die Nachrichtenübergabe, wo Hersteller und Verbraucher mit unterschiedlichen Raten laufen. Ich sehe ein seltsames Verhalten, das mich ratlos macht, wie ich weitergehen soll.

private BufferBlock<object> messageQueue = new BufferBlock<object>(); 

public void Send(object message) 
{ 
    var accepted=messageQueue.Post(message); 
    logger.Info("Send message was called qlen = {0} accepted={1}", 
    messageQueue.Count,accepted); 
} 

public async Task<object> GetMessageAsync() 
{ 
    try 
    { 
     var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30)); 
     //despite messageQueue.Count>0 next line 
     //occasionally does not execute 
     logger.Info("message received"); 
     //....... 
    } 
    catch(TimeoutException) 
    { 
     //do something 
    } 
} 

In dem obigen Code (die Teil der 2000 Linie verteilte Lösung ist), wird Send alle 100ms periodisch oder so bezeichnet wird. Das bedeutet, ein Artikel ist Post ed to messageQueue mit etwa 10 Mal pro Sekunde. Dies ist verifiziert. Gelegentlich scheint jedoch ReceiveAsync nicht innerhalb der Zeitüberschreitung abgeschlossen zu sein (d. H. Die Post führt nicht dazu, dass ReceiveAsync abgeschlossen wird) und TimeoutException wird nach 30 Sekunden ausgelöst. An diesem Punkt ist messageQueue.Count in den Hunderten. Das ist unerwartet. Dieses Problem wurde auch bei langsameren Buchungsraten beobachtet (1 Post/Sekunde) und tritt normalerweise auf, bevor 1000 Artikel die überschritten haben.

Also, um dieses Problem zu arbeiten, ich den folgenden Code verwenden, die funktioniert, aber gelegentlich verursacht 1s Latenz, wenn (aufgrund des Fehler oben auftritt) empfängt

public async Task<object> GetMessageAsync() 
    { 
     try 
     { 
      object m; 
      var attempts = 0; 
      for (; ;) 
      { 
       try 
       { 
        m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1)); 
       } 
       catch (TimeoutException) 
       { 
        attempts++; 
        if (attempts >= 30) throw; 
        continue; 
       } 
       break; 

      } 

      logger.Info("message received"); 
      //....... 
     } 
     catch(TimeoutException) 
     { 
      //do something 
     } 
    } 

Das sieht wie eine Race-Bedingung in TDF zu mir, aber ich kann nicht auf den Grund, warum dies nicht in den anderen Orten, wo ich auf ähnliche Weise zu verwenden. Der Versuch, von ReceiveAsync zu Receive zu wechseln, hilft nicht. Ich habe nicht überprüft, aber ich stelle mir isoliert vor, der obige Code funktioniert perfekt. Es ist ein Muster, das ich in "Einführung in TPL Dataflow" tpldataflow.docx dokumentiert habe.

Was kann ich tun, um diesem Problem auf den Grund zu gehen? Gibt es Metriken, die helfen könnten, zu schließen, was passiert? Wenn ich keinen zuverlässigen Testfall erstellen kann, welche Informationen kann ich dann anbieten?

Hilfe!

+1

Ich sehe nichts falsch mit dem, was Sie tun oder was Ihre Erwartungen hier sind. Ich denke definitiv, dass Sie dies in den MSDN-Foren mehr aktiv halten müssen als hier. Sie haben bereits die Aufmerksamkeit von @ Stephen Toub bekommen und er ist definitiv der Typ, den Sie untersuchen wollen. –

+0

Nein. Ich bin nie auf den Grund gegangen. Ich konnte das Problem nicht in einem kleinen, in sich geschlossenen Beispiel reproduzieren. Da ich nur BufferBlock verwendet habe, habe ich stattdessen meine eigene asynchrone Warteschlangenimplementierung gerollt. Ich musste keinen anderen Code ändern ... Ich habe die Teile der BufferBlock-Schnittstelle, die ich verwendete, einfach neu implementiert. Funktioniert jetzt ein Leckerbissen, was mich denken lässt, dass etwas nicht stimmt, aber ich kann es nicht beweisen. Gr. – spender

+0

@spendor Sehr interessant, merkwürdigerweise verschrottete ich meine eigene asynchrone gleichzeitige Queue-Implementierung nach dem Auffinden von BufferBlock ...Jetzt muss ich es noch einmal überdenken. Vielen Dank. –

Antwort

1

Stephen scheint zu denken, die folgende ist die Lösung

var m = erwarten messageQueue.ReceiveAsync();

Statt:

var m = erwarten messageQueue.ReceiveAsync (TimeSpan.FromSeconds (30));

Können Sie dies bestätigen oder ablehnen?

+0

Das hat nicht geklappt. Es spielte keine Rolle, welche ReceiveAsync-Überladung ich wählte, das Ergebnis war das gleiche. Siehe meinen Kommentar oben für meine Entschließung. – spender