2013-03-04 14 views
38

Ich habe vor kurzem einen Port zu C++ 11 gemacht, der std :: atomic eines dreifachen Puffers verwendet, um als Gleichzeitigkeits-sync-Mechanismus verwendet zu werden. Die Idee hinter diesem Thread-Sync-Ansatz ist, dass für einen Produzenten-Verbraucher-Situation, wo Sie einen Hersteller haben, der schneller läuft, dass der Verbraucher, Triple-Pufferung einige Vorteile geben kann, da der Produzenten-Thread nicht "verlangsamt" werden wird auf den Verbraucher warten. In meinem Fall habe ich einen Physik-Thread, der bei ~ 120 Bildern pro Sekunde aktualisiert wird, und einen Render-Thread, der bei ~ 60 Bildern pro Sekunde läuft. Offensichtlich möchte ich, dass der Render-Thread immer den neuesten Zustand erhält, aber ich weiß auch, dass ich aufgrund des Unterschieds in den Raten viele Frames aus dem Physik-Thread überspringen werde. Auf der anderen Seite möchte ich, dass mein Physik-Thread seine konstante Aktualisierungsrate beibehält und nicht durch den langsameren Render-Thread eingeschränkt wird, der meine Daten sperrt.C++ 11 atomare Speicherordnung - ist dies eine korrekte Verwendung der entspannten (release-consume) Bestellung?

Der ursprüngliche C-Code wurde von Remis-Gedanken gemacht und die vollständige Erklärung ist in seinem blog. Ich ermutige alle, die daran interessiert sind, es für ein besseres Verständnis der ursprünglichen Implementierung zu lesen.

Meine Implementierung kann here gefunden werden.

Die Grundidee besteht darin, ein Array mit 3 Positionen (Puffer) und einem atomaren Flag zu haben, das compare-and-swapped wird, um zu bestimmen, welche Array-Elemente welchem ​​Zustand zu welchem ​​Zeitpunkt entsprechen. Auf diese Weise wird nur eine atomare Variable verwendet, um alle 3 Indizes des Arrays und die Logik hinter der Dreifachpufferung zu modellieren. Die 3 Positionen des Puffers heißen Dirty, Clean und Snap. Der Producer schreibt immer in den Dirty-Index und kann den Writer umdrehen, um den Dirty mit dem aktuellen Clean-Index zu tauschen. Der Consumer kann einen neuen Snap anfordern, der den aktuellen Snap-Index mit dem Clean-Index tauscht, um den neuesten Puffer zu erhalten. Der Consumer liest immer den Puffer in der Snap-Position.

Die Flagge besteht aus einem 8-Bit unsigned int und die Bits entsprechen:

(ungebraucht) (new write) (2x schmutzig) (2x sauber) (2x Snap)

Die newWrite Das Extra-Bit-Flag wird vom Schreiber gesetzt und vom Leser gelöscht. Der Leser kann dies verwenden, um zu überprüfen, ob seit dem letzten Snap irgendwelche Schreibvorgänge stattgefunden haben, und wenn dies nicht der Fall ist, wird kein weiterer Snap mehr benötigt. Das Flag und die Indizes können unter Verwendung einfacher bitweiser Operationen erhalten werden.

Ok jetzt für den Code:

template <typename T> 
class TripleBuffer 
{ 

public: 

    TripleBuffer<T>(); 
    TripleBuffer<T>(const T& init); 

    // non-copyable behavior 
    TripleBuffer<T>(const TripleBuffer<T>&) = delete; 
    TripleBuffer<T>& operator=(const TripleBuffer<T>&) = delete; 

    T snap() const; // get the current snap to read 
    void write(const T newT); // write a new value 
    bool newSnap(); // swap to the latest value, if any 
    void flipWriter(); // flip writer positions dirty/clean 

    T readLast(); // wrapper to read the last available element (newSnap + snap) 
    void update(T newT); // wrapper to update with a new element (write + flipWriter) 

private: 

    bool isNewWrite(uint_fast8_t flags); // check if the newWrite bit is 1 
    uint_fast8_t swapSnapWithClean(uint_fast8_t flags); // swap Snap and Clean indexes 
    uint_fast8_t newWriteSwapCleanWithDirty(uint_fast8_t flags); // set newWrite to 1 and swap Clean and Dirty indexes 

    // 8 bit flags are (unused) (new write) (2x dirty) (2x clean) (2x snap) 
    // newWrite = (flags & 0x40) 
    // dirtyIndex = (flags & 0x30) >> 4 
    // cleanIndex = (flags & 0xC) >> 2 
    // snapIndex = (flags & 0x3) 
    mutable atomic_uint_fast8_t flags; 

    T buffer[3]; 
}; 

Umsetzung:

template <typename T> 
TripleBuffer<T>::TripleBuffer(){ 

    T dummy = T(); 

    buffer[0] = dummy; 
    buffer[1] = dummy; 
    buffer[2] = dummy; 

    flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2 
} 

template <typename T> 
TripleBuffer<T>::TripleBuffer(const T& init){ 

    buffer[0] = init; 
    buffer[1] = init; 
    buffer[2] = init; 

    flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2 
} 

template <typename T> 
T TripleBuffer<T>::snap() const{ 

    return buffer[flags.load(std::memory_order_consume) & 0x3]; // read snap index 
} 

template <typename T> 
void TripleBuffer<T>::write(const T newT){ 

    buffer[(flags.load(std::memory_order_consume) & 0x30) >> 4] = newT; // write into dirty index 
} 

template <typename T> 
bool TripleBuffer<T>::newSnap(){ 

    uint_fast8_t flagsNow(flags.load(std::memory_order_consume)); 
    do { 
    if(!isNewWrite(flagsNow)) // nothing new, no need to swap 
     return false; 
    } while(!flags.compare_exchange_weak(flagsNow, 
             swapSnapWithClean(flagsNow), 
             memory_order_release, 
             memory_order_consume)); 
    return true; 
} 

template <typename T> 
void TripleBuffer<T>::flipWriter(){ 

    uint_fast8_t flagsNow(flags.load(std::memory_order_consume)); 
    while(!flags.compare_exchange_weak(flagsNow, 
            newWriteSwapCleanWithDirty(flagsNow), 
            memory_order_release, 
            memory_order_consume)); 
} 

template <typename T> 
T TripleBuffer<T>::readLast(){ 
    newSnap(); // get most recent value 
    return snap(); // return it 
} 

template <typename T> 
void TripleBuffer<T>::update(T newT){ 
    write(newT); // write new value 
    flipWriter(); // change dirty/clean buffer positions for the next update 
} 

template <typename T> 
bool TripleBuffer<T>::isNewWrite(uint_fast8_t flags){ 
    // check if the newWrite bit is 1 
    return ((flags & 0x40) != 0); 
} 

template <typename T> 
uint_fast8_t TripleBuffer<T>::swapSnapWithClean(uint_fast8_t flags){ 
    // swap snap with clean 
    return (flags & 0x30) | ((flags & 0x3) << 2) | ((flags & 0xC) >> 2); 
} 

template <typename T> 
uint_fast8_t TripleBuffer<T>::newWriteSwapCleanWithDirty(uint_fast8_t flags){ 
    // set newWrite bit to 1 and swap clean with dirty 
    return 0x40 | ((flags & 0xC) << 2) | ((flags & 0x30) >> 2) | (flags & 0x3); 
} 

Wie Sie sehen können, habe ich beschlossen, eine Freigabe-Verbrauchen Muster für Speicherordnung zu verwenden . Die Release (memory_order_release) für den Speicher gewährleistet, dass keine Schreibvorgänge im aktuellen Thread nach dem Speicher neu geordnet werden können. Auf der anderen Seite, die Consume gewährleistet, dass keine Lesevorgänge im aktuellen Thread abhängig von dem Wert derzeit geladen werden kann vor diese Last neu geordnet werden. Dies stellt sicher, dass Schreibvorgänge in abhängigen Variablen in anderen Threads, die dieselbe atomare Variable freigeben, im aktuellen Thread sichtbar sind.

Wenn ich mein Verständnis richtig bin, da die Flags nur atomar gesetzt werden müssen, können Operationen auf anderen Variablen, die die Flags nicht direkt beeinflussen, vom Compiler neu geordnet werden, was mehr Optimierungen ermöglicht. Beim Lesen einiger Dokumente über das neue Speichermodell ist mir auch bewusst, dass diese entspannten Atome nur auf Plattformen wie ARM und POWER (die hauptsächlich wegen ihnen eingeführt wurden) spürbare Auswirkungen haben werden. Da ich ARM anvisiere, glaube ich, dass ich von diesen Operationen profitieren und ein bisschen mehr Leistung herauspressen könnte.

Nun zur Frage:

Bin ich für dieses spezielle Problem richtig in der Release-Verbrauchen entspannt Bestellung mit?

Danke,

André

PS: Sorry für den langen Post, aber ich glaubte, dass einige anständige Kontext für eine bessere Sicht des Problems erforderlich war.

EDIT: Implementiert @ Yakk Vorschläge:

  • Fest flags weiter lesen newSnap() und flipWriter() die direkte Zuordnung verwendet haben, damit Standard mit load(std::memory_order_seq_cst).
  • Die Bit-Fiddling-Operationen wurden zur besseren Übersichtlichkeit in dedizierte Funktionen verschoben.
  • bool Rückgabetyp zu newSnap() hinzugefügt, gibt jetzt false zurück, wenn nichts anderes neu und wahr ist.
  • Definierte Klasse als nicht kopierbar mit = delete Idiom seit beide Kopie und Zuordnung Konstruktoren waren unsicher, wenn die TripleBuffer verwendet wurde.

EDIT 2: Fest Beschreibung, die nicht korrekt war (Danke @Useless). Es ist der Consumer, der einen neuen Snap anfordert und aus dem Snap-Index liest (nicht der "Writer"). Entschuldige die Ablenkung und danke an Useless, dass du es aufgezeigt hast.

EDIT 3: die newSnap() und flipriter() Funktionen Optimiertes Vorschläge nach @Display Namen, effektiv redundante load() 2 ‚s pro Schleifenzyklus zu entfernen.

+2

einfach ein kurzen Code-Review, keine Antwort hier: Zum Beispiel würde newSnap() sein 'template TripleBuffer :: TripleBuffer (const TripleBuffer & t) {' doesn‘ In den meisten Kontexten ist es sicher, wenn der 'TripleBuffer' verwendet wird: Ich wäre versucht, ihn wegzulassen. Gleiches mit 'operator ='. 'newSnap' sollte" false "für" nichts zu lesen "zurückgeben. Sie sollten das ganze bisschen herumfiedeln zu einem einzigen Typ, der ein 'uint8_t & 'nimmt und sinnvolle Werte daraus ausgibt. Sie lesen 'flags' in' newSnap' ohne explizite Speicherbestellgarantien. – Yakk

+0

Wow, danke! Sie haben Recht mit den Kopier- und Zuweisungsoperatoren. Sie sind tatsächlich unsicher, wenn der "TripleBuffer" verwendet wird, da Typ-T-Zuordnungen nicht garantiert atomar sind und der Triple-Puffer, von dem kopiert wird, den Zustand zwischen Aufrufen ändern kann. Sie denken, es ist besser, sie zu entfernen und vom Compiler generieren zu lassen? Oder haben Sie eine Idee, wie Sie es sicher machen können? Ich sehe nicht viele _real_ Verwendungen für sie, wenn der TripleBuffer, von dem kopiert wird, verwendet wird. Ich habe sie der Vollständigkeit halber hinzugefügt, aber vergessen, alle möglichen Situationen zu berücksichtigen. –

+0

Über die 'newSnap'-Funktion kann in der Tat True/False zurückgegeben werden und eine nette Ergänzung sein. Es kann signalisieren, dass wir versuchen, zu schnell zu lesen;) Wirklich tolle Beobachtung über die 'flags' in' newSnap' gelesen! Wie es ist, ist es ein Standard 'load (memory_order_seq_cst)'. Es schadet zwar nicht, aber besiegt meinen ursprünglichen Zweck, Release/Consume-Bestellungen zu verwenden :) Vielen Dank! –

Antwort

1

Ja, es ist ein Unterschied zwischen memory_order_acquire und memory_order_consume, aber Sie werden es nicht bemerken, wenn Sie es 180 oder so pro Sekunde verwenden. Sie können meinen Test mit m2 = memory_order_consume ausführen, wenn Sie die Antwort in Zahlen wissen wollen. Ändern Sie einfach producer_or_consumer_Thread etwas wie folgt aus:

TripleBuffer <int> tb; 

void producer_or_consumer_Thread(void *arg) 
{ 
    struct Arg * a = (struct Arg *) arg; 
    bool succeeded = false; 
    int i = 0, k, kold = -1, kcur; 

    while (a->run) 
    { 
     while (a->wait) a->is_waiting = true; // busy wait 
     if (a->producer) 
     { 
      i++; 
      tb.update(i); 
      a->counter[0]++; 
     } 
     else 
     { 
      kcur = tb.snap(); 
      if (kold != -1 && kcur != kold) a->counter[1]++; 
      succeeded = tb0.newSnap(); 
      if (succeeded) 
      { 
       k = tb.readLast(); 
       if (kold == -1) 
        kold = k; 
       else if (kold = k + 1) 
        kold = k; 
       else 
        succeeded = false; 
      } 
      if (succeeded) a->counter[0]++; 
     } 
    } 
    a->is_waiting = true; 
} 

Prüfergebnis:

_#_ __Produced __Consumed _____Total 
    1 39258150 19509292 58767442 
    2 24598892 14730385 39329277 
    3 10615129 10016276 20631405 
    4 10617349 10026637 20643986 
    5 10600334 9976625 20576959 
    6 10624009 10069984 20693993 
    7 10609040 10016174 20625214 
    8 25864915 15136263 41001178 
    9 39847163 19809974 59657137 
10 29981232 16139823 46121055 
11 10555174 9870567 20425741 
12 25975381 15171559 41146940 
13 24311523 14490089 38801612 
14 10512252 9686540 20198792 
15 10520211 9693305 20213516 
16 10523458 9720930 20244388 
17 10576840 9917756 20494596 
18 11048180 9528808 20576988 
19 11500654 9530853 21031507 
20 11264789 9746040 21010829 
2

Warum sind Sie den alten Fahnen Wert zweimal in Ihrem CAS geladen Schleifen? Das erste Mal ist flags.load(), und das zweite durch die compare_exchange_weak(), die der Standard bei CAS-Fehler angibt, lädt den vorherigen Wert in das erste Argument, in diesem Fall ist FlagsNow.

Nach http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchangeAndernfalls lädt den Istwert gespeichert in * dies in erwartet (führt den Lastbetrieb).“ Also, was Ihre Schleife macht, ist, dass bei einem Fehler, compare_exchange_weak() Reloads flagsNow, dann die Schleife wird wiederholt, und die erste Anweisung lädt sie sofort nach dem Laden erneut durch compare_exchange_weak(). Es scheint mir, dass Ihre Schleife stattdessen die Last außerhalb der Schleife gezogen haben sollte.

uint_fast8_t flagsNow(flags.load(std::memory_order_consume)); 
do 
{ 
    if(!isNewWrite(flagsNow)) return false; // nothing new, no need to swap 
} while(!flags.compare_exchange_weak(flagsNow, swapSnapWithClean(flagsNow), memory_order_release, memory_order_consume)); 

und flipWriter():

uint_fast8_t flagsNow(flags.load(std::memory_order_consume)); 
while(!flags.compare_exchange_weak(flagsNow, newWriteSwapCleanWithDirty(flagsNow), memory_order_release, memory_order_consume)); 
+0

Sie haben Recht! Danke für die Optimierung;) Ich werde Ihre Änderungen in der ursprünglichen Frage widerspiegeln und werde Ihre Antwort verbessern, weil Sie konstruktiv beitragen. Sie gehen jedoch nicht auf meine Frage ein und ich kann Ihre Antwort nicht akzeptieren. Prost! –

+0

Oh, ich habe das nur als Antwort hinzugefügt, weil in meinem Beitrag zu viele Zeichen für das Kommentarfeld sind. –