Ich entwickle einen Prototyp eines Algorithmus, der auf einer Gruppe von Knoten funktioniert, wobei jeder Knoten eine Verbindung zu allen anderen Knoten unterhält und Nachrichten sendet.Segmentierungsfehler in einem Multithread-Client/Server-Prototyp
Um eine Nachricht zu senden, sendet der Knoten zuerst Header mit fester Größe und dann die Daten.
Nach einer Menge Arbeit, habe ich festgestellt, dass das Problem mit dem Multi-Thread-Programmierung Teil des Codes ist. Daher habe ich diesen Code als PoC erstellt.
Dieser Prototyp wurde entwickelt, um einen Server mit mehreren Clients einer während der Kompilierung definierten Nummer zu haben.
Ein Server ist zuständig für das Abhören einiger Clients in einem separaten Thread. Da dies nur ein Prototyp ist, lassen wir die Daten fallen.
Jeder Client sendet Daten in zwei Schritten in den Server: Header und Körper mit send_message
.
Übrigens sollte dieser Algorithmus Daten über eine bestimmte Bandbreite Benchmark von jedem Client zum Server produzieren. Standardmäßig sendet jeder Client 100 MB/s-Daten an den Server.
Der Code umfasst:
client.cpp:
#include "network.h"
int main(int argc, char *argv[]){
int sockfd;
std::cout << "HEADER: " << HEADER << std::endl;
// Read the server's IP
struct hostent *server = gethostbyname(argv[1]);
// Read the arguments from console
get_client_arguments (argc, argv);
// Connect to the server
sockfd = connect_to_server(server);
sleep (1);
// Start sending (Start the experiment)
multi_unicaster (sockfd);
close (sockfd);
return 0;
}
/**********************************************/
void usage (char *argv){
std::cout << "usage: " << argv << " hostname [-p port] [-t throughput]" << std::endl;
exit(0);
}
/**********************************************/
server.cpp:
#include "network.h"
/**********************************************/
int main(int argc, char *argv[]){
std::cout << "HEADER: " << HEADER << std::endl;
get_server_arguments(argc, argv);
// Create several threads to listen for incoming connections
// and read data from several clients simultaneously
start_listening_threads();
return 0;
}
/**********************************************/
void usage (char *argv){
std::cout << "usage: " << argv << " [-p port]" << std::endl;
exit(0);
}
/**********************************************/
message.h:
#ifndef __MESSAGE__
#define __MESSAGE__
#include <vector>
#include <queue>
#include <string>
#include <cstring>
#include <cstdio>
#include <algorithm>
#include <stdexcept>
#include <iostream>
#include <cstdlib>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include "func.h"
/***************************************/
// The header structure
typedef struct {
// Message ID
unsigned mID;
// IP of sender
struct in_addr sender;
// Message sie
size_t datasize;
}header_type;
/***************************************/
#define HEADER sizeof(header_type)
/***************************************/
/*
* Message Class
*/
class message
{
private:
// Message header
header_type * header;
// Message text
byte * text;
public:
// Message Accessors, mutators and related functions
byte * get_text();
header_type * get_header();
void set_datasize(size_t);
size_t get_datasize();
struct in_addr get_sender();
void set_ID(unsigned);
unsigned get_ID();
void print();
message(int,struct in_addr,size_t);
message(header_type *, size_t);
~message();
message & operator = (const message&);
message(const message&);
};
#endif
/***************************************/
extern std::queue <message * > sending_messages_queue;
/***************************************/
/*
* Constructor used for initializing complete messages
*
*/
message::message(int ID,struct in_addr IP,size_t d_s){
header = (header_type *) malloc (HEADER);
header -> mID = ID;
header -> datasize = d_s;
header -> sender.s_addr = IP.s_addr;
if (d_s > 0){
text = (byte *) malloc (d_s);
memset (text, '.', d_s);
}
else text = NULL;
}
/***************************************/
/*
* Copy constructor (Initialize)
*
*/
message::message(const message& other){
header = (header_type *) malloc (HEADER);
std::memcpy (header, other.header, HEADER);
if (header -> datasize > 0){
text = (byte *) malloc (header -> datasize);
std::memcpy (text, other.text, header -> datasize);
} else
text = NULL;
}
/***************************************/
/*
* destructor
*
* Message destructor
*/
message::~message() {
if (text != NULL){
free(text);
text = NULL;
}
}
/***************************************/
/*
* Assignment operator (Update)
*
*/
message & message::operator = (const message& other) {
header = (header_type *) malloc (HEADER);
std::memcpy (header, other.header, HEADER);
if (header -> datasize >0){
text = (byte *) malloc (header -> datasize);
std::memcpy (text, other.text, header -> datasize);
} else
text = NULL;
return *this;
}
/***************************************/
/*
* another constructor
*
*/
message::message(header_type *h, size_t s){
header = (header_type *) malloc (HEADER);
std::memcpy (header, h, HEADER);
if (s > 0){
text = (byte *) malloc (s);
std::memset (text, '.', s);
} else
text = NULL;
}
/***************************************/
/*
* get_header
*
* Header accessor
*/
header_type * message::get_header(){
return header;
}
/***************************************/
/*
* get_text
*
* Text accessor
*/
byte * message::get_text(){
return text;
}
/***************************************/
/*
* get_sender
*
* Sender IP accessor
*/
struct in_addr message::get_sender(){
return header -> sender;
}
/***************************************/
/*
* get_datasize
*
* datasize accessor
*/
size_t message::get_datasize(){
return header -> datasize;
}
/***************************************/
/*
* set_ID
*
* ID mutator
*/
void message::set_ID(unsigned ID) {
header -> mID = ID;
}
/***************************************/
/*
* get_ID
*
* ID Accessor
*/
unsigned message::get_ID() {
return header -> mID;
}
/***************************************/
/*
* set_datasize
*
* datasize mutator
*/
void message::set_datasize(size_t d) {
header -> datasize = d;
}
/***************************************/
/*
* print
*
*/
void message::print() {
std::cout << header -> mID << "," << inet_ntoa (header -> sender) << "," << header -> datasize;
std::cout << std::endl;
}
/***************************************/
func.h:
// Some support functions
using namespace std;
/***********************************************************************/
#include <stdio.h>
#include <sys/types.h>
#include <cstdlib>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <unistd.h>
#include <strings.h>
#include <cstring>
#include <iostream>
#include <string>
#include <sys/time.h>
#include <math.h>
/***********************************************************************/
typedef uint8_t byte;
const unsigned long kilo = 1024;
const unsigned long mega = 1024 * kilo;
const unsigned long giga = 1024 * mega;
const unsigned MESSAGE_SIZE = 10 * kilo;
/***********************************************************************/
int port = 4444;
int throughput = 100;
int newsockfd [CLIENTS];
/***********************************************************************/
void usage (char *argv);
/***********************************************************************/
/*
*
* subtract_time
*
* Subtracts time to handle negative values
*
*/
struct timeval subtract_time (struct timeval * left_operand, struct timeval * right_operand){
struct timeval res;
if (left_operand -> tv_sec >= right_operand -> tv_sec){
if (left_operand -> tv_usec >= right_operand -> tv_usec){
res.tv_sec = left_operand -> tv_sec - right_operand -> tv_sec;
res.tv_usec = left_operand -> tv_usec - right_operand -> tv_usec;
}else{
res.tv_sec = left_operand -> tv_sec - right_operand -> tv_sec - 1;
res.tv_usec = 1000000 + left_operand -> tv_usec - right_operand -> tv_usec;
}
}
return res;
}
/***********************************************************************/
void get_server_arguments (int argc, char *argv[]){
int i = 1;
while (i < argc){
if (strcmp (argv [i], "-p") ==0){
port = atoi (argv [i + 1]);
i+= 2;
}
else usage (argv [0]);
}
}
/***********************************************************************/
void get_client_arguments (int argc, char *argv[]){
int i = 2;
while (i < argc){
if (strcmp (argv [i], "-p") ==0){
port = atoi (argv [i + 1]);
i+= 2;
}
else if (strcmp (argv [i], "-t") ==0){
throughput = atoi (argv [i + 1]);
i+= 2;
}
else usage (argv [0]);
}
}
/***********************************************************************/
void print_bandwidth(unsigned long long sz){
double size;
if (sz > giga){
// Round result and show two decimal values
size = round (sz/(giga /1000));
std::cout << size /1000 << " Gb/s"<< std::endl;
}
else if (sz > mega){
// Round result and show two decimal values
size = round (sz/(mega /100));
std::cout << size /100 << " Mb/s"<< std::endl;
}
else if (sz > kilo){
// Round result and show one decimal value
size = round (sz /(kilo /10));
std::cout << size /10 << " Kb/s"<< std::endl;
}
else{
std::cout << sz << " b/s"<< std::endl;
}
}
/***********************************************************************/
network.h:
// Network related functions
#include "message.h"
#include <netinet/tcp.h>
#include <arpa/inet.h>
/***********************************************************************/
void read_message (int);
int accept_connection (int);
void * listening (void *);
/***********************************************************************/
unsigned burst_size;
bool NAGLE = false;
struct sockaddr_in serv_addr;
struct timeval recent_elapsed_time_val {0,0};
struct timeval start_tv;
int initial_listening_socket;
unsigned connections = 0;
/***********************************************************************/
void listen_for_connections(){
// Server: Listens for connections from clients
struct sockaddr_in serv_addr;
initial_listening_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (initial_listening_socket < 0)
std::cerr << "ERROR opening socket";
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(port);
serv_addr.sin_addr.s_addr = INADDR_ANY;
if (bind(initial_listening_socket, (struct sockaddr *) &serv_addr,sizeof(serv_addr)) < 0)
std::cerr << "ERROR on binding"<<std::endl;
listen(initial_listening_socket,CLIENTS);
}
/***********************************************************************/
int accept_connection(){
// Server: Accepts connections from client
int newsockfd;
socklen_t clilen;
struct sockaddr_in cli_addr;
clilen = sizeof(cli_addr);
std::cout << "waiting for new connection .." << std::endl;
newsockfd = accept(initial_listening_socket, (struct sockaddr *) &cli_addr, &clilen);
std::cout << "received new connection .." << std::endl;
connections ++;
if (connections == CLIENTS)
close(initial_listening_socket);
return newsockfd;
}
/***********************************************************************/
int connect_to_server(struct hostent *server){
// Client: Connects to server
int sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sockfd < 0)
std::cerr << "ERROR opening socket";
if (server == NULL){
std::cerr << stderr << "ERROR, no such host"<< std::endl;
exit(0);
}
int flag;
if (NAGLE) flag = 0;
else flag = 1;
if (setsockopt (sockfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) ==-1){
perror ("ERROR on setting TCP_NODELAY!");
std::terminate();
}
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
bcopy((char *)server->h_addr,(char *)&serv_addr.sin_addr.s_addr,server->h_length);
serv_addr.sin_port = htons(port);
if (connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr)) < 0)
std::cerr <<"ERROR connecting"<< std::endl;
return sockfd;
}
/***********************************************************************/
void start_listening_threads(){
// Server: Creates listening threads
pthread_t listening_thread [CLIENTS];
listen_for_connections();
for(unsigned i=0;i< CLIENTS;i++){
unsigned * arg = (unsigned *) malloc(sizeof(*arg));
if (arg == NULL) {
fprintf(stderr, "Couldn't allocate memory for thread arg.\n");
exit(EXIT_FAILURE);
}
*arg = i;
pthread_create(&listening_thread[i], NULL,(void* (*)(void*))&listening, arg);
}
for(unsigned i=0;i< CLIENTS;i++){
pthread_join (listening_thread[i], NULL);
}
}
/***********************************************************************/
void * listening (void *a){
// Server: Start listening after establishing a connection with the client
int i = *((int *) a);
newsockfd [i] = accept_connection();
while (1){
read_message (newsockfd [i]);
}
return NULL;
}
/***********************************************************************/
void measure_throughput(unsigned counter){
// Client: Tracks throughput and keeps on the wanted threshold
struct timeval current_time;
// Get the current time in order to track the throughput
gettimeofday (¤t_time, NULL);
struct timeval elapsed_time_val = subtract_time (¤t_time, &start_tv);
double elapsed = elapsed_time_val.tv_sec+ (elapsed_time_val.tv_usec/1000000.0);
unsigned long long sent_bytes = counter * (MESSAGE_SIZE + HEADER);
if (elapsed > 0){
// Calculate the expected time to send sent_bytes
double theoretical_time = (sent_bytes)/((throughput * mega)/8.0);
// Compare the expected time with the real elapsed time
if (theoretical_time > elapsed){
__useconds_t additional_time = (theoretical_time - elapsed) * 1000000;
usleep (additional_time);
}
}
if (elapsed_time_val.tv_sec > recent_elapsed_time_val.tv_sec){
unsigned sending_throughput = (unsigned)((sent_bytes * 8)/(mega * elapsed * 1.0));
std::cout << "throughput: " << sending_throughput << std::endl;
recent_elapsed_time_val = elapsed_time_val;
}
}
/***********************************************************************/
void send_message (message * m, int sockfd){
// Client: Send on message header then data.
if (write (sockfd, m -> get_header(), HEADER) == -1){
perror ("Error exporting Header to socket");
close (sockfd);
exit (1);
}
if (write (sockfd, m -> get_text(), MESSAGE_SIZE) == -1){
perror ("Error exporting Header to socket");
close (sockfd);
exit (1);
}
}
/***********************************************************************/
void read_message (int sockfd){
// Server: Listens for one message header then text.
int receivedPackage = 0;
int pos = 0;
int expected_bytes = HEADER;
header_type header;
while (expected_bytes >0){
if ((receivedPackage = read(sockfd, &header + pos, expected_bytes)) < 0){
perror ("ERROR importing message header from socket!");
std::terminate();
}
pos += receivedPackage;
expected_bytes -= receivedPackage;
}
if (header.datasize != MESSAGE_SIZE){
message * m = new message (&header, (size_t)0);
m-> print();
}
pos = 0;
receivedPackage = 0;
expected_bytes = MESSAGE_SIZE;
byte text [MESSAGE_SIZE];
while (expected_bytes >0){
if ((receivedPackage = read(sockfd, text + pos, expected_bytes)) < 0){
perror ("ERROR importing message header from socket!");
std::terminate();
}
pos += receivedPackage;
expected_bytes -= receivedPackage;
}
}
/***********************************************************************/
void multi_unicaster (int sockfd){
unsigned counter=0;
gettimeofday (&start_tv, NULL);
while (1){
counter ++;
struct in_addr IP;
inet_aton ("127.0.0.1",&IP);
message * m = new message (counter, IP, MESSAGE_SIZE);
if (m -> get_datasize() != MESSAGE_SIZE)
m-> print();
send_message (m,sockfd);
delete m;
measure_throughput(counter);
}
}
/***********************************************************************/
Makefile:
all: server client
FLAGS=-Wall -Wextra -Werror -pedantic -pthread $(ARGS) -std=c++11 -g -rdynamic -lpthread
CXXFLAGS=$(DEF) $(FLAGS)
output/%.o: %.cpp
g++ $(CXXFLAGS) -c -o [email protected] $<
client: output/client.o
g++ $(FLAGS) -o [email protected] $^
server: output/server.o
g++ $(FLAGS) -o [email protected] $^
clean:
rm -rf output/* *~ server client
Wenn auf dem Loopback den Code ausgeführt wird, geht alles gut, aber wenn es auf auseinander Servern zu testen (in der Tat, abgesehen Rechenzentren) Manchmal funktioniert es ohne Probleme und andere nicht.
Wenn die empfangenen Daten richtig sind, sollte der empfangene Header richtig sein. Um zu verifizieren, dass die empfangenen Daten nicht fehlerhaft sind, sollte die in der Kopfzeile empfangene datasize
korrekt sein (d. H. 10 * Kilo), ansonsten sind die Daten durcheinander.
Diese Überprüfung wird in read_message
Funktion in network.h
zur Verfügung gestellt, wo ich denke, ist das Problem.
Ich habe all diesen Code zur Verfügung gestellt, sollte jemand es testen müssen.
Was ist das Problem? Was geschieht? Was passiert nicht? Was muss passieren? Wir können Ihre Gedanken nicht lesen, und es ist schwierig, ein Problem zu finden, wenn wir nicht wissen, wonach wir suchen ... – DrDonut
Ich hoffe, Ihr Server akzeptiert nicht mehr Clients als die CPU-Kerne, die er hat ... alles läuft Diese Threads verlangsamen Sie aufgrund exzessiver Kontextwechsel ... Warum nicht vorhandene Bibliotheken verwenden? – Myst
@ Mohamad-jaafar Gibt es eine Möglichkeit, den Code zu verkleinern (https://stackoverflow.com/help/mcve)? Es scheint nicht so klein zu sein, wie es sein könnte, daher kann es für einige Leser schwierig sein, das Problem zu verstehen und eine gute Antwort auf Ihre Frage zu finden. –