MPI + OpenMP分段错误和不可预测的行为

轻描淡写

经过数小时尝试解决类似问题但未成功的决定,我决定将其发布。我正在编写一个C ++ MPI + OpenMP代码,其中一个MPI节点(服务器)将双精度数组发送到其他节点。服务器产生线程以便同时发送给许多客户端。串行版本(仅使用MPI)效果很好,单线程版本也是如此。多线程版本(openmp)在随机迭代后不断抛出分段错误错误。线printf("%d: cur_idx:%d, opt_k.k:%d, idx:%d, N:%d \n", tid, cur_idx,opt_k.k,idx,N)在每次迭代时打印出这些值。不可预测性是迭代的次数(在一次事件中,当我尝试在之后立即再次运行它时,代码成功运行仅引发了段错误)。但是,它总是以num_threads = 1完成。getData返回结构的向量,结构定义为(int,int,double *)。

这是代码

double *tStatistics=new double[8], tmp_time; // wall clock time
double SY, Sto;
int a_tasks=0, file_p=0;
vector<myDataType *> d = getData();

int idx=0; opt_k.k=1; opt_k.proc_files=0; opt_k.p=this->node_sz;
opt_k.proc_files=0; SY=0; Sto=0;
std::fill(header,header+SZ_HEADER,-1);

omp_set_num_threads(5);// for now
// parallel region

#pragma omp parallel default(none) shared(d,idx,SY,Sto) private(a_tasks)
{
    double *myHeader=new double[SZ_HEADER];
    std::fill(myHeader,myHeader+SZ_HEADER,0);
    int tid = omp_get_thread_num(), cur_idx, cur_k; int N;
    //#pragma omp atomic
        N=d.size();
    while (idx<N) {
        // Assign tasks and fetch results where available
        cur_idx=N;
        #pragma omp critical(update__idx)
        {
            if (idx<N) {
                cur_idx=idx; cur_k=opt_k.k; idx+=cur_k;
            }
        }
        if (cur_idx<N) {
            printf("%d: cur_idx:%d, opt_k.k:%d, idx:%d, N:%d \n", tid, cur_idx,opt_k.k,idx,N);
            MPI_Recv(myHeader,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
            if(this->Stat->MPI_TAG == TAG_HEADER){ // serve tasks
                while (cur_k && cur_idx<N) {
                    myHeader[1]=d[cur_idx]->nRows; myHeader[2]=d[cur_idx]->nCols; myHeader[3]=cur_idx; myHeader[9]=--cur_k;
                    MPI_Send(myHeader,SZ_HEADER,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP);
                    MPI_Send(d[cur_idx]->data,d[cur_idx]->nRows*d[cur_idx]->nCols,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP);
                    delete[] d[cur_idx]->data;  ++cur_idx;
                }
            }else if(this->Stat->MPI_TAG == TAG_RESULT){ // collect results
                printf("%d - 4\n", tid);
            }

        } //end if(loopmain)
    } // end while(loopmain)

} // end parallel section

message("terminate slaves");
for(int i=1;i<node_sz;++i){ // terminate
  MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
  MPI_Send(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_TERMINATE,MY_COMM_GRP);
}
return 0;

另一个匹配功能是

void CMpifun::slave2()
{
    double *Data; vector<myDataType> dataQ; vector<hist_type> resQ;
    char out_opt='b'; // irrelevant
    myDataType *out_im = new myDataType;    hist_type *out_hist;    CLdp ldp;
    int file_cnt=0; double tmp_t; //local variables

    while (true) { // main while loop
        header[4]=myRank;   MPI_Send(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_HEADER,MY_COMM_GRP);
        MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
        if(this->Stat->MPI_TAG == TAG_TERMINATE) {
            break;
        }
        //receive data
        while(true) {
            Data=new double[(int)(header[1]*header[2])];
            MPI_Recv(Data,(int)(header[1]*header[2]),MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat);
            myDataType d; d.data=Data; d.nRows=(int)header[1]; d.nCols=(int)header[2];
            //dataQ.push_back(d);
            delete[] Data;
            file_cnt++;
            if ((int)header[9]) {
                MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat);
            } else break;
        }
    } // end main while loop
    message("terminating");

我已经尝试了所有解决类似问题的建议。这是我的环境设置

export OMP_WAIT_POLICY="active" 
export OMP_NUM_THREADS=4  
export OMP_DYNAMIC=true # "true","false"  
export OMP_STACKSIZE=200M # 
export KMP_STACKSIZE=$OMP_STACKSIZE  
ulimit -s unlimited

非常感谢所有投入。我越来越相信这与内存分配有某种关系,但也不知道为什么。我现在有以下代码:

double CMpifun::sendData2()
{
double *tStatistics=new double[8], tmp_time; // wall clock time
double SY, Sto; int a_tasks=0, file_p=0;
vector<myDataType *> d = getData();

int idx=0; opt_k.k=1; opt_k.proc_files=0; opt_k.p=this->node_sz;
opt_k.proc_files=0; SY=0; Sto=0;
std::fill(header,header+SZ_HEADER,-1);

omp_set_num_threads(224);// for now
// parallel region

#pragma omp parallel default(none) shared(idx,SY,Sto,d) private(a_tasks)
{
    double *myHeader=new double[SZ_HEADER];
    std::fill(myHeader,myHeader+SZ_HEADER,0);
    int tid = omp_get_thread_num(), cur_idx, cur_k; int N;

    //#pragma omp critical(update__idx)
    {
        N=d.size();
    }
    while (idx<N) {
        // Assign tasks and fetch results where available
        cur_idx=N;
        #pragma omp critical(update__idx)
        {
            if (idx<N) {
                cur_idx=idx; cur_k=opt_k.k; idx+=cur_k;
            }
        }
        if (cur_idx<N) {
            //printf("%d: cur_idx:%d, opt_k.k:%d, idx:%d, N:%d \n", tid, cur_idx,opt_k.k,idx,N);
            printf("%d: cur_idx:%d, N:%d \n", tid, cur_idx,N);
            //#pragma omp critical(update__idx)
            {
                MPI_Recv(myHeader,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
            }
            if(this->Stat->MPI_TAG == TAG_HEADER){ // serve tasks
                while (cur_k && cur_idx<N) {
                    //#pragma omp critical(update__idx)
                    {
                        myHeader[1]=d[cur_idx]->nRows; myHeader[2]=d[cur_idx]->nCols;   myHeader[3]=cur_idx;
                        myHeader[9]=--cur_k;
                        MPI_Send(myHeader,SZ_HEADER,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP);
                        MPI_Send(d[cur_idx]->data,d[cur_idx]->nRows*d[cur_idx]->nCols,MPI_DOUBLE,(int)myHeader[4],TAG_DATA,MY_COMM_GRP);
                        delete[] d[cur_idx]->data;
                    }
                    ++cur_idx;
                }
            }else if(this->Stat->MPI_TAG == TAG_RESULT){ // collect results
                printf("%d - 4\n", tid);
            }

        } //end if(loopmain)
    } // end while(loopmain)

} // end parallel section

message("terminate slaves");
for(int i=1;i<node_sz;++i){ // terminate
  MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
  MPI_Send(header,SZ_HEADER,MPI_DOUBLE,(int)header[4],TAG_TERMINATE,MY_COMM_GRP);
}
return 0;

这是一对

void CMpifun::slave2()
{
double *Data; vector<myDataType> dataQ; vector<hist_type> resQ;
char out_opt='b'; // irrelevant
myDataType *out_im = new myDataType;    hist_type *out_hist;    CLdp ldp;
int file_cnt=0; double tmp_t; //local variables

while (true) { // main while loop
header[4]=myRank;   MPI_Send(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_HEADER,MY_COMM_GRP);
MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,MPI_ANY_TAG,MY_COMM_GRP,this->Stat);
if(this->Stat->MPI_TAG == TAG_TERMINATE) {
    break;
}
//receive data
while(true) {
    Data=new double[(int)(header[1]*header[2])];
    MPI_Recv(Data,(int)(header[1]*header[2]),MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat);
    myDataType *d=new myDataType; d->data=Data; d->nRows=(int)header[1]; d->nCols=(int)header[2];
    dataQ.push_back(*d);
    delete[] Data;
    file_cnt++;
    if ((int)header[9]) {
        MPI_Recv(header,SZ_HEADER,MPI_DOUBLE,MASTER,TAG_DATA,MY_COMM_GRP,this->Stat);
    } else break;
}

// Error section: Uncommenting next line causes seg fault
/*while (dataQ.size()) { // process data
    out_hist = new hist_type();
    myDataType d = dataQ.back(); dataQ.pop_back(); // critical section
    ldp.process(d.data, d.nRows,d.nCols,out_opt,out_im, out_hist);
    resQ.push_back(*out_hist); out_hist=0;
    delete[] d.data; delete[] out_im->data;
}*/

//time_arr[1] /= file_cnt; time_arr[2] /= file_cnt;
//header[6]=time_arr[0]; header[7]=time_arr[1]; header[8]=time_arr[2];
//header[4]=myRank; header[9]=resQ.size();

} // end main while loop

更新是,如果我取消注释Slave2()函数中的while循环,则运行不会完成。我不明白的是,此函数(slave2)完全没有openmp / threading,但似乎起作用。此外,它不与线程函数共享任何变量。如果我将麻烦的部分注释掉,则无论我设置了多少线程(4、18、300),代码都将运行。我的OpenMP环境变量保持不变。的输出limit -a如下,

core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 30473
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) unlimited
open files                      (-n) 1024
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 37355
cpu time               (seconds, -t) unlimited
max user processes              (-u) 30473
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited

我的构造函数还调用了mpi_init_thread。为了解决@Tim问题,new在遵循类似问题的解决方案建议后,我之所以使用动态内存(带有)是为了避免过大的堆栈内存。感谢您的协助。

提姆

我看到的最大问题是您的代码展示了许多竞争条件。您所看到的不稳定行为无疑是由此引起的。请记住,每次访问OpenMP中的共享变量(通过shared关键字或通过全局作用域声明)时,都在访问该内存,该内存可由该组中的任何其他线程读取或写入,而不能保证顺序。例如,

N = d.size();

是竞争条件,因为std::vector它不是线程安全的。因为您在类内部使用OpenMP,所以任何成员变量也被视为“全局”,因此默认情况下不是线程安全的。

正如@ tim18所指出的,由于您是从OpenMP并行区域中调用MPI例程,因此应使用MPI_Init_thread函数将MPI运行时初始化为线程安全的


顺便说一句,您的C ++需要做一些工作。您永远不要使用newdelete在用户级代码中使用。使用RAII管理对象生存期,并将大型数据结构包装在为您管理生存期的瘦对象中。例如,这条线

delete[] d[cur_idx]->data;

告诉我,您的代码中潜伏着恶魔,等待释放给毫无戒心的用户(可能是您!)。顺便提及,这也是比赛条件。许多恶魔!

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章