2015-07-26 11 views
5

In this Frage beschrieb ich boost :: asio und boost :: Koroutine Nutzungsmuster, die zufällige Abstürze meiner Anwendung verursacht und ich veröffentlichte Auszug aus meinem Code und valgrind und GDB Ausgang.Was ist los mit diesem Boost :: Asio und Boost :: Coroutine Nutzungsmuster?

Um das Problem weiter zu untersuchen, erstellte ich kleinere Proof of Concept Anwendung, die das gleiche Muster anwendet. Ich habe gesehen, dass das gleiche Problem im kleineren Programm entsteht, dessen Quelle ich hier veröffentliche.

Der Code startet ein paar Threads und erstellt einen Verbindungspool mit einigen Dummy-Verbindungen (vom Benutzer bereitgestellte Nummern). Zusätzliche Argumente sind vorzeichenlose Integer-Zahlen, die die Rolle falscher Anfragen spielen. Die Dummy-Implementierung der sendRequest Funktion startet nur den asynchronen Timer für die Anzahl der Sekunden, die der Eingangsnummer entsprechen, und yileds von der Funktion.

Kann jemand das Problem mit diesem Code sehen und kann er eine Lösung vorschlagen?

#include "asiocoroutineutils.h" 
#include "concurrentqueue.h" 

#include <iostream> 
#include <thread> 

#include <boost/lexical_cast.hpp> 

using namespace std; 
using namespace boost; 
using namespace utils; 

#define id this_thread::get_id() << ": " 

// --------------------------------------------------------------------------- 

/*! 
* \brief This is a fake Connection class 
*/ 
class Connection 
{ 
public: 
    Connection(unsigned connectionId) 
     : _id(connectionId) 
    { 
    } 

    unsigned getId() const 
    { 
     return _id; 
    } 

    void sendRequest(asio::io_service& ioService, 
        unsigned seconds, 
        AsioCoroutineJoinerProxy, 
        asio::yield_context yield) 
    { 
     cout << id << "Connection " << getId() 
      << " Start sending: " << seconds << endl; 

     // waiting on this timer is palceholder for any asynchronous operation 
     asio::steady_timer timer(ioService); 
     timer.expires_from_now(chrono::seconds(seconds)); 
     coroutineAsyncWait(timer, yield); 

     cout << id << "Connection " << getId() 
      << " Received response: " << seconds << endl; 
    } 

private: 
    unsigned _id; 
}; 

typedef std::unique_ptr<Connection> ConnectionPtr; 
typedef std::shared_ptr<asio::steady_timer> TimerPtr; 

// --------------------------------------------------------------------------- 

class ConnectionPool 
{ 
public: 
    ConnectionPool(size_t connectionsCount) 
    { 
     for(size_t i = 0; i < connectionsCount; ++i) 
     { 
      cout << "Creating connection: " << i << endl; 
      _connections.emplace_back(new Connection(i)); 
     } 
    } 

    ConnectionPtr getConnection(TimerPtr timer, 
           asio::yield_context& yield) 
    { 
     lock_guard<mutex> lock(_mutex); 

     while(_connections.empty()) 
     { 
      cout << id << "There is no free connection." << endl; 

      _timers.emplace_back(timer); 
      timer->expires_from_now(
       asio::steady_timer::clock_type::duration::max()); 

      _mutex.unlock(); 
      coroutineAsyncWait(*timer, yield); 
      _mutex.lock(); 

      cout << id << "Connection was freed." << endl; 
     } 

     cout << id << "Getting connection: " 
      << _connections.front()->getId() << endl; 

     ConnectionPtr connection = std::move(_connections.front()); 
     _connections.pop_front(); 
     return connection; 
    } 

    void addConnection(ConnectionPtr connection) 
    { 
     lock_guard<mutex> lock(_mutex); 

     cout << id << "Returning connection " << connection->getId() 
      << " to the pool." << endl; 

     _connections.emplace_back(std::move(connection)); 

     if(_timers.empty()) 
      return; 

     auto timer = _timers.back(); 
     _timers.pop_back(); 
     auto& ioService = timer->get_io_service(); 

     ioService.post([timer]() 
     { 
      cout << id << "Wake up waiting getConnection." << endl; 
      timer->cancel(); 
     }); 
    } 

private: 
    mutex _mutex; 
    deque<ConnectionPtr> _connections; 
    deque<TimerPtr> _timers; 
}; 

typedef unique_ptr<ConnectionPool> ConnectionPoolPtr; 

// --------------------------------------------------------------------------- 

class ScopedConnection 
{ 
public: 
    ScopedConnection(ConnectionPool& pool, 
        asio::io_service& ioService, 
        asio::yield_context& yield) 
     : _pool(pool) 
    { 
     auto timer = make_shared<asio::steady_timer>(ioService); 
     _connection = _pool.getConnection(timer, yield); 
    } 

    Connection& get() 
    { 
     return *_connection; 
    } 

    ~ScopedConnection() 
    { 
     _pool.addConnection(std::move(_connection)); 
    } 

private: 
    ConnectionPool& _pool; 
    ConnectionPtr _connection; 
}; 

// --------------------------------------------------------------------------- 

void sendRequest(asio::io_service& ioService, 
       ConnectionPool& pool, 
       unsigned seconds, 
       asio::yield_context yield) 
{ 
    cout << id << "Constructing request ..." << endl; 

    AsioCoroutineJoiner joiner(ioService); 

    ScopedConnection connection(pool, ioService, yield); 

    asio::spawn(ioService, bind(&Connection::sendRequest, 
           connection.get(), 
           std::ref(ioService), 
           seconds, 
           AsioCoroutineJoinerProxy(joiner), 
           placeholders::_1)); 

    joiner.join(yield); 

    cout << id << "Processing response ..." << endl; 
} 

// --------------------------------------------------------------------------- 

void threadFunc(ConnectionPool& pool, 
       ConcurrentQueue<unsigned>& requests) 
{ 
    try 
    { 
     asio::io_service ioService; 

     while(true) 
     { 
      unsigned request; 
      if(!requests.tryPop(request)) 
       break; 

      cout << id << "Scheduling request: " << request << endl; 

      asio::spawn(ioService, bind(sendRequest, 
             std::ref(ioService), 
             std::ref(pool), 
             request, 
             placeholders::_1)); 
     } 

     ioService.run(); 
    } 
    catch(const std::exception& e) 
    { 
     cerr << id << "Error: " << e.what() << endl; 
    } 
} 

// --------------------------------------------------------------------------- 

int main(int argc, char* argv[]) 
{ 
    if(argc < 3) 
    { 
     cout << "Usage: ./async_request poolSize threadsCount r0 r1 ..." 
      << endl; 
     return -1; 
    } 

    try 
    { 
     auto poolSize = lexical_cast<size_t>(argv[1]); 
     auto threadsCount = lexical_cast<size_t>(argv[2]); 

     ConcurrentQueue<unsigned> requests; 
     for(int i = 3; i < argc; ++i) 
     { 
      auto request = lexical_cast<unsigned>(argv[i]); 
      requests.tryPush(request); 
     } 

     ConnectionPoolPtr pool(new ConnectionPool(poolSize)); 

     vector<unique_ptr<thread>> threads; 
     for(size_t i = 0; i < threadsCount; ++i) 
     { 
      threads.emplace_back(
       new thread(threadFunc, std::ref(*pool), std::ref(requests))); 
     } 

     for_each(threads.begin(), threads.end(), mem_fn(&thread::join)); 
    } 
    catch(const std::exception& e) 
    { 
     cerr << "Error: " << e.what() << endl; 
    } 

    return 0; 
} 

Hier sind einige Helfer Dienstprogramme, die von den obigen Code verwendet:

#pragma once 

#include <boost/asio/steady_timer.hpp> 
#include <boost/asio/spawn.hpp> 

namespace utils 
{ 

inline void coroutineAsyncWait(boost::asio::steady_timer& timer, 
           boost::asio::yield_context& yield) 
{ 
    boost::system::error_code ec; 
    timer.async_wait(yield[ec]); 
    if(ec && ec != boost::asio::error::operation_aborted) 
     throw std::runtime_error(ec.message()); 
} 

class AsioCoroutineJoiner 
{ 
public: 
    explicit AsioCoroutineJoiner(boost::asio::io_service& io) 
     : _timer(io), _count(0) {} 

    void join(boost::asio::yield_context yield) 
    { 
     assert(_count > 0); 
     _timer.expires_from_now(
      boost::asio::steady_timer::clock_type::duration::max()); 
     coroutineAsyncWait(_timer, yield); 
    } 

    void inc() 
    { 
     ++_count; 
    } 

    void dec() 
    { 
     assert(_count > 0); 
     --_count; 
     if(0 == _count) 
      _timer.cancel(); 
    } 

private: 
    boost::asio::steady_timer _timer; 
    std::size_t _count; 

}; // AsioCoroutineJoiner class 

class AsioCoroutineJoinerProxy 
{ 
public: 
    AsioCoroutineJoinerProxy(AsioCoroutineJoiner& joiner) 
     : _joiner(joiner) 
    { 
     _joiner.inc(); 
    } 

    AsioCoroutineJoinerProxy(const AsioCoroutineJoinerProxy& joinerProxy) 
     : _joiner(joinerProxy._joiner) 
    { 
     _joiner.inc(); 
    } 

    ~AsioCoroutineJoinerProxy() 
    { 
     _joiner.dec(); 
    } 

private: 
    AsioCoroutineJoiner& _joiner; 

}; // AsioCoroutineJoinerProxy class 

} // utils namespace 

Zur Vollständigkeit des Codes der letzte fehlende Teil ist ConcurrentQueue Klasse. Es ist zu lang, es hier einzufügen, aber wenn Sie möchten, können Sie es finden here.

Beispiel der Verwendung der Anwendung ist:

./connectionpooltest 3 3 5 7 8 1 0 9 2 4 3 6

wobei die erste Zahl 3 sind gefälschte Verbindungen zählen und die zweite Nummer 3 sind die Anzahl der verwendeten Threads. Zahlen nach ihnen sind gefälschte Anfragen.

Der Ausgang des valgrind und GDB ist die gleiche wie in der oben erwähnten question.

Gebrauchte Version Boost ist 1,57. Der Compiler ist GCC 4.8.3. Das Betriebssystem ist CentOS Linux Release 7.1.1503

+5

'#define id this_thread :: get_id() <<": "Ist es dir ernst? – erenon

+0

mögliches Duplikat von [Was verursacht einen zufälligen Absturz in boost :: coroutine?] (Http: // stackoverflow.com/questions/31610415/Was-Ursachen-eine-Zufalls-Crash-in-BoostCoroutine) – PSIAlt

+0

Aber der Code ist jetzt abgeschlossen (es scheint) @ PSIAlt Ich würde es eine Chance geben, wie diese – sehe

Antwort

1

Es scheint, dass alle valgrind Fehler werden dadurch verursacht, dass die BOOST_USE_VALGRIND Makro definiert sind nicht als Tanner Sansbury Punkte in Kommentar zu this Frage bezogen. Es scheint, dass außer diesem das Programm korrekt ist.