Mein Code hängt, wenn ich versuche, ihn mit vielen Threads auszuführen. Hier ist er. Warum hängt dieser OpenMP+MPI-Code?
// Master side of an MPI task farm driven by several OpenMP threads:
// hands data matrices out to slave ranks (TAG_HEADER + TAG_DATA), collects
// histogram results (TAG_RESULT + TAG_DATA), then terminates every slave
// with TAG_TERMINATE. Returns 0 on completion.
//
// NOTE(review): most plausible causes of the reported random hang —
//  1. Several threads issue MPI calls concurrently (the Send/Recv calls
//     below that sit OUTSIDE the critical section). That is only defined
//     behaviour if MPI was initialised via MPI_Init_thread(...,
//     MPI_THREAD_MULTIPLE, ...) AND the library actually granted that
//     level — TODO confirm at the initialisation site; with a lower level
//     the behaviour is undefined and typically manifests as a random hang.
//  2. this->Stat is ONE shared status object: it is filled by MPI_Recv
//     inside the critical section, but its MPI_TAG is only inspected later,
//     outside that section — another thread's Recv can overwrite it in
//     between, so a thread may take the wrong protocol branch (deadlock).
//  3. cur_idx and cur_k are never initialised; a thread that takes the
//     TAG_RESULT branch (or skips both branches) evaluates "if (cur_idx<N)"
//     on an indeterminate or stale value — undefined behaviour.
//  4. "--a_tasks" is performed under "omp atomic" while "a_tasks<node_sz-1"
//     is read under a different construct (the named critical): data race.
double CMpifun::sendData2()
{
double *tStatistics=new double[8], tmp_time; // wall clock time -- NOTE(review): never delete[]d (leak); tmp_time unused here
double SY, Sto, header[SZ_HEADER]; // wire-protocol header: [1]=nRows, [2]=nCols, [3]=task idx, [4]=slave rank, [9]=items remaining (per slave2())
int a_tasks=0, file_p=0; // a_tasks: number of tasks currently out at slaves; file_p unused in this function
vector<myDataType *> d = getData(); // work items (matrices) to distribute
int idx=0; // next task index to hand out; shared across all threads
opt_k.k=1; opt_k.proc_files=0; opt_k.p=this->node_sz; SY=0; Sto=0;
std::fill(header,header+SZ_HEADER,-1); // -1 marks "unset" header slots
omp_set_num_threads(4);// for now
// parallel region: each thread alternately assigns tasks and drains results
#pragma omp parallel default(none) shared(idx,SY,Sto,d,a_tasks,stdout) firstprivate(header) //firstprivate(dat_dim,dat)
{
int tid = omp_get_thread_num(), cur_idx, cur_k; int N=d.size(); // NOTE(review): cur_idx/cur_k start indeterminate — see note 3 above
while (idx<N) { // Assign tasks and fetch results where available
printf("-------------------------\n%d - 1\n", tid); fflush(stdout);
// Serialise the "decide what this thread does next" step. NOTE(review):
// this->Stat is written here but read only AFTER the critical section ends.
#pragma omp critical(update__a_task)
{
printf("%d - critique 1\n", tid); fflush(stdout);
if (idx<N) {
printf("%d - critique 2\n", tid); fflush(stdout);
if (a_tasks<node_sz-1){ // available nodes to assign
printf("%d - 2.1\n", tid); fflush(stdout);
// Wait for any slave's "ready for work" header.
MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,TAG_HEADER,MY_COMM_GRP,this->Stat);
// Claim the next cur_k tasks for the requesting slave.
cur_idx=idx; cur_k=opt_k.k; idx+=cur_k;
a_tasks+=cur_k;
} else {// all nodes assigned. only fetch result
printf("%d - 2.2\n", tid); fflush(stdout);
// NOTE(review): cur_idx/cur_k are NOT set on this path — see note 3.
MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,TAG_RESULT,MY_COMM_GRP,this->Stat);
}
}else ;//printf("%d - done task assignment\n", tid); fflush(stdout);
}
printf("%d - 3\n", tid); fflush(stdout);
// NOTE(review): undefined/stale read of cur_idx possible here, and the
// Stat->MPI_TAG inspected below may already belong to ANOTHER thread's Recv.
if (cur_idx<N) {
printf("%d: cur_idx:%d, opt_k.k:%d, idx:%d, N:%d \n", tid, cur_idx,opt_k.k,idx,N); fflush(stdout);
if(this->Stat->MPI_TAG == TAG_HEADER){ // serve tasks
printf("%d - task %d being assigned to %d\n", tid,cur_idx,(int)header[4]); fflush(stdout);
// Push cur_k tasks to the slave recorded in header[4]; header[9]
// tells the slave how many more items follow in this task set.
while (cur_k && cur_idx<N) {
printf("%d - T1\n", tid); fflush(stdout);
header[1]=d[cur_idx]->nRows; header[2]=d[cur_idx]->nCols; header[3]=cur_idx;
header[9]=--cur_k;
MPI_Send(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_HEADER,MY_COMM_GRP);
printf("%d - T2 %d\n", tid,(int)header[4]); fflush(stdout);
MPI_Send(d[cur_idx]->data,d[cur_idx]->nRows*d[cur_idx]->nCols,MPI_DOUBLE,(int)header[4],TAG_DATA,MY_COMM_GRP);
printf("%d - T3 %d\n", tid,(int)header[4]); fflush(stdout);
delete[] d[cur_idx]->data; ++cur_idx; // master-side copy no longer needed
}
} else if(this->Stat->MPI_TAG == TAG_RESULT){ // collect results
printf("%d - result from %d\n", tid,(int)header[4]); fflush(stdout);
// Drain the whole result batch announced by this slave; the slave
// sends header[9] = number of results still to come after this one.
while(true){
printf("%d - R1\n", tid); fflush(stdout);
#pragma omp atomic
--a_tasks;
double *results = new double[(int)(header[1]*header[2])];
MPI_Recv(results,(int)(header[1]*header[2]),MPI_DOUBLE,(int)header[4],TAG_DATA,MY_COMM_GRP,this->Stat);
printf("%d - R2 received result from %d\n", tid,(int)header[4]); fflush(stdout);
delete[] results; // result payload is discarded here — presumably statistics only; verify against callers
if ((int)header[9]>0) {
MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_RESULT,MY_COMM_GRP,this->Stat);
} else break;
} //end while
} // end collect results
} //end if(loopmain)
printf("%d - NExt idx: %d\n", tid,idx); fflush(stdout);
} // end while(loopmain)
} // end parallel section
// Sequential epilogue: all tasks handed out; drain remaining results.
printf("<<<<<<<<<<<<< MASTER - COLLECTING RESULTS >>>>>>>>>>>> "); fflush(stdout);
printf("MASTER - pending tasks:%d\n",a_tasks); fflush(stdout);
while (a_tasks>0) {
printf("MASTER - wait for slave result request... pending tasks:%d\n",a_tasks); fflush(stdout);
MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,TAG_RESULT,MY_COMM_GRP,this->Stat);
while (true) {
double *results = new double[(int)(header[1]*header[2])];
printf("MASTER - wait for result from %d... pending tasks\n",(int)header[4]); fflush(stdout);
MPI_Recv(results,(int)(header[1]*header[2]),MPI_DOUBLE,(int)header[4],TAG_DATA,MY_COMM_GRP,this->Stat);
delete[] results;
--a_tasks;
if ((int)header[9]>0) {
printf("MASTER - result from slave .. some more\n"); fflush(stdout);
MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_RESULT,MY_COMM_GRP,this->Stat);
} else break;
}
}
message("<<<<<<<<<<<<<<<<<< MASTER - terminate slaves >>>>>>>>>>>>>>>>>");
// Each slave's next "ready" request is answered with TAG_TERMINATE.
for(int i=1;i<node_sz;++i){ // terminate
MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,TAG_HEADER,MY_COMM_GRP,this->Stat);
printf("MASTER - terminate to signal %d\n",(int)header[4]); fflush(stdout);
MPI_Send(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_TERMINATE,MY_COMM_GRP);
printf("MASTER - done terminated %d\n",(int)header[4]); fflush(stdout);
}
printf("MASTER - bye\n"); fflush(stdout);
return 0;
// NOTE(review): the closing brace of sendData2() is missing from this paste.
Die Slave-Funktion sieht wie folgt aus:
// Slave side of the task farm: repeatedly announces readiness to MASTER,
// receives one task set (a chain of TAG_HEADER + TAG_DATA messages, with
// header[9] counting the items still to come), processes each matrix with
// CLdp::process(), and streams the resulting histograms back
// (TAG_RESULT + TAG_DATA per histogram). Returns when the master answers a
// readiness request with TAG_TERMINATE.
//
// NOTE(review): 'header' is not declared locally, so it is presumably a
// class member shared with other code — verify it is not used concurrently.
// Memory: out_im is new'd once and never deleted (leak), and every loop
// iteration leaks the hist_type shell new'd into out_hist (its value is
// copied into resQ, then the pointer is dropped without delete). Whether a
// plain delete is safe depends on hist_type's destructor — TODO confirm.
void CMpifun::slave2()
{
double *Data; vector<myDataType> dataQ; vector<hist_type> resQ;
char out_opt='b'; // processing mode passed through to CLdp::process; fixed here
myDataType *out_im = new myDataType; hist_type *out_hist; CLdp ldp; // NOTE(review): out_im never deleted
int file_cnt=0; double tmp_t; //local variables
double time_arr[3]={}; //[0]: task wait latency, [1]: task set total receive time, [2]: task set total process time
while (true) { // main while loop
// Announce readiness: our rank in header[4] tells the master who to serve.
printf("Slave: %d - ........... ready for task......\n",myRank); fflush(stdout);
header[4]=myRank; MPI_Send(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_HEADER,MY_COMM_GRP);
time_arr[0] = MPI_Wtime();
printf("Slave: %d - got master. waiting for task\n",myRank); fflush(stdout);
// MPI_ANY_TAG: the reply is either a task header or a terminate signal.
MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
time_arr[0] = MPI_Wtime() - time_arr[0]; // wait for task latency
if(this->Stat->MPI_TAG == TAG_TERMINATE) {
printf("Slave: %d - terminate signal received\n",myRank); fflush(stdout);
break;
}
printf("Slave: %d - got header. waiting for data\n",myRank); fflush(stdout);
//receive data: header[1]*header[2] doubles per item; header[9] counts
//how many further header+data pairs follow in this task set
tmp_t = MPI_Wtime();
while(true) {
Data=new double[(int)(header[1]*header[2])];
MPI_Recv(Data,(int)(header[1]*header[2]),MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat);
myDataType d; d.data=Data; d.nRows=(int)header[1]; d.nCols=(int)header[2]; // dataQ takes ownership of Data
dataQ.push_back(d);
file_cnt++;
if ((int)header[9]) {
MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_HEADER,MY_COMM_GRP,this->Stat);
} else break;
}
time_arr[1] = (MPI_Wtime()-tmp_t); // Total bandwidth time for entire taskset
file_cnt = dataQ.size();
tmp_t = MPI_Wtime();
printf("Slave: %d - got data. processing\n",myRank); fflush(stdout);
while (dataQ.size()) { // process data, LIFO order
out_hist = new hist_type(); // NOTE(review): leaked each iteration — see header note
myDataType d = dataQ.back(); dataQ.pop_back(); // critical section
ldp.process(d.data, d.nRows,d.nCols,out_opt,out_im, out_hist);
resQ.push_back(*out_hist); out_hist=0; // copy result, drop (leak) the shell
delete[] d.data; delete[] out_im->data; // free input matrix and per-item image buffer
}
time_arr[2] = (MPI_Wtime()-tmp_t); // Total processing time for entire taskset
// send results back to the master
//time_arr[1] /= file_cnt; time_arr[2] /= file_cnt;
printf("Slave: %d - sending results\n",myRank); fflush(stdout);
// header[6..8] piggy-back the timing statistics on every result header.
header[4]=myRank; header[6]=time_arr[0]; header[7]=time_arr[1]; header[8]=time_arr[2];
for (size_t i = 0; i < resQ.size(); i++) {
header[1]=resQ[i].h_nHists; header[2]=resQ[i].h_binSz; header[9]=resQ.size()-i-1; // header[9]: results remaining after this one
MPI_Send(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_RESULT,MY_COMM_GRP);
MPI_Send(resQ[i].hist_data,resQ[i].h_nHists*resQ[i].h_binSz,MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP);
}
resQ.clear();
} // end main while loop
message("terminating");
}
Es hängt nach einer zufälligen Anzahl von Iterationen der if (idx<N)-
Schleife. Ich sitze schon seit zwei ganzen Tagen daran. Kann jemand bitte den Code prüfen und mir sagen, was das Problem verursacht? Für jede Hilfe bin ich im Voraus dankbar.
Ihr Code ist sowohl zu lang, um ihn manuell zu überprüfen (zumindest für mich), als auch unvollständig, sodass wir das Problem nicht reproduzieren können. Bitte destillieren Sie Ihr Problem zu einem [mcve] herunter. Dabei finden Sie die Ursache vielleicht sogar selbst heraus. Außerdem sollten Sie mehr Mühe darauf verwenden zu beschreiben, wie genau es hängt. Verwenden Sie einen (parallelen) Debugger/Korrektheitsanalysator, um herauszufinden, wo der Code hängt. – Zulan