2011-01-06 24 views
4

Ich portiere eine Anwendung, die auf dem ACE Proactor Framework aufbaut. Die Anwendung läuft perfekt sowohl für VxWorks als auch für Windows, aber nicht für Linux (CentOS 5.5, WindRiver Linux 1.4 & 3.0) mit Kernel 2.6.X.X - mit librt.Simultaner Socket Lesen/Schreiben ("Vollduplex") in Linux (speziell)

Ich habe das Problem auf ein sehr grundlegendes Problem eingegrenzt: Die Anwendung beginnt eine asynchrone (über aio_read) Lesevorgang auf einem Socket und beginnt anschließend eine asynchrone (über aio_write) Schreiben auf dem gleichen Sockel. Die Leseoperation kann noch nicht erfüllt werden, da das Protokoll vom Ende der Anwendung an initialisiert wird. - Wenn der Socket im Blocking-Modus ist, wird der Schreibvorgang nie erreicht und das Protokoll "hängt". - Wenn ein O_NONBLOCK-Socket verwendet wird, ist der Schreibvorgang erfolgreich, aber der Lesevorgang kehrt mit einem Fehler "EWOULDBLOCK/EAGAIN" auf unbestimmte Zeit zurück und wird nie wiederhergestellt (auch wenn die AIO-Operation neu gestartet wird).

Ich ging durch mehrere Foren und konnte keine definitive Antwort finden, ob das mit Linux AIO funktionieren sollte (und ich mache etwas falsch) oder unmöglich. Ist es möglich, wenn ich die AIO ablege und eine andere Implementierung suche (über epoll/poll/select etc.)?

Beigefügt ist ein Beispielcode, um schnell das Problem auf einem nicht blockierenden Socket wieder herzustellen:

#include <aio.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <netdb.h> 
#include <string.h> 
#include <netinet/in.h> 
#include <sys/socket.h> 
#include <sys/types.h> 
#include <assert.h> 
#include <errno.h> 

#define BUFSIZE (100) 

// Global variables 
struct aiocb *cblist[2]; 
int theSocket; 

void InitializeAiocbData(struct aiocb* pAiocb, char* pBuffer) 
{ 
    bzero((char *)pAiocb, sizeof(struct aiocb)); 

    pAiocb->aio_fildes = theSocket; 
    pAiocb->aio_nbytes = BUFSIZE; 
    pAiocb->aio_offset = 0; 
    pAiocb->aio_buf = pBuffer; 
} 

void IssueReadOperation(struct aiocb* pAiocb, char* pBuffer) 
{ 
    InitializeAiocbData(pAiocb, pBuffer); 

    int ret = aio_read(pAiocb); 
    assert (ret >= 0); 
} 

void IssueWriteOperation(struct aiocb* pAiocb, char* pBuffer) 
{ 
    InitializeAiocbData(pAiocb, pBuffer); 

    int ret = aio_write(pAiocb); 
    assert (ret >= 0); 
} 

int main() 
{ 
    int ret; 
    int nPort = 11111; 
    char* szServer = "10.10.9.123"; 

    // Connect to the remote server 
    theSocket = socket(AF_INET, SOCK_STREAM, 0); 
    assert (theSocket >= 0); 

    struct hostent *pServer; 
    struct sockaddr_in serv_addr; 
    pServer = gethostbyname(szServer); 

    bzero((char *) &serv_addr, sizeof(serv_addr)); 
    serv_addr.sin_family = AF_INET; 
    serv_addr.sin_port = htons(nPort); 
    bcopy((char *)pServer->h_addr, (char *)&serv_addr.sin_addr.s_addr, pServer->h_length); 

    assert (connect(theSocket, (const sockaddr*)(&serv_addr), sizeof(serv_addr)) >= 0); 

    // Set the socket to be non-blocking 
    int oldFlags = fcntl(theSocket, F_GETFL) ; 
    int newFlags = oldFlags | O_NONBLOCK; 

    fcntl(theSocket, F_SETFL, newFlags); 
    printf("Socket flags: before=%o, after=%o\n", oldFlags, newFlags); 

    // Construct the AIO callbacks array 
    struct aiocb my_aiocb1, my_aiocb2; 
    char* pBuffer = new char[BUFSIZE+1]; 

    bzero((char *)cblist, sizeof(cblist)); 
    cblist[0] = &my_aiocb1; 
    cblist[1] = &my_aiocb2; 

    // Start the read and write operations on the same socket 
    IssueReadOperation(&my_aiocb1, pBuffer); 
    IssueWriteOperation(&my_aiocb2, pBuffer); 

    // Wait for I/O completion on both operations 
    int nRound = 1; 
    printf("\naio_suspend round #%d:\n", nRound++); 
    ret = aio_suspend(cblist, 2, NULL); 
    assert (ret == 0); 

    // Check the error status for the read and write operations 
    ret = aio_error(&my_aiocb1); 
    assert (ret == EWOULDBLOCK); 

    // Get the return code for the read 
    { 
     ssize_t retcode = aio_return(&my_aiocb1); 
     printf("First read operation results: aio_error=%d, aio_return=%d - That's the first EWOULDBLOCK\n", ret, retcode); 
    } 

    ret = aio_error(&my_aiocb2); 
    assert (ret == EINPROGRESS); 
    printf("Write operation is still \"in progress\"\n"); 

    // Re-issue the read operation 
    IssueReadOperation(&my_aiocb1, pBuffer); 

    // Wait for I/O completion on both operations 
    printf("\naio_suspend round #%d:\n", nRound++); 
    ret = aio_suspend(cblist, 2, NULL); 
    assert (ret == 0); 

    // Check the error status for the read and write operations for the second time 
    ret = aio_error(&my_aiocb1); 
    assert (ret == EINPROGRESS); 
    printf("Second read operation request is suddenly marked as \"in progress\"\n"); 

    ret = aio_error(&my_aiocb2); 
    assert (ret == 0); 

    // Get the return code for the write 
    { 
     ssize_t retcode = aio_return(&my_aiocb2); 
     printf("Write operation has completed with results: aio_error=%d, aio_return=%d\n", ret, retcode); 
    } 

    // Now try waiting for the read operation to complete - it'll just busy-wait, receiving "EWOULDBLOCK" indefinitely 
    do 
    { 
     printf("\naio_suspend round #%d:\n", nRound++); 
     ret = aio_suspend(cblist, 1, NULL); 
     assert (ret == 0); 

     // Check the error of the read operation and re-issue if needed 
     ret = aio_error(&my_aiocb1); 
     if (ret == EWOULDBLOCK) 
     { 
      IssueReadOperation(&my_aiocb1, pBuffer); 
      printf("EWOULDBLOCK again on the read operation!\n"); 
     } 
    } 
    while (ret == EWOULDBLOCK); 
} 

Vielen Dank im Voraus, Yotam.

+0

Versuchen Sie stattdessen die Mailingliste 'ace-users @ list.isis.vanderbilt.edu': http://www.cs.wustl.edu/~schmidt/ACE-mail.html –

Antwort

3

Erstens, O_NONBLOCK und AIO nicht vermischen. AIO wird die asynchrone Operation abgeschlossen berichten, wenn die entsprechenden read oder write nicht blockiert haben - und mit O_NONBLOCK, sie würden nie Block, so dass die aio Anfrage wird immer vollständig ab sofort (mit aio_return() geben EWOULDBLOCK).

Zweitens, verwenden Sie nicht den gleichen Puffer für zwei gleichzeitig ausstehende AIO-Anforderungen. Der Puffer sollte zwischen dem Zeitpunkt, zu dem die AIO-Anforderung ausgegeben wurde, und dem Zeitpunkt, zu dem die Meldung aio_error() Ihnen mitteilt, dass er vollständig ist, als vollständig verboten betrachtet werden.

Drittens werden AIO-Anforderungen an den gleichen Dateideskriptor in die Warteschlange gestellt, um sinnvolle Ergebnisse zu erhalten. Dies bedeutet, dass Ihr Schreibvorgang erst nach Abschluss des Lesevorgangs ausgeführt wird. Wenn Sie die Daten zuerst schreiben müssen, müssen Sie die AIOs in umgekehrter Reihenfolge ausgeben. Im Folgenden wird funktionieren, ohne Einstellung O_NONBLOCK:

struct aiocb my_aiocb1, my_aiocb2; 
char pBuffer1[BUFSIZE+1], pBuffer2[BUFSIZE+1] = "Some test message"; 

const struct aiocb *cblist[2] = { &my_aiocb1, &my_aiocb2 }; 

// Start the read and write operations on the same socket 
IssueWriteOperation(&my_aiocb2, pBuffer2); 
IssueReadOperation(&my_aiocb1, pBuffer1); 

// Wait for I/O completion on both operations 
int nRound = 1; 
int aio_status1, aio_status2; 
do { 
    printf("\naio_suspend round #%d:\n", nRound++); 
    ret = aio_suspend(cblist, 2, NULL); 
    assert (ret == 0); 

    // Check the error status for the read and write operations 
    aio_status1 = aio_error(&my_aiocb1); 
    if (aio_status1 == EINPROGRESS) 
     puts("aio1 still in progress."); 
    else 
     puts("aio1 completed."); 

    aio_status2 = aio_error(&my_aiocb2); 

    if (aio_status2 == EINPROGRESS) 
     puts("aio2 still in progress."); 
    else 
     puts("aio2 completed."); 
} while (aio_status1 == EINPROGRESS || aio_status2 == EINPROGRESS); 

// Get the return code for the read 
ssize_t retcode; 
retcode = aio_return(&my_aiocb1); 
printf("First operation results: aio_error=%d, aio_return=%d\n", aio_status1, retcode); 

retcode = aio_return(&my_aiocb1); 
printf("Second operation results: aio_error=%d, aio_return=%d\n", aio_status1, retcode); 

Alternativ, wenn Sie kümmern sich nicht um liest und schreibt mit Bezug zueinander geordnet werden, damit sie dup() verwenden können zwei Dateideskriptoren für die schaffen Socket, und verwenden Sie einen zum Lesen und den anderen zum Schreiben - jeder wird seine AIO-Operationen separat in die Warteschlange gestellt.

+1

Haben Sie eine Referenz für" AIO-Anfragen an dasselbe fd werden in der Reihenfolge "? Wenn das der Fall ist, wäre es sicherlich eine bessere Lösung, den Griff zu duplizieren. –

+1

@Ben Voigt: Das ist eine andere Alternative - verwenden Sie 'dup()', um zwei Dateideskriptoren für den Socket zu erstellen, und verwenden Sie einen zum Lesen und den anderen zum Schreiben. – caf

+0

Zuerst habe ich nur die Kommentare gesehen und angefangen, mit ihnen zu experimentieren - aber es sieht so aus, als könnte das dup() dingy den Trick machen. Es scheint das Problem zu beheben, aber ich muss noch einige Stresstests durchführen. Es ist sehr wahrscheinlich, dass ich euch ein Bier schulde!Zweitens, ich weiß, dass ich im folgenden Beispiel einen nicht blockierenden Sockel und den gleichen Puffer verwendet habe - es ist nur ein Dummy-Testprogramm. Dennoch, das Lesen im Hintergrund, bevor der Schreibvorgang ausgelöst wird, ist etwas, das von unserer speziellen Anwendung vorgeschrieben ist - also gibt es keine Änderung ... – Yotam