2012-07-26 14 views
9

Ich verwende boost::asio::io_service als Basis-Thread-Pool. Einige Threads werden zu io_service hinzugefügt, der Hauptthread beginnt Handler zu schreiben, die Worker-Threads starten die Handler und alles wird beendet. So weit, ist es gut; Ich bekomme eine schöne Beschleunigung über Singlethread-Code.Einstellung der Post-Queue-Größe mit Boost Asio?

Allerdings hat der Hauptthread Millionen Dinge zu posten. Und es schreibt sie einfach viel schneller, als die Worker-Threads damit umgehen können. Ich treffe RAM-Grenzen nicht, aber es ist immer noch dumm, so viele Dinge in die Warteschlange zu stellen. Ich möchte eine feste Größe für die Handler-Warteschlange und einen post() -Block haben, wenn die Warteschlange voll ist.

Ich sehe keine Optionen für diese in der Boost-ASIO-Dokumentation. Ist das möglich?

Antwort

0

könnten Sie das Strang-Objekt verwenden, um die Ereignisse zu setzen und eine Verzögerung in der Haupt setzen? Fällt Ihr Programm aus, nachdem alle Arbeiten veröffentlicht wurden? In diesem Fall können Sie das Arbeitsobjekt verwenden, das Ihnen mehr Kontrolle darüber gibt, wann Ihr io_service stoppt.

man kann immer Haupt den Zustand der Threads überprüft und hat es, bis man wartet frei wird oder so ähnlich.

// Links

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service__strand.html

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service.html

//example from the second link 
boost::asio::io_service io_service; 
boost::asio::io_service::work work(io_service); 

Hoffnung, das hilft.

+0

Das Problem ist nicht, dass das ist 'io_service' stoppt, bevor die Arbeiten erledigt --- wir kennen das' Werk' Objekt über das Löschen der 'io_service' zu ​​stoppen anmutig zu machen. Das Problem ist, dass der 'io_service' zu ​​viele Aufgaben ansammeln lässt. Wir möchten die Anzahl der nicht zugewiesenen Aufgaben auf eine Weise beschränken, die nicht das Abfragen des Threads erfordert, der die Aufgaben erstellt, daher unsere Frage, ob 'poll()' zum Blockieren gebracht werden kann. – uckelman

2

Ich verwende die Semaphore die Handler Größe Warteschlange zu beheben. Der folgende Code veranschaulicht diese Lösung:

void Schedule(boost::function<void()> function) 
{ 
    semaphore.wait(); 
    io_service.post(boost::bind(&TaskWrapper, function)); 
} 

void TaskWrapper(boost::function<void()> &function) 
{ 
    function(); 
    semaphore.post(); 
} 
1

Sie Ihr Lambda in einem anderen Lambda-wickeln können, die dafür sorgen würde, die „in-progress“ Aufgaben zu zählen, und dann warten vor der Veröffentlichung, wenn zu viele in-progress Aufgaben .

Beispiel:

#include <atomic> 
#include <chrono> 
#include <future> 
#include <iostream> 
#include <mutex> 
#include <thread> 
#include <vector> 
#include <boost/asio.hpp> 

class ThreadPool { 
    using asio_worker = std::unique_ptr<boost::asio::io_service::work>; 
    boost::asio::io_service service; 
    asio_worker service_worker; 
    std::vector<std::thread> grp; 
    std::atomic<int> inProgress = 0; 
    std::mutex mtx; 
    std::condition_variable busy; 
public: 
    ThreadPool(int threads) : service(), service_worker(new asio_worker::element_type(service)) { 
    for (int i = 0; i < threads; ++i) { 
     grp.emplace_back([this] { service.run(); }); 
    } 
    } 

    template<typename F> 
    void enqueue(F && f) { 
    std::unique_lock<std::mutex> lock(mtx); 
    // limit queue depth = number of threads 
    while (inProgress >= grp.size()) { 
     busy.wait(lock); 
    } 
    inProgress++; 
    service.post([this, f = std::forward<F>(f)]{ 
     try { 
     f(); 
     } 
     catch (...) { 
     inProgress--; 
     busy.notify_one(); 
     throw; 
     } 
     inProgress--; 
     busy.notify_one(); 
    }); 
    } 

    ~ThreadPool() { 
    service_worker.reset(); 
    for (auto& t : grp) 
     if (t.joinable()) 
     t.join(); 
    service.stop(); 
    } 
}; 

int main() { 
    std::unique_ptr<ThreadPool> pool(new ThreadPool(4)); 
    for (int i = 1; i <= 20; ++i) { 
    pool->enqueue([i] { 
     std::string s("Hello from task "); 
     s += std::to_string(i) + "\n"; 
     std::cout << s; 
     std::this_thread::sleep_for(std::chrono::seconds(1)); 
    }); 
    } 
    std::cout << "All tasks queued.\n"; 
    pool.reset(); // wait for all tasks to complete 
    std::cout << "Done.\n"; 
} 

Ausgang:

Hello from task 3 
Hello from task 4 
Hello from task 2 
Hello from task 1 
Hello from task 5 
Hello from task 7 
Hello from task 6 
Hello from task 8 
Hello from task 9 
Hello from task 10 
Hello from task 11 
Hello from task 12 
Hello from task 13 
Hello from task 14 
Hello from task 15 
Hello from task 16 
Hello from task 17 
Hello from task 18 
All tasks queued. 
Hello from task 19 
Hello from task 20 
Done. 
0

versuchen Vielleicht so die Priorität des Haupt-Thread senken, dass, sobald die Arbeitsthreads beschäftigt bekommen sie den Haupt-Thread und das System selbst Grenzen zu verhungern.