2016-06-29 9 views
4


ich eine Lock-Frei-Warteschlange implementiert zu implementieren, basierend auf dem Algorithmus angegeben in Maged M. Michael und Michael L. Scott Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms Arbeit (für den Algorithmus, um springen Seite 4)Stack-Überlauf, wenn sie versuchen Lock-freie Warteschlange

I verwendet atomare Operation auf shared_ptr wie std::atomic_load_explicit usw.

wenn nur die Warteschlange in einem Thread verwendet wird, alles in Ordnung ist, aber wenn es von anderen Thread verwendet wird, I eine Stapelüberlaufausnahme erhalten.

Ich konnte die Quelle des Problems leider nicht verfolgen. es scheint, dass, wenn man shared_ptr ist out of scope bekommen, es die Anzahl der Referenzen auf den nächsten ConcurrentQueueNode dekrementiert und es eine unendliche Rekursion verursacht, aber ich kann nicht sehen, warum ..

den Code:

die Warteschlangen-Knoten:

template<class T> 
struct ConcurrentQueueNode { 
    T m_Data; 
    std::shared_ptr<ConcurrentQueueNode> m_Next; 

    template<class ... Args> 
    ConcurrentQueueNode(Args&& ... args) : 
     m_Data(std::forward<Args>(args)...) {} 

    std::shared_ptr<ConcurrentQueueNode>& getNext() { 
     return m_Next; 
    } 

    T getValue() { 
     return std::move(m_Data); 
    } 

}; 

die gleichzeitige Warteschlange (Anmerkung: nicht für schwache Nerven):

template<class T> 
class ConcurrentQueue { 
    std::shared_ptr<ConcurrentQueueNode<T>> m_Head, m_Tail; 

public: 

ConcurrentQueue(){ 
    m_Head = m_Tail = std::make_shared<ConcurrentQueueNode<T>>(); 
} 

template<class ... Args> 
void push(Args&& ... args) { 
    auto node = std::make_shared<ConcurrentQueueNode<T>>(std::forward<Args>(args)...); 
    std::shared_ptr<ConcurrentQueueNode<T>> tail; 

    for (;;) { 
     tail = std::atomic_load_explicit(&m_Tail, std::memory_order_acquire); 
     std::shared_ptr<ConcurrentQueueNode<T>> next = 
      std::atomic_load_explicit(&tail->getNext(),std::memory_order_acquire); 

     if (tail == std::atomic_load_explicit(&m_Tail, std::memory_order_acquire)) { 
      if (next.get() == nullptr) { 
       auto currentNext = std::atomic_load_explicit(&m_Tail, std::memory_order_acquire)->getNext(); 
       auto res = std::atomic_compare_exchange_weak(&tail->getNext(), &next, node); 
       if (res) { 
        break; 
       } 
      } 
      else { 
       std::atomic_compare_exchange_weak(&m_Tail, &tail, next); 
      } 
     } 
    } 

    std::atomic_compare_exchange_strong(&m_Tail, &tail, node); 
} 

bool tryPop(T& dest) { 
    std::shared_ptr<ConcurrentQueueNode<T>> head; 
    for (;;) { 
     head = std::atomic_load_explicit(&m_Head, std::memory_order_acquire); 
     auto tail = std::atomic_load_explicit(&m_Tail,std::memory_order_acquire); 
     auto next = std::atomic_load_explicit(&head->getNext(), std::memory_order_acquire); 

     if (head == std::atomic_load_explicit(&m_Head, std::memory_order_acquire)) { 
      if (head.get() == tail.get()) { 
       if (next.get() == nullptr) { 
        return false; 
       } 
       std::atomic_compare_exchange_weak(&m_Tail, &tail, next); 
      } 
      else { 
       dest = next->getValue(); 
       auto res = std::atomic_compare_exchange_weak(&m_Head, &head, next); 
       if (res) { 
        break; 
       } 
      } 
     } 
    } 

    return true; 
} 
}; 

Beispiel Verwendung, die repro duces das Problem:

int main(){ 
    ConcurrentQueue<int> queue; 
    std::thread threads[4]; 

for (auto& thread : threads) { 
    thread = std::thread([&queue] { 

     for (auto i = 0; i < 100'000; i++) { 
      queue.push(i); 
      int y; 
      queue.tryPop(y); 
     } 
    }); 
} 

for (auto& thread : threads) { 
    thread.join(); 
} 
return 0; 
} 
+2

Ich denke, ich schätze mich glücklich, dass meine Nebenläufigkeitsbedürfnisse immer von Garten-Diversity-Mutexe erfüllt werden, mit wohldefinierter Semantik, und dass ich mich nicht dazu bringen muss, riesige Mengen an Zeit auf wackeligen, verschlungenen zu versenken. " Lock-free "Alternativen, die merkwürdigerweise immer Looping-Spinlocks implementieren, die die Ausführung effektiv blockieren, genau wie ein gewöhnlicher Mutex. –

+0

@SamVarshavchik Ich denke du hast recht, aber es ist immer gut experimentieren –

+1

Wenn Sie etwas, das frei von Sperren ist, implementieren, schlage ich vor, dass Ihre Standardimplementierung für einen Aufruf von 'std :: atomic_is_lock_free (& ptr)' 'wahr 'zurückgibt where 'ptr' ist eine' std :: shared_ptr <> 'Instanz. –

Antwort

2

Das Problem der Race-Bedingung ist, dass zu jedem Knoten in der Warteschlange führen kann warten alle auf einmal freigegeben zu werden - die rekursiv ist und bläst Ihren Stack.

Wenn Sie Ihren Test so ändern, dass nur ein Thread verwendet wird, aber nicht, erhalten Sie jedes Mal den gleichen Stapelüberlauffehler.

for (auto i = 1; i < 100000; i++) { 
    queue.push(i); 
    //int y; 
    //queue.tryPop(y); 
} 

Sie müssen unrecursive-ize die Kette der Knoten zu löschen:

__forceinline ~ConcurrentQueueNode() { 
    if (!m_Next || m_Next.use_count() > 1) 
     return; 
    KillChainOfDeath(); 
} 
void KillChainOfDeath() { 
    auto pThis = this; 
    std::shared_ptr<ConcurrentQueueNode> Next, Prev; 
    while (1) { 
     if (pThis->m_Next.use_count() > 1) 
      break; 
     Next.swap(pThis->m_Next); // unwire node 
     Prev = NULL; // free previous node that we unwired in previous loop 
     if (!(pThis = Next.get())) // move to next node 
      break; 
     Prev.swap(Next); // else Next.swap will free before unwire. 
    } 
} 

ich Shared_ptr nie zuvor verwendet haben, so dass ich weiß nicht, ob es einen schnelleren Weg, dies zu tun. Da ich shared_ptr noch nie benutzt habe, weiß ich nicht, ob Ihr Algorithmus unter ABA-Problemen leidet. Wenn es in der shared_ptr-Implementierung nicht etwas Besonderes gibt, um ABA zu verhindern, mache ich mir Sorgen, dass zuvor freigegebene Knoten wiederverwendet werden könnten, wodurch CAS getäuscht wird. Ich schien dieses Problem jedoch nie zu haben, als ich deinen Code lief.

+0

yep, es ist eine alberne Rekursion, die Stapelüberlauf verursacht .. –

+0

Ich habe gerade festgestellt, dass ich hier meinen vollständigen Testcode nicht eingegeben habe. Woops. – johnnycrash