2016-07-20 6 views
1

Ich versuche, dies mit der Unterstützung von C++ 11 Parallelität zu tun.Korrekte Möglichkeit, eine Bedingungsvariable zu warten, die von mehreren Threads benachrichtigt wird

Ich habe eine Art Thread-Pool von Worker-Threads, die alle dasselbe tun, wo ein Master-Thread ein Array von Zustandsvariablen hat (eine für jeden Thread, sie müssen synchronisiert starten, dh nicht voraus laufen Zyklus ihrer Schleife).

for (auto &worker_cond : cond_arr) { 
     worker_cond.notify_one(); 
    } 

dann muss dieser Thread auf eine Benachrichtigung von jedem Thread des Pools warten, um seinen Zyklus erneut zu starten. Was ist der richtige Weg, dies zu tun? Haben Sie eine einzige Bedingungsvariable und warten Sie auf eine Integer-Zahl, wobei jeder Thread, der nicht der Master ist, zunehmen wird? so etwas wie (noch im Master-Thread)

unique_lock<std::mutex> lock(workers_mtx); 
    workers_finished.wait(lock, [&workers] { return workers = cond_arr.size(); }); 
+1

Eine schnelle Frage: Haben Sie ['std :: experimental :: barrier '] (http://en.cppreference.com/w/cpp/experimental/barrier) in Betracht gezogen? –

Antwort

1

Ich sehe zwei Möglichkeiten:

Option 1: join()

Grundsätzlich stattdessen eine Zustandsgröße der Verwendung der Berechnungen in Ihre Threads zu starten, Sie Erstellen Sie einen neuen Thread für jede Iteration und verwenden Sie join(), um darauf zu warten, dass sie beendet wird. Dann spawnt ihr neue Threads für die nächste Iteration und so weiter.

Option 2: sperrt

Sie wollen nicht das Hauptthread, solange einer der Threads zu benachrichtigen, noch funktioniert. So erhält jeder Thread seine eigene Sperre, die er vor den Berechnungen sperrt und danach freischaltet. Ihr Haupt-Thread sperrt alle von ihnen vor dem Aufruf der notify() und entsperrt sie danach.

+0

Für die erste wäre es nicht einfacher zu bedienen. Der zweite funktioniert nicht, weil es passieren kann, dass der Master-Thread auf eine Sperre wartet, während ein anderer Thread eine andere Sperre hat und mehr als einmal durchlaufen kann, was ich nicht möchte! – Aram

+0

Nicht, wenn die Worker-Threads nach jedem Zyklus die Sperre freigeben, dann auf die Bedingungsvariable warten und erst dann, wenn sie benachrichtigt wird, erneut sperren und genau einen Zyklus ausführen. – Anedar

1

Ich sehe nichts grundsätzlich falsch mit Ihrer Lösung.

Schutz workers mit workers_mtx und fertig.

Wir könnten dies mit einem Zähl-Semaphor abstrahieren.

struct counting_semaphore { 
    std::unique_ptr<std::mutex> m=std::make_unique<std::mutex>(); 
    std::ptrdiff_t count = 0; 
    std::unique_ptr<std::condition_variable> cv=std::make_unique<std::condition_variable>(); 

    counting_semaphore(std::ptrdiff_t c=0):count(c) {} 
    counting_semaphore(counting_semaphore&&)=default; 

    void take(std::size_t n = 1) { 
    std::unique_lock<std::mutex> lock(*m); 
    cv->wait(lock, [&]{ if (count-std::ptrdiff_t(n) < 0) return false; count-=n; return true; }); 
    } 
    void give(std::size_t n = 1) { 
    { 
     std::unique_lock<std::mutex> lock(*m); 
     count += n; 
     if (count <= 0) return; 
    } 
    cv->notify_all(); 
    } 
}; 

take nimmt count entfernt und blockiert, wenn es nicht genug ist.

give fügt zu count hinzu und benachrichtigt, wenn es eine positive Menge gibt.

Jetzt fädelt der Arbeiter Fähren Tokens zwischen zwei Semaphoren.

std::vector<counting_semaphore> m_worker_start{count}; 
counting_semaphore m_worker_done{0}; // not count, zero 
std::atomic<bool> m_shutdown = false; 

// master controller: 
for (each step) { 
    for (auto&& starts:m_worker_start) 
    starts.give(); 
    m_worker_done.take(count); 
} 

// master shutdown: 
m_shutdown = true; 
// wake up forever: 
for (auto&& starts:m_worker_start) 
    starts.give(std::size_t(-1)/2); 

// worker thread: 
while (true) { 
    master->m_worker_start[my_id].take(); 
    if (master->m_shutdown) return; 
    // do work 
    master->m_worker_done.give(); 
} 

oder somesuch.

live example.