2016-06-24 13 views
-1

Ich entwickle einen Prototyp eines Algorithmus, der auf einer Gruppe von Knoten funktioniert, wobei jeder Knoten eine Verbindung zu allen anderen Knoten unterhält und Nachrichten sendet.Segmentierungsfehler in einem Multithread-Client/Server-Prototyp

Um eine Nachricht zu senden, sendet der Knoten zuerst Header mit fester Größe und dann die Daten.

Nach einer Menge Arbeit, habe ich festgestellt, dass das Problem mit dem Multi-Thread-Programmierung Teil des Codes ist. Daher habe ich diesen Code als PoC erstellt.

Dieser Prototyp wurde entwickelt, um einen Server mit mehreren Clients einer während der Kompilierung definierten Nummer zu haben.

Ein Server ist zuständig für das Abhören einiger Clients in einem separaten Thread. Da dies nur ein Prototyp ist, lassen wir die Daten fallen.

Jeder Client sendet Daten in zwei Schritten in den Server: Header und Körper mit send_message.

Übrigens sollte dieser Algorithmus Daten über eine bestimmte Bandbreite Benchmark von jedem Client zum Server produzieren. Standardmäßig sendet jeder Client 100 MB/s-Daten an den Server.

Der Code umfasst:

client.cpp:

#include "network.h" 

int main(int argc, char *argv[]){ 

    int sockfd; 

    std::cout << "HEADER: " << HEADER << std::endl; 

    // Read the server's IP 
    struct hostent *server = gethostbyname(argv[1]); 

    // Read the arguments from console 
    get_client_arguments (argc, argv); 

    // Connect to the server 
    sockfd = connect_to_server(server); 

    sleep (1); 

    // Start sending (Start the experiment) 
    multi_unicaster (sockfd); 

    close (sockfd); 

    return 0; 
} 

/**********************************************/ 

void usage (char *argv){ 

    std::cout << "usage: " << argv << " hostname [-p port] [-t throughput]" << std::endl; 
    exit(0); 
} 

/**********************************************/ 

server.cpp:

#include "network.h" 

/**********************************************/ 

int main(int argc, char *argv[]){ 

    std::cout << "HEADER: " << HEADER << std::endl; 
    get_server_arguments(argc, argv); 

    // Create several threads to listen for incoming connections 
    // and read data from several clients simultaneously 
    start_listening_threads(); 

    return 0; 
} 

/**********************************************/ 

void usage (char *argv){ 

    std::cout << "usage: " << argv << " [-p port]" << std::endl; 
    exit(0); 
} 

/**********************************************/ 

message.h:

#ifndef __MESSAGE__ 
#define __MESSAGE__ 

#include <vector> 
#include <queue> 
#include <string> 
#include <cstring> 
#include <cstdio> 
#include <algorithm> 
#include <stdexcept> 
#include <iostream> 
#include <cstdlib> 
#include <sys/socket.h> 
#include <netinet/in.h> 
#include <arpa/inet.h> 
#include <netdb.h> 
#include "func.h" 

/***************************************/ 

// The header structure 
typedef struct { 

    // Message ID 
    unsigned mID; 

    // IP of sender 
    struct in_addr sender; 

    // Message sie 
    size_t datasize; 

}header_type; 

/***************************************/ 

#define HEADER sizeof(header_type) 

/***************************************/ 

/* 
* Message Class 
*/ 
class message 
{ 
    private: 
     // Message header 
     header_type * header; 

     // Message text 
     byte * text; 

    public: 

     // Message Accessors, mutators and related functions 
     byte * get_text(); 
     header_type * get_header(); 
     void set_datasize(size_t); 
     size_t get_datasize(); 
     struct in_addr get_sender(); 
     void set_ID(unsigned); 
     unsigned get_ID(); 
     void print(); 

     message(int,struct in_addr,size_t); 
     message(header_type *, size_t); 
     ~message(); 

     message & operator = (const message&); 
     message(const message&); 
}; 
#endif 


/***************************************/ 

extern std::queue <message * > sending_messages_queue; 

/***************************************/ 
/* 
* Constructor used for initializing complete messages 
* 
*/ 

message::message(int ID,struct in_addr IP,size_t d_s){ 

    header = (header_type *) malloc (HEADER); 

    header -> mID = ID; 
    header -> datasize = d_s; 
    header -> sender.s_addr = IP.s_addr; 

    if (d_s > 0){ 
     text = (byte *) malloc (d_s); 
     memset (text, '.', d_s); 
    } 
    else text = NULL; 
} 

/***************************************/ 
/* 
* Copy constructor (Initialize) 
* 
*/ 

message::message(const message& other){ 

    header = (header_type *) malloc (HEADER); 

    std::memcpy (header, other.header, HEADER); 

    if (header -> datasize > 0){ 
     text = (byte *) malloc (header -> datasize); 
     std::memcpy (text, other.text, header -> datasize); 
    } else 
     text = NULL; 
} 

/***************************************/ 
/* 
* destructor 
* 
* Message destructor 
*/ 

message::~message() { 
    if (text != NULL){ 
     free(text); 
     text = NULL; 
    } 
} 

/***************************************/ 
/* 
* Assignment operator (Update) 
* 
*/ 

message & message::operator = (const message& other) { 

    header = (header_type *) malloc (HEADER); 

    std::memcpy (header, other.header, HEADER); 

    if (header -> datasize >0){ 
     text = (byte *) malloc (header -> datasize); 
     std::memcpy (text, other.text, header -> datasize); 
    } else 
     text = NULL; 

    return *this; 
} 

/***************************************/ 

/* 
* another constructor 
* 
*/ 

message::message(header_type *h, size_t s){ 

    header = (header_type *) malloc (HEADER); 

    std::memcpy (header, h, HEADER); 

    if (s > 0){ 
     text = (byte *) malloc (s); 
     std::memset (text, '.', s); 
    } else 
     text = NULL; 
} 

/***************************************/ 

/* 
* get_header 
* 
* Header accessor 
*/ 

header_type * message::get_header(){ 
    return header; 
} 

/***************************************/ 

/* 
* get_text 
* 
* Text accessor 
*/ 

byte * message::get_text(){ 
    return text; 
} 

/***************************************/ 

/* 
* get_sender 
* 
* Sender IP accessor 
*/ 

struct in_addr message::get_sender(){ 
    return header -> sender; 
} 

/***************************************/ 

/* 
* get_datasize 
* 
* datasize accessor 
*/ 

size_t message::get_datasize(){ 
    return header -> datasize; 
} 

/***************************************/ 

/* 
* set_ID 
* 
* ID mutator 
*/ 
void message::set_ID(unsigned ID) { 
    header -> mID = ID; 
} 

/***************************************/ 

/* 
* get_ID 
* 
* ID Accessor 
*/ 
unsigned message::get_ID() { 
    return header -> mID; 
} 

/***************************************/ 

/* 
* set_datasize 
* 
* datasize mutator 
*/ 
void message::set_datasize(size_t d) { 
    header -> datasize = d; 
} 

/***************************************/ 

/* 
* print 
* 
*/ 
void message::print() { 
    std::cout << header -> mID << "," << inet_ntoa (header -> sender) << "," << header -> datasize; 

    std::cout << std::endl; 
} 

/***************************************/ 

func.h:

// Some support functions 

using namespace std; 

/***********************************************************************/ 

#include <stdio.h> 
#include <sys/types.h> 
#include <cstdlib> 
#include <sys/socket.h> 
#include <netinet/in.h> 
#include <netdb.h> 
#include <unistd.h> 
#include <strings.h> 
#include <cstring> 
#include <iostream> 
#include <string> 
#include <sys/time.h> 
#include <math.h> 

/***********************************************************************/ 

typedef uint8_t byte; 
const unsigned long kilo = 1024; 
const unsigned long mega = 1024 * kilo; 
const unsigned long giga = 1024 * mega; 
const unsigned MESSAGE_SIZE = 10 * kilo; 

/***********************************************************************/ 

int port = 4444; 
int throughput = 100; 
int newsockfd [CLIENTS]; 

/***********************************************************************/ 

void usage (char *argv); 

/***********************************************************************/ 

/* 
* 
* subtract_time 
* 
* Subtracts time to handle negative values 
* 
*/ 

struct timeval subtract_time (struct timeval * left_operand, struct timeval * right_operand){ 

    struct timeval res; 

    if (left_operand -> tv_sec >= right_operand -> tv_sec){ 
     if (left_operand -> tv_usec >= right_operand -> tv_usec){ 

      res.tv_sec = left_operand -> tv_sec - right_operand -> tv_sec; 
      res.tv_usec = left_operand -> tv_usec - right_operand -> tv_usec; 
     }else{ 
      res.tv_sec = left_operand -> tv_sec - right_operand -> tv_sec - 1; 
      res.tv_usec = 1000000 + left_operand -> tv_usec - right_operand -> tv_usec; 
     } 
    } 

    return res; 
} 

/***********************************************************************/ 

void get_server_arguments (int argc, char *argv[]){ 

    int i = 1; 
    while (i < argc){ 

     if (strcmp (argv [i], "-p") ==0){ 

      port = atoi (argv [i + 1]); 
      i+= 2; 
     } 
     else usage (argv [0]); 
    } 
} 

/***********************************************************************/ 

void get_client_arguments (int argc, char *argv[]){ 

    int i = 2; 

    while (i < argc){ 

     if (strcmp (argv [i], "-p") ==0){ 

      port = atoi (argv [i + 1]); 
      i+= 2; 
     } 

     else if (strcmp (argv [i], "-t") ==0){ 

      throughput = atoi (argv [i + 1]); 
      i+= 2; 
     } 
     else usage (argv [0]); 
    } 
} 

/***********************************************************************/ 

void print_bandwidth(unsigned long long sz){ 

    double size; 

    if (sz > giga){ 

     // Round result and show two decimal values 
     size = round (sz/(giga /1000)); 
     std::cout << size /1000 << " Gb/s"<< std::endl; 
    } 
    else if (sz > mega){ 

     // Round result and show two decimal values 
     size = round (sz/(mega /100)); 
     std::cout << size /100 << " Mb/s"<< std::endl; 
    } 
    else if (sz > kilo){ 

     // Round result and show one decimal value 
     size = round (sz /(kilo /10)); 
     std::cout << size /10 << " Kb/s"<< std::endl; 
    } 
    else{ 
     std::cout << sz << " b/s"<< std::endl; 
    } 
} 

/***********************************************************************/ 

network.h:

// Network related functions 

#include "message.h" 
#include <netinet/tcp.h> 
#include <arpa/inet.h> 

/***********************************************************************/ 

void read_message (int); 
int accept_connection (int); 
void * listening (void *); 

/***********************************************************************/ 

unsigned burst_size; 
bool NAGLE = false; 
struct sockaddr_in serv_addr; 
struct timeval recent_elapsed_time_val {0,0}; 
struct timeval start_tv; 
int initial_listening_socket; 
unsigned connections = 0; 

/***********************************************************************/ 

void listen_for_connections(){ 

    // Server: Listens for connections from clients 
    struct sockaddr_in serv_addr; 

    initial_listening_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 
    if (initial_listening_socket < 0) 
     std::cerr << "ERROR opening socket"; 
    bzero((char *) &serv_addr, sizeof(serv_addr)); 
    serv_addr.sin_family = AF_INET; 
    serv_addr.sin_port = htons(port); 
    serv_addr.sin_addr.s_addr = INADDR_ANY; 

    if (bind(initial_listening_socket, (struct sockaddr *) &serv_addr,sizeof(serv_addr)) < 0) 
     std::cerr << "ERROR on binding"<<std::endl; 

    listen(initial_listening_socket,CLIENTS); 
} 

/***********************************************************************/ 

int accept_connection(){ 

    // Server: Accepts connections from client 
    int newsockfd; 
    socklen_t clilen; 
    struct sockaddr_in cli_addr; 

    clilen = sizeof(cli_addr); 

    std::cout << "waiting for new connection .." << std::endl; 
    newsockfd = accept(initial_listening_socket, (struct sockaddr *) &cli_addr, &clilen); 

    std::cout << "received new connection .." << std::endl; 

    connections ++; 

    if (connections == CLIENTS) 
     close(initial_listening_socket); 

    return newsockfd; 
} 

/***********************************************************************/ 

int connect_to_server(struct hostent *server){ 

    // Client: Connects to server 
    int sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 
    if (sockfd < 0) 
     std::cerr << "ERROR opening socket"; 

    if (server == NULL){ 
     std::cerr << stderr << "ERROR, no such host"<< std::endl; 
     exit(0); 
    } 

    int flag; 
    if (NAGLE) flag = 0; 
    else flag = 1; 

    if (setsockopt (sockfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) ==-1){ 
     perror ("ERROR on setting TCP_NODELAY!"); 
     std::terminate(); 
    } 

    bzero((char *) &serv_addr, sizeof(serv_addr)); 
    serv_addr.sin_family = AF_INET; 
    bcopy((char *)server->h_addr,(char *)&serv_addr.sin_addr.s_addr,server->h_length); 
    serv_addr.sin_port = htons(port); 

    if (connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr)) < 0) 
     std::cerr <<"ERROR connecting"<< std::endl; 

    return sockfd; 
} 


/***********************************************************************/ 

void start_listening_threads(){ 

    // Server: Creates listening threads 
    pthread_t listening_thread [CLIENTS]; 

    listen_for_connections(); 

    for(unsigned i=0;i< CLIENTS;i++){ 

     unsigned * arg = (unsigned *) malloc(sizeof(*arg)); 

     if (arg == NULL) { 
      fprintf(stderr, "Couldn't allocate memory for thread arg.\n"); 
      exit(EXIT_FAILURE); 
     } 

     *arg = i; 

     pthread_create(&listening_thread[i], NULL,(void* (*)(void*))&listening, arg); 
    } 

    for(unsigned i=0;i< CLIENTS;i++){ 

     pthread_join (listening_thread[i], NULL); 
    } 
} 

/***********************************************************************/ 

void * listening (void *a){ 

    // Server: Start listening after establishing a connection with the client 
    int i = *((int *) a); 

    newsockfd [i] = accept_connection(); 

    while (1){ 

     read_message (newsockfd [i]); 
    } 

    return NULL; 
} 

/***********************************************************************/ 

void measure_throughput(unsigned counter){ 

    // Client: Tracks throughput and keeps on the wanted threshold 
    struct timeval current_time; 

    // Get the current time in order to track the throughput 
    gettimeofday (&current_time, NULL); 
    struct timeval elapsed_time_val = subtract_time (&current_time, &start_tv); 

    double elapsed = elapsed_time_val.tv_sec+ (elapsed_time_val.tv_usec/1000000.0); 

    unsigned long long sent_bytes = counter * (MESSAGE_SIZE + HEADER); 
    if (elapsed > 0){ 

     // Calculate the expected time to send sent_bytes 
     double theoretical_time = (sent_bytes)/((throughput * mega)/8.0); 

     // Compare the expected time with the real elapsed time 
     if (theoretical_time > elapsed){ 
      __useconds_t additional_time = (theoretical_time - elapsed) * 1000000; 
      usleep (additional_time); 
     } 
    } 

    if (elapsed_time_val.tv_sec > recent_elapsed_time_val.tv_sec){ 
     unsigned sending_throughput = (unsigned)((sent_bytes * 8)/(mega * elapsed * 1.0)); 
     std::cout << "throughput: " << sending_throughput << std::endl; 
     recent_elapsed_time_val = elapsed_time_val; 
    } 
} 

/***********************************************************************/ 

void send_message (message * m, int sockfd){ 

    // Client: Send on message header then data. 
    if (write (sockfd, m -> get_header(), HEADER) == -1){ 

     perror ("Error exporting Header to socket"); 
     close (sockfd); 
     exit (1); 
    } 

    if (write (sockfd, m -> get_text(), MESSAGE_SIZE) == -1){ 

     perror ("Error exporting Header to socket"); 
     close (sockfd); 
     exit (1); 
    } 
} 

/***********************************************************************/ 

void read_message (int sockfd){ 

    // Server: Listens for one message header then text. 
    int receivedPackage = 0; 
    int pos = 0; 
    int expected_bytes = HEADER; 
    header_type header; 

    while (expected_bytes >0){ 

     if ((receivedPackage = read(sockfd, &header + pos, expected_bytes)) < 0){ 
      perror ("ERROR importing message header from socket!"); 
      std::terminate(); 
     } 
     pos += receivedPackage; 
     expected_bytes -= receivedPackage; 
    } 

    if (header.datasize != MESSAGE_SIZE){ 
     message * m = new message (&header, (size_t)0); 
     m-> print(); 
    } 

    pos = 0; 
    receivedPackage = 0; 
    expected_bytes = MESSAGE_SIZE; 
    byte text [MESSAGE_SIZE]; 

    while (expected_bytes >0){ 

     if ((receivedPackage = read(sockfd, text + pos, expected_bytes)) < 0){ 
      perror ("ERROR importing message header from socket!"); 
      std::terminate(); 
     } 
     pos += receivedPackage; 
     expected_bytes -= receivedPackage; 
    } 
} 

/***********************************************************************/ 

void multi_unicaster (int sockfd){ 

    unsigned counter=0; 

    gettimeofday (&start_tv, NULL); 

    while (1){ 

     counter ++; 

     struct in_addr IP; 

     inet_aton ("127.0.0.1",&IP); 

     message * m = new message (counter, IP, MESSAGE_SIZE); 

     if (m -> get_datasize() != MESSAGE_SIZE) 
      m-> print(); 

     send_message (m,sockfd); 

     delete m; 

     measure_throughput(counter); 
    } 
} 

/***********************************************************************/ 

Makefile:

all: server client 

FLAGS=-Wall -Wextra -Werror -pedantic -pthread $(ARGS) -std=c++11 -g -rdynamic -lpthread 
CXXFLAGS=$(DEF) $(FLAGS) 

output/%.o: %.cpp 
    g++ $(CXXFLAGS) -c -o [email protected] $< 

client: output/client.o 
    g++ $(FLAGS) -o [email protected] $^ 

server: output/server.o 
    g++ $(FLAGS) -o [email protected] $^ 

clean: 
    rm -rf output/* *~ server client 

Wenn auf dem Loopback den Code ausgeführt wird, geht alles gut, aber wenn es auf auseinander Servern zu testen (in der Tat, abgesehen Rechenzentren) Manchmal funktioniert es ohne Probleme und andere nicht.

Wenn die empfangenen Daten richtig sind, sollte der empfangene Header richtig sein. Um zu verifizieren, dass die empfangenen Daten nicht fehlerhaft sind, sollte die in der Kopfzeile empfangene datasize korrekt sein (d. H. 10 * Kilo), ansonsten sind die Daten durcheinander.

Diese Überprüfung wird in read_message Funktion in network.h zur Verfügung gestellt, wo ich denke, ist das Problem.

Ich habe all diesen Code zur Verfügung gestellt, sollte jemand es testen müssen.

+0

Was ist das Problem? Was geschieht? Was passiert nicht? Was muss passieren? Wir können Ihre Gedanken nicht lesen, und es ist schwierig, ein Problem zu finden, wenn wir nicht wissen, wonach wir suchen ... – DrDonut

+1

Ich hoffe, Ihr Server akzeptiert nicht mehr Clients als die CPU-Kerne, die er hat ... alles läuft Diese Threads verlangsamen Sie aufgrund exzessiver Kontextwechsel ... Warum nicht vorhandene Bibliotheken verwenden? – Myst

+2

@ Mohamad-jaafar Gibt es eine Möglichkeit, den Code zu verkleinern (https://stackoverflow.com/help/mcve)? Es scheint nicht so klein zu sein, wie es sein könnte, daher kann es für einige Leser schwierig sein, das Problem zu verstehen und eine gute Antwort auf Ihre Frage zu finden. –

Antwort

2

Ich poste eine Antwort, die ich in den Kommentaren gegeben habe, falls es jemandem helfen sollte, ein ähnliches Problem zu erleben.

Die Header-Typ-Nachricht im Code wie folgt aussieht:

typedef struct { 
    // Message ID 
    unsigned mID; 
    // IP of sender 
    struct in_addr sender; 
    // Message size 
    size_t datasize; 
} header_type; 

Diese Header Typ Nachricht ist nicht tragbar und Architektur abhängig.

Auf einigen Architekturen ohne Vorzeichen und size_t möglicherweise 32 Bit haben, auf andere sie 64bits oder 16bits haben könnte ...

Auch struct in_addr ist die Umsetzung spezifisch, so dass die Nachrichten-Header anders aussehen auf verschiedenen Betriebssystemen (Auf welchem ​​Betriebssystem läuft der Server? Welche Version?

Es sei denn, die alle Netzknoten (Server und Clients) laufen auf dem gleichen Betriebssystem und Architektur, gibt es einen Bedarf für Byte-Streams und wenig spezifische Typen (dh uint64_t datasize und uint8_t client_addr[16].

Ein weiteres Problem in Zusammenhang der Architektur wäre (vs. Netzwerk) Byte-Reihenfolge für die Nachrichtengröße.

verschiedene Architekturen weisen unterschiedliche Endianness, so ist es wichtig, richtig, um sicherzustellen, ist die Nachrichtenlänge gespeichert und gelesen werden.

ich ein union für t in Betracht ziehen würde Er Größe der Nachricht, oder die Länge der Nachrichtengröße auf 32 Bits (uint32_t) zu begrenzen, damit ich die POSIX network byte order API verwenden konnte.

typedef struct { 
    // Network byte ordered Message ID 
    uint32_t nb_mID; 
    // IP of sender as either a IPv4 string or a IPv6 string 39 
    uint8_t sender[39]; 
    // IPv4 vs. IPv6 data identifier 
    uint8_t sender_type; 
    // Network byte ordered Message size 
    uint32_t nb_datasize; 
} header_type; 

Side Hinweis

Als Randbemerkung, soll es, dass pro Verbindung Design mit einem Gewinde erwähnt werden, Verlangsamungen aufgrund Kontextschalter verursacht und könnte den Server anfällige todos Angriffe machen.

Normalerweise würden mehr Threads (oder Prozesse) laufen, dann würde die Anzahl der CPU-Kerne zu exzessiven Kontextwechsel führen. Dies ist normalerweise in gewissem Umfang aufgrund anderer Überlegungen akzeptabel, aber ein Thread pro Verbindung würde Systemressourcen sehr schnell herunterlaufen lassen, und das System könnte leicht einen Punkt erreichen, an dem es mehr Zeit für Kontextwechsel als für die Aufgabenleistung aufwendet.

+0

danke noch einmal –

+0

Das eigentliche Problem war mit der Serialisierung.Zusätzlich haben Sie einen Punkt hinzugefügt .. –

+0

@ Mohamad-jaafar, ich bin glücklich, dass Sie das Problem gelöst . – Myst