2014-04-09 16 views
6

Ich habe eine Anwendung, die Boost.Asio für TCP- und UDP-Socket-Kommunikation verwendet. Ich verstehe, dass das "A" in "Asio" für Asynchronous steht, so dass die Bibliothek dazu neigt, Sie zu ermutigen, asynchrone E/A zu verwenden, wenn möglich. Ich habe ein paar Fälle, in denen synchrone Socket-Lesevorgänge vorzuziehen sind. Gleichzeitig möchte ich jedoch eine Zeitüberschreitung für diese empfangenen Anrufe festlegen, so dass es nicht möglich ist, das Lesen unbegrenzt zu blockieren.Kann ich von einem Socket synchron mit Boost.Asio mit einem Timeout für einen Multithread-E/A-Dienst lesen?

Dies scheint ein ziemlich häufiges Problem bei Boost.Asio Benutzern zu sein, mit dem folgenden Vergangenheit Stack-Überlauf Fragen zum Thema:

Dort darf ev en mehr sein. Es gibt sogar examples in the documentation für die Implementierung von synchronen Operationen mit Timeouts. Sie laufen darauf hinaus, die synchrone Operation in eine asynchrone Operation umzuwandeln und dann parallel mit einer asio::deadline_timer zu starten. Der Exspiration-Handler des Timers kann dann den asynchronen Lesevorgang abbrechen, falls das Zeitlimit abgelaufen ist. Das sieht etwa so aus (Ausschnitt aus dem oben verlinkten Beispiel genommen):

std::size_t receive(const boost::asio::mutable_buffer& buffer, 
     boost::posix_time::time_duration timeout, boost::system::error_code& ec) 
    { 
    // Set a deadline for the asynchronous operation. 
    deadline_.expires_from_now(timeout); 

    // Set up the variables that receive the result of the asynchronous 
    // operation. The error code is set to would_block to signal that the 
    // operation is incomplete. Asio guarantees that its asynchronous 
    // operations will never fail with would_block, so any other value in 
    // ec indicates completion. 
    ec = boost::asio::error::would_block; 
    std::size_t length = 0; 

    // Start the asynchronous operation itself. The handle_receive function 
    // used as a callback will update the ec and length variables. 
    socket_.async_receive(boost::asio::buffer(buffer), 
     boost::bind(&client::handle_receive, _1, _2, &ec, &length)); 

    // Block until the asynchronous operation has completed. 
    do io_service_.run_one(); while (ec == boost::asio::error::would_block); 

    return length; 
    } 

Dies ist eigentlich eine relativ saubere Lösung: Starten Sie die asynchronen Operationen, dann manuell die asio::io_service abfragen asynchrone Handler zu einem Zeitpunkt eines auszuführen, bis entweder Die async_receive() wird abgeschlossen oder der Timer läuft ab.

Was ist jedoch mit dem Fall, in dem der zugrunde liegende I/O-Service des Sockets bereits in einem oder mehreren Hintergrundthreads ausgeführt wird? In diesem Fall gibt es keine Garantie dafür, dass die Handler für die asynchronen Operationen vom Vordergrundthread im obigen Snippet ausgeführt werden, sodass run_one() erst zurückgegeben wird, wenn ein späterer, möglicherweise nicht verwandter Handler ausgeführt wird. Dies würde dazu führen, dass das Socket eher nicht mehr reagiert.

asio::io_service hat eine poll_one() Funktion, die die Warteschlange des Dienstes ohne Blockierung überprüfen wird, aber ich sehe keine gute Möglichkeit, den Vordergrund Thread zu blockieren (emuliert das synchrone Aufrufverhalten), bis der Handler ausgeführt wird, außer für den Fall dort sind keine Hintergrundthreads, die bereits asio::io_service::run() ausführen.

Ich sehe zwei mögliche Lösungen, von denen keines Ich mag:

  1. Verwenden einer Bedingungsvariable oder ähnliches Konstrukt in den Vordergrund Gewindesatz zu bilden, nachdem die asynchronen Operationen beginnen. Geben Sie im Handler für den Aufruf async_receive() die Zustandsvariable an, um den Thread zu entsperren. Dies führt zu einer gewissen Sperrung für jeden Lesevorgang, die ich vermeiden möchte, da ich den maximal möglichen Durchsatz bei den UDP-Socket-Lesevorgängen erreichen möchte. Sonst ist es lebensfähig und ist wahrscheinlich das, was ich tun würde, wenn sich keine überlegene Methode ergibt.

  2. Stellen Sie sicher, dass der Socket über eine eigene asio::io_service verfügt, die nicht von Hintergrundthreads ausgeführt wird. Dies macht es schwieriger, asynchrone E/A mit dem Socket in den Fällen zu verwenden, in denen dies erwünscht ist.

Irgendwelche Ideen für andere Möglichkeiten, dies auf eine sichere Weise zu erreichen?


Abgesehen: Es gibt einige Antworten auf frühere SO Fragen, die die SO_RCVTIMEO Socket-Option Anwalt mit dem Socket lesen Timeout zu implementieren. Das hört sich theoretisch gut an, aber auf meiner Plattform scheint es zumindest nicht zu funktionieren (Ubuntu 12.04, Boost v1.55). Ich kann das Socket Timeout setzen, aber es wird nicht den gewünschten Effekt mit Asio geben. Der entsprechende Code ist in /boost/asio/detail/impl/socket_ops.ipp:

size_t sync_recvfrom(socket_type s, state_type state, buf* bufs, 
    size_t count, int flags, socket_addr_type* addr, 
    std::size_t* addrlen, boost::system::error_code& ec) 
{ 
    if (s == invalid_socket) 
    { 
    ec = boost::asio::error::bad_descriptor; 
    return 0; 
    } 

    // Read some data. 
    for (;;) 
    { 
    // Try to complete the operation without blocking. 
    signed_size_type bytes = socket_ops::recvfrom(
     s, bufs, count, flags, addr, addrlen, ec); 

    // Check if operation succeeded. 
    if (bytes >= 0) 
     return bytes; 

    // Operation failed. 
    if ((state & user_set_non_blocking) 
     || (ec != boost::asio::error::would_block 
      && ec != boost::asio::error::try_again)) 
     return 0; 

    // Wait for socket to become ready. 
    if (socket_ops::poll_read(s, 0, ec) < 0) 
     return 0; 
    } 
} 

Wenn ein Socket-mal ausgelesen, würde der Aufruf von recvfrom() oben zurückzukehren EAGAIN oder EWOULDBLOCK, die boost::asio::error::try_again oder boost::asio::error::would_block übersetzen lassen. In diesem Fall rufen Sie den obigen Code die poll_read()-Funktion, die für meine Plattform wie folgt aussieht:

int poll_read(socket_type s, state_type state, boost::system::error_code& ec) 
{ 
    if (s == invalid_socket) 
    { 
    ec = boost::asio::error::bad_descriptor; 
    return socket_error_retval; 
    } 

    pollfd fds; 
    fds.fd = s; 
    fds.events = POLLIN; 
    fds.revents = 0; 
    int timeout = (state & user_set_non_blocking) ? 0 : -1; 
    clear_last_error(); 
    int result = error_wrapper(::poll(&fds, 1, timeout), ec); 
    if (result == 0) 
    ec = (state & user_set_non_blocking) 
     ? boost::asio::error::would_block : boost::system::error_code(); 
    else if (result > 0) 
    ec = boost::system::error_code(); 
    return result; 
} 

ich den Code snipped aus bedingt für andere Plattformen zusammengestellt. Wie Sie sehen, wenn der Socket kein nicht-blockierender Socket ist, ruft er poll() mit einem unendlichen Timeout auf und blockiert daher, bis der Socket Daten zum Lesen hat (und den Versuch eines Timeouts vereitelt). Daher ist die Option SO_RCVTIMEO nicht wirksam.

Antwort

6

Boost.Asios Unterstützung für futures kann eine elegante Lösung bieten. Wenn für eine asynchrone Operation der boost::asio::use_future-Wert als Abschlussbehandlungsroutine angegeben wird, gibt die initiierende Funktion ein std::future-Objekt zurück, das das Ergebnis der Operation erhalten wird. Wenn der Vorgang mit einem Fehler abgeschlossen wird, wird außerdem die error_code in eine system_error konvertiert und über die future an den Aufrufer übergeben.

In der Boost.Asio C++ 11 Futures datytime client example läuft ein dedizierter Thread die io_service, und der Haupt-Thread initiiert asynchrone Operationen wartet dann synchron auf den Abschluß der Operationen, wie unten:

std::array<char, 128> recv_buf; 
udp::endpoint sender_endpoint; 
std::future<std::size_t> recv_length = 
    socket.async_receive_from(
     boost::asio::buffer(recv_buf), 
     sender_endpoint, 
     boost::asio::use_future); 

// Do other things here while the receive completes. 

std::cout.write(
    recv_buf.data(), 
    recv_length.get()); // Blocks until receive is complete. 

Bei der Verwendung von future s ist der allgemeine Ansatz zum Implementieren eines synchronen Lesevorgangs mit einer Zeitüberschreitung derselbe wie zuvor. Anstatt einen synchronen Lesevorgang zu verwenden, würde man einen asynchronen Lesevorgang verwenden und asynchron auf einen Timer warten. Die einzige geringfügige Änderung besteht darin, dass anstelle des Blockierens auf io_service oder des periodischen Überprüfens des Prädikats future::get() aufgerufen wird, um zu blockieren, bis der Vorgang erfolgreich oder fehlgeschlagen ist (z. B. ein Timeout). Wenn C++ 11 nicht verfügbar ist, kann der Rückgabetyp für asynchrone Operationen für Boost.Thread future angepasst werden, wie in this Antwort gezeigt.

+0

Danke für die Einsicht; Mir war nicht bewusst, dass Asio Futures auf diese Weise nutzen könnte. Ohne es zu betrachten, ähnelt die Implementierung wahrscheinlich meinem bestehenden Ansatz, eine 'condition_variable' zu ​​verwenden, um den aufrufenden Thread zu blockieren, bis die Operation abgeschlossen ist, aber es ist schön, etwas etwas mehr Standard zu haben. –

+0

Danke für Ihre Antwort. Dieser Code schlägt für mich fehl [link] (https://stackoverflow.com/questions/46166923/). – ar2015