编译: g++ -g 2.cpp commfunc.cpp threadinit.cpp -o move_mi_org -lpthread -luuid
说明:
并发调用用户迁移工具的 C++ 程序,基于邮件系统自带工具的调用。由于 NNDD 工具一次调用需要 1 秒左右,而有几亿用户需要迁移,串行执行的话猴年马月才能搞完,所以需要并发。叫研发搞则是无可能的,没有 KPI 啊。
1) 2.cpp 即 mail.cpp(包含 main 函数),commfunc.cpp 为通用函数文件,threadinit.cpp 为线程文件。
作用: 把用户从一台机器迁移到另一台机器
用法: ./move_mi_org file_name global_thread_number local_flag target_minodeid target_mivol_num target_orgid
file_name: 号码文件
global_thread_number: 处理线程数量
local_flag: 是否在本地mi 1:是[会执行mv操作] 0:不是
target_minodeid: 目标mi的nodeid
target_mivol_num: 目标mi挂载卷总数
target_orgid: 目标ORG
示例 (example as below):
./move_mi_org filename 10 0 175 8 1
-----------------------------------------xiao-------------------------------
2.cpp #include <iostream> #include <cstdio> #include <stdint.h> #include <cstdlib> #include <cstring> #include <ctime> #include <ctype.h> #include <stdarg.h> #include <unistd.h> #include <fcntl.h> #include <errno.h> #include <signal.h> #include <sys/types.h> #include <sched.h> #include <sys/socket.h> #include <sys/stat.h> #include <sys/wait.h> #include <arpa/inet.h> #include <netinet/in.h> #include <netdb.h> #include <dirent.h> #include <syslog.h> #include <sys/time.h> #include <net/if.h> #include <sys/ipc.h> #include <sys/msg.h> #include <pthread.h> #include <semaphore.h> #include <stdexcept> #include <queue> #include <string> #include <sys/ipc.h> #include <sys/msg.h> #include <sys/shm.h> #include <sys/sem.h> #include <sys/resource.h> #include <sys/ioctl.h> #include <utility> #include <map> #include <fstream> #include <sstream> #include "threadinit.hpp" #include "commfunc.hpp" #include "GetTime.hpp" //#include <mutex> //#include <condition_variable> #define BUF 15 using namespace std; queue<string> global_qe; int32_t global_thread_number; ofstream global_log_file; ofstream global_result_file; ofstream result_folder_log_file; int32_t is_log=1; string global_startday; string global_endday; int32_t global_select_type; bool global_folder_detail; char global_sz_delist_file[1024]={0}; pthread_mutex_t map_fileio_mutex=PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t log_mutex=PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t log_result_mutex =PTHREAD_MUTEX_INITIALIZER; //std::condition_variable data_con; //mutable std::mutex mut; pthread_cond_t g_cond =PTHREAD_COND_INITIALIZER ; bool g_local_flag=false; bool g_md5flag=true; struct sigaction oldact; unsigned long long global_line; string target_minodeid; int32_t target_mivol_num; string target_orgid; string bak_uin; GenTime gettime; int32_t operate_kernel_sigact() { struct sigaction act; sigemptyset(&act.sa_mask); act.sa_handler=SIG_IGN; act.sa_flags |= SA_NOCLDWAIT; if(sigaction(SIGCHLD,&act,&oldact) < 0){ return -2; } return 0; } 
int32_t operate_kernel_sigset() { sigset_t setnew,setold; sigfillset(&setnew); sigdelset(&setnew,SIGCHLD); sigprocmask(SIG_BLOCK, &setnew, NULL); return 0; } void killfunc(int sig){ cerr <<"sig happen"<<endl; } void handler(int sig) { printf("capture a SIGALRM signal %d \n",sig); } void help(char *s ){ cout <<s<<" file_name global_thread_number local_flag target_minodeid target_mivol_num target_orgid"<<endl; cout <<"file_name:号码文件"<<endl; cout <<"global_thread_number:处理线程数量"<<endl; cout <<"local_flag:是否在本地mi 1:是[会执行mv操作] 0:不是"<<endl; cout <<"target_minodeid:目标mi的nodeid"<<endl; cout <<"target_mivol_num:目标mi挂载卷总数"<<endl; cout <<"target_orgid:目标ORG"<<endl; cout <<"example as below:::::::::::::::"<<endl; cout <<s<<" filename 10 0 175 8 1"<<endl; cout <<"-----------------------------------------xiao-------------------------------"<<endl; exit(1); } int main(int argc,char* argv[]) { if(argc != 7){ help(argv[0]); } /*create daemon process. signal(SIGHUP,SIG_IGN); pid_t daemon_pid=fork(); if(daemon_pid==-1){ cerr<<"Error to create daemon process ."<<endl; exit(1); } if(daemon_pid>0){ exit(0); } if(daemon_pid==0){ setsid(); umask(0); } daemon_pid=fork(); if(daemon_pid==-1){ cerr<<"Error to create daemon process ."<<endl; exit(1); } if(daemon_pid>0){ exit(0); } operate_kernel_sigact(); operate_kernel_sigset(); */ struct rlimit limit; limit.rlim_cur = RLIM_INFINITY; limit.rlim_max = RLIM_INFINITY; if(setrlimit(RLIMIT_CORE, &limit)<0){ cerr<<"Error to set rlimit"<<endl; } limit.rlim_cur = 102400; limit.rlim_max = 102400; if(setrlimit(RLIMIT_NOFILE, &limit)<0){ cerr<<"Error to set rlimit"<<endl; } limit.rlim_cur = RLIM_INFINITY; limit.rlim_max = RLIM_INFINITY; if(setrlimit(RLIMIT_AS, &limit)<0){ cerr<<"Error to set rlimit"<<endl; } limit.rlim_cur = 10485760; limit.rlim_max = 10485760; if(setrlimit(RLIMIT_STACK, &limit)<0){ cerr<<"Error to set rlimit"<<endl; } // a.out file_number thread_number char buf[BUF]; int32_t fd_open; char* file_name=argv[1]; 
global_thread_number=atoi(argv[2]); //int32_t data_queue=500; int32_t data_queue=1; int32_t local_flag=atoi(argv[3]); target_minodeid=argv[4]; target_mivol_num=atoi(argv[5]); target_orgid=argv[6]; for(int32_t i=0; i<argc;i++){ cout <<"argv["<<i<<"]:"<<argv[i]<<endl; } struct stat64 sst; struct stat st; bak_uin="/home/move_all/bak_uin/"; if(stat(bak_uin.c_str(),&st) <0){ cerr <<"error:"<<bak_uin<<endl; if( mkdir((bak_uin+"dat/").c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH)== -1){ cerr<<"create dir:"<<bak_uin<<" failed"<<endl; exit(1); } } if(!(S_ISDIR(st.st_mode))){ cerr <<"errordir:"<<bak_uin<<endl; exit(1); } if (local_flag == 0) { g_local_flag=false; }else if( local_flag == 1 ){ g_local_flag=true; }else{ cerr<<"local_flag error "<<endl; exit(1); } //exit(0); static char bufff[1024]={0}; int32_t count=readlink("/proc/self/exe",bufff,sizeof(bufff)); if(count <0 || count >=(int)sizeof(bufff)){ memset(bufff, 0, sizeof(buf)); } bufff[count]='\0'; char *p=NULL; p=strrchr(bufff,'/'); *p='\0'; sprintf(global_sz_delist_file,"%s/%s",bufff,"del_sender_list"); cout << global_sz_delist_file<<endl; char *today_date=gettime.get_ymd2_time(); string tmplog_var=bak_uin+"tmplog."+string(today_date); string result_file_var=bak_uin+"result_file."+string(today_date); global_log_file.open(tmplog_var.c_str(),ios_base::app); if(!global_log_file){ cerr<<"Error log file path or some errors occured"<<endl; cerr<<strerror(errno)<<endl; exit(1); } global_result_file.open(result_file_var.c_str(),ios_base::app); if(!global_result_file){ cerr<<"global_result_filelog file path or some errors occured"<<endl; exit(1); } char* tmp_file="tmp_file"; cout <<file_name<<endl; //fd_open=open(file_name,O_RDONLY); FILE * pNumberFile; pNumberFile=fopen(file_name,"r"); if (pNumberFile == NULL) { perror ("Error opening file"); exit(1); } char* tmp; string str,str_que; unsigned long long line=0; // if(fd_open<0){ // cout <<"error"<<endl; //} unsigned long long x_single_read=0; unsigned long long x_read=0; // 
char buff[1024]; memset(buff,'\0',1024); int fd_rwtmp_open=open(tmp_file,O_RDWR|O_CREAT,S_IWUSR|S_IRUSR ); cout <<fd_rwtmp_open<<endl; read(fd_rwtmp_open,buff,sizeof(char)*1024); cout <<"aaaaa:"<<buff<<endl; close(fd_rwtmp_open); std::vector<std::string> vct; SplitStringIntoVector(buff,"@",vct); for (int i=0; i<vct.size();++i){ cout <<i<<":"<<vct[i]<<endl; } MASHBD_WORK::WORK_thread mw(global_thread_number); mw.dome(); if(vct.size()==3){ /*if(lseek(fd_open,atoll(vct[1].c_str()),SEEK_SET)<0){ cerr<<"lseek"<<strerror(errno)<<endl; }*/ if(fseek(pNumberFile,atoll(vct[1].c_str()),SEEK_SET) != 0){ cerr<<"fseek error"<<strerror(errno)<<endl; exit(1); } line=atoll(vct[0].c_str()) ; global_line=line; x_read=x_read+atoll(vct[1].c_str()); unsigned long long number= atoll(vct[2].c_str()); string recode=convertIntlong(line+1)+"@"+convertIntlong(x_read)+"@"+convertIntlong(number); common_log_file("INFO","CONTINU_START_TMP_FILE="+recode,global_log_file); } while(fgets(buf,sizeof(buf),pNumberFile) != NULL && !feof(pNumberFile)){ /* day rotate*/ char *now_date=gettime.get_ymd2_time(); if(strcasecmp(today_date,now_date)!=0){ delete [] today_date; today_date=now_date; global_log_file.close(); global_result_file.close(); //string tmplog_var="tmplog."+string(today_date); //string result_file_var="result_file."+string(today_date); string tmplog_var=bak_uin+"tmplog."+string(today_date); string result_file_var=bak_uin+"result_file."+string(today_date); global_log_file.open(tmplog_var.c_str(),ios_base::app); global_result_file.open(result_file_var.c_str(),ios_base::app); if(!global_log_file){ cerr<<"Error to open result file to exchage day write results"<<endl; continue; } if(!global_result_file){ cerr<<"Error to open result file to exchage day write results"<<endl; continue; } } x_read=x_read+strlen(buf); buf[strlen(buf)-1]='\0'; // cout <<"line:"<<line+1<<" len:"<<strlen(buf)<<" content:"<<buf<<endl; string tmpbuf=buf; int fd_w_open; while(1){ if(global_qe.size()>data_queue){ 
//cerr<<"warn queue more than "<<data_queue<<" "<<"size:"<<global_qe.size()<<" loopwait"<<endl; usleep(5000); continue; }else{ // pthread_mutex_lock(&map_fileio_mutex); //std::unique_lock<std::mutex> lk(mut); global_qe.push(tmpbuf); //data_con.notify_one(); // pthread_cond_signal(&g_cond); // pthread_mutex_unlock(&map_fileio_mutex); fd_w_open=open(tmp_file,O_WRONLY|O_CREAT|O_TRUNC,S_IRWXU | S_IXGRP | S_IRGRP | S_IROTH | S_IXOTH); string recode=convertIntlong(line+1)+"@"+convertIntlong(x_read)+"@"+tmpbuf; common_log_file("INFO","INSERT_TMP_FILE="+recode,global_log_file); unsigned long long x_single_write=write(fd_w_open,recode.c_str(),recode.length()); if(x_single_write<0){ cerr<<"write error"<<endl; exit(1); } global_line=line+1; close(fd_w_open); cout <<"write_recode:"<<recode<<" fd_w_open:"<<fd_w_open<<endl; break; } } cout <<"line:"<<line+1<<" len:"<<strlen(buf)<<" content:"<<buf<<endl; ++line; } delete [] today_date; sleep(60); cout <<"out<<"<<endl; common_log_file("WARN","MAIN FINISH BEFORE OUT CCCCCCCCCCCCCCCCCCCCCCCCCCCC",global_log_file); return 0; }
cat threadinit.cpp //#include "HttpTcp.hpp" #include "threadinit.hpp" #include <iostream> #include <cstdio> #include <ctime> #include <string> #include <cstdlib> #include <cstring> #include <fstream> #include <vector> #include <map> #include <unistd.h> #include <pthread.h> #include <queue> #include <sstream> #include <sys/epoll.h> #include <sys/stat.h> #include <sys/types.h> #include "commfunc.hpp" #define BUF 65536 using namespace std; struct workapp_id{ int32_t kid; }; int32_t do_command(int32_t kid,string uuid_str_tmp,string queue_front,unsigned mod_q){ int32_t g_select_count =1; int32_t ret=1; int32_t g_flag; string system_str=""; string cmdname=""; bool ret_flag=false; system_str="/home/richmail/bin/NNDD--move-user-to-miid " +queue_front+" \"miNodeId="+target_minodeid+"&miNodeVolId="+convertIntlong(mod_q)+"&orgid="+target_orgid+"\""; cmdname="--move-user-to-miid"; ret_flag=true; FILE *fstream=NULL; char buff[1024]; memset(buff,0,sizeof(buff)); string logout="/home/richmail/bin/NNDDNAME "+queue_front+" &"; system(logout.c_str()); cout<<"cmd="<<system_str<<endl; // return 0; if(NULL==(fstream=popen(system_str.c_str(),"r"))) { fprintf(stderr,"tid=%s execute command failed: %s",uuid_str_tmp.c_str(),strerror(errno)); common_log_file("ERROR","tid="+uuid_str_tmp+" popenfstream="+string(strerror(errno))+" error="+convertInt(errno)+" cmd="+system_str+" queue_front="+queue_front+" kid="+convertInt(kid),global_log_file); ret= -1; } common_log_file("INFO","tid="+uuid_str_tmp+ " cmd="+system_str+" kid="+convertInt(kid),global_log_file); string tmp_count=""; while(NULL!=fgets(buff, sizeof(buff), fstream)&& !feof(fstream)) { string tmpb=queue_front+" "+buff; tmp_count=tmp_count+buff; common_log_file("INFO","tid="+uuid_str_tmp+" result= "+trim_string(tmpb)+" vol_id="+convertIntlong(mod_q)+" kid="+convertInt(kid),global_log_file); //result_log_file(tmpb.c_str(),global_result_file); } if(tmp_count.compare("")==0 ){ string queue_front1=queue_front+" "+cmdname +" nothing 
putout\r\n"; result_log_file(queue_front1.c_str(),global_result_file); pclose(fstream); return ret ; } //src_mi_vol_id=1| src_mi_path=01/52/8/l960967432.dat| dst_res_id=1 pclose(fstream); std::size_t found1 = tmp_count.find("src_mi_path="); std::size_t found2 = tmp_count.find("| dst_res_id="); std::size_t found3 = tmp_count.find("success"); bool suc=false; if(found3!=std::string::npos){ suc=true; }else{ result_log_file((queue_front+" NONE NONE RICHMAIL_ERROR\n").c_str(),global_result_file); } if(g_local_flag == 1){ if (found1!=std::string::npos && found2!=std::string::npos && found3!=std::string::npos){ string src_mi_path=tmp_count.substr(found1+12,found2-found1-12); std::size_t found = src_mi_path.find_last_of("/"); string dat=src_mi_path.substr(found+1); cerr<<"src_mi_path:"<<src_mi_path<<" only-dat:"<<dat<<endl; // mv src_mi_path string BAK_UIN_PATH=bak_uin+"dat/"+dat; string info_path="/nfsdata/mi/midata_"+src_mi_path; /* if(rename(info_path.c_str(),BAK_UIN_PATH.c_str())<0){ cerr<<"rename "<<info_path<<","<<BAK_UIN_PATH<<" error: "<<strerror(errno)<<endl; common_log_file("INFO","t+convertInt(kid),global_log_file); return ret; } */ struct stat st; if((lstat(info_path.c_str(),&st)!=0 )) { if(suc){ result_log_file((queue_front+" "+info_path+" "+BAK_UIN_PATH+" NOT_LOCALMIUSER_SUCC\n").c_str(),global_result_file); common_log_file("INFO","tid="+uuid_str_tmp+" number="+queue_front+" is not localmiuser",global_log_file); return 1; } cerr<<"stat64 file:"<<info_path<<" error:"<<strerror(errno)<<endl; common_log_file("ERROR","tid="+uuid_str_tmp+" number="+queue_front+" Failed to lsstat:"+info_path+" ERROR="+strerror(errno)+" kid="+convertInt(kid),global_log_file); return -1; } if(!S_ISREG(st.st_mode)){ cerr<<"st.st_mode file:"<<info_path<<" error:"<<strerror(errno)<<endl; common_log_file("ERROR","tid="+uuid_str_tmp+" number="+queue_front+" Failed to st.mode:"+info_path+" ERROR="+strerror(errno)+" kid="+convertInt(kid),global_log_file); return -1; } int32_t 
fd_r_open=open(info_path.c_str(),O_RDONLY); int32_t fd_w_open=open(BAK_UIN_PATH.c_str(),O_WRONLY|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IROTH ); if(fd_r_open <0){ close(fd_r_open); close(fd_w_open); cerr<<"failed to open src_file:"<<info_path<<" error:"<<strerror(errno)<<endl; common_log_file("ERROR","tid="+uuid_str_tmp+" number="+queue_front+" Failed open file src_mi_path="+src_mi_path+" ERROR="+strerror(errno)+" kid="+convertInt(kid),global_log_file); return -1; } if(fd_w_open <0){ close(fd_r_open); close(fd_w_open); cerr<<"failed to open bak_uin_file:"<<BAK_UIN_PATH<<" error:"<<strerror(errno)<<endl; common_log_file("ERROR","tid="+uuid_str_tmp+" number="+queue_front+" Failed open file BAK_UIN_PATH="+BAK_UIN_PATH+" ERROR="+strerror(errno)+" kid="+convertInt(kid),global_log_file); //return -1; exit(1); } ssize_t x_single_read=0; ssize_t x_single_write=0; unsigned long long write_size=0; char buf[BUF]; while(1){ memset(buf,'\0',BUF); x_single_read=read(fd_r_open,buf,sizeof(char)*BUF); if(x_single_read<0){ cerr<<"read file return<0 for file"<<strerror(errno)<<endl; common_log_file("ERROR","tid="+uuid_str_tmp+" number="+queue_front+" Failed Read return<0 file src_mi_path="+src_mi_path+" ERROR="+strerror(errno)+" kid="+convertInt(kid),global_log_file); close(fd_r_open); close(fd_w_open); return -1; } if(x_single_read==0){ break;} x_single_write=write(fd_w_open,buf,sizeof(char)*x_single_read); if(x_single_write<0){ cerr<<"write file return<0 for file:"<<BAK_UIN_PATH<<" error:"<<strerror(errno)<<endl; common_log_file("ERROR","tid="+uuid_str_tmp+" number="+queue_front+" Failed write return<0 file BAK_UIN_PATH="+BAK_UIN_PATH+" ERROR="+strerror(errno)+" kid="+convertInt(kid),global_log_file); close(fd_r_open); close(fd_w_open); return -1; } write_size=write_size+x_single_write; } close(fd_r_open); close(fd_w_open); if(write_size != st.st_size){ common_log_file("ERROR","tid="+uuid_str_tmp+" number="+queue_front+" write_size != st.size"+" w_f="+BAK_UIN_PATH+" 
w_size="+convertIntlong(write_size)+" r_file="+info_path+" st.size="+convertIntlong(st.st_size),global_log_file); return -1; } // result_log_file((queue_front+" "+info_path+" "+BAK_UIN_PATH+" LOCALMIUSER_SUCC\n").c_str(),global_result_file); common_log_file("INFO","tid="+uuid_str_tmp+" number="+queue_front+" SUCC write_size=st.size"+" w_f="+BAK_UIN_PATH+" w_size="+convertIntlong(write_size)+" r_file="+info_path+" st.size="+convertIntlong(st.st_size),global_log_file); string tmp_str=""; char buff_tmp[1024]; if(g_md5flag){ string md5sum_str="md5sum "+info_path+" "+BAK_UIN_PATH; FILE *fstream_tmp=NULL; if(NULL==(fstream_tmp=popen(md5sum_str.c_str(),"r"))){ fprintf(stderr,"tid=%s execute command failed: %s",uuid_str_tmp.c_str(),strerror(errno)); common_log_file("ERROR","tid="+uuid_str_tmp+" md5sum="+md5sum_str+" kid="+convertInt(kid),global_log_file); return -1; } memset(buff_tmp,0,sizeof(buff_tmp)); while(NULL!=fgets(buff_tmp, sizeof(buff_tmp), fstream_tmp)&& !feof(fstream_tmp)){ //tmp_str=tmp_str+buff_tmp; buff_tmp[strlen(buff_tmp)-1]='\0'; tmp_str=tmp_str+buff_tmp+" "; //common_log_file("INFO","t+convertInt(kid),global_log_file); } pclose(fstream_tmp); common_log_file("INFO","tid="+uuid_str_tmp+" md5sum="+tmp_str+" kid="+convertInt(kid),global_log_file); //result_log_file((queue_front+" "+info_path+" "+BAK_UIN_PATH+" "+tmp_str+" LOCALMIUSER_SUCC\n").c_str(),global_result_file); } //result_log_file((queue_front+" "+info_path+" "+BAK_UIN_PATH+" LOCALMIUSER_SUCC\n").c_str(),global_result_file); //unlink if(unlink(info_path.c_str())<0){ cerr<<" unlink "<<info_path<<" error:"<<strerror(errno)<<endl; common_log_file("ERROR","tid="+uuid_str_tmp+" unlink="+info_path+" error:"+strerror(errno)+" kid="+convertInt(kid),global_log_file); result_log_file((queue_front+" "+info_path+" "+BAK_UIN_PATH+" "+tmp_str+" LOCALMIUSER_SUCC NOT UNLINK\n").c_str(),global_result_file); return -1; } common_log_file("INFO","tid="+uuid_str_tmp+" unlink="+info_path+" SUCC 
kid="+convertInt(kid),global_log_file); result_log_file((queue_front+" "+info_path+" "+BAK_UIN_PATH+" "+tmp_str+" LOCALMIUSER_SUCC Y UNLINK\n").c_str(),global_result_file); } }else{ if(suc){ result_log_file((queue_front+" NOT_LOCALMIUSER_SUCC\n").c_str(),global_result_file); } } // pclose(fstream); return ret; } int32_t MASHBD_THREAD_TOP::ThreadTop::create_thread(void *(*funcme)(void *args),int32_t tran_id) { pthread_t *files=new pthread_t[1]; pthread_attr_t *filed=new pthread_attr_t[1]; ////////new add size_t stacksize; struct rlimit rlim; pthread_attr_init(filed); pthread_attr_getstacksize(filed, &stacksize); getrlimit(RLIMIT_STACK, &rlim); //cout<<"Thread current stack size: "<<(size_t)rlim.rlim_cur<<endl; //cout<<"Thread real stack size: "<<stacksize<<endl; stacksize=(size_t)rlim.rlim_cur; pthread_attr_setstacksize(filed, stacksize); ///end pthread_attr_setdetachstate(filed,PTHREAD_CREATE_DETACHED); if(tran_id >=0){ struct workapp_id * wid=(struct workapp_id *)malloc(sizeof(struct workapp_id)); wid->kid=tran_id; if(pthread_create(files,filed,funcme,wid)!=0){ common_log_file("ThreadCreate","Here create thread failed",global_log_file); } } else{ if(pthread_create(files,filed,funcme,NULL)!=0){ common_log_file("ThreadCreate","Here create thread failed",global_log_file); } } return 0; } int32_t thread_epoll_handle_me(int32_t kid){ //string queue_front ; while(1){ string queue_front ; string tmp; char tmp_buf[50]; string uuid_str; string uuid_str_tmp; for (int32_t i=0; i<4;i++){ snprintf(tmp_buf,50,"%02X",(rand()%255)/15); tmp.append(tmp_buf); } uuid_str=get_uuid(); uuid_str_tmp=uuid_str+"-"+tmp; unsigned long long ssize=0; unsigned mod_q=0; try{ pthread_mutex_lock(&map_fileio_mutex); //pthread_cond_wait(&g_cond,&map_fileio_mutex); //bool iret =global_qe.empty(); if(!global_qe.empty()){ srand( (unsigned)time( NULL ) ); mod_q=((rand()+target_mivol_num)% target_mivol_num)+1 ; if (mod_q <1 || mod_q > target_mivol_num ){ cerr<<" mod_q <1 or > "<< target_mivol_num<<endl; 
common_log_file("ERROR","mod_q <1 or >"+convertIntlong(mod_q),global_log_file); exit(1); } /////pthread_cond_wait(&g_cond,&map_fileio_mutex); queue_front=global_qe.front(); ssize=global_qe.size(); cout <<ssize<<" do pop****************** qis:"<<queue_front<<" target_mivol_rand:"<<mod_q<<endl; global_qe.pop(); pthread_cond_signal(&g_cond); common_log_file("INFO","tid="+uuid_str_tmp+" size="+convertIntlong(ssize)+" oldqueue_front="+queue_front+" opration=pop vol_id="+convertIntlong(mod_q)+" kid="+convertInt(kid),global_log_file); cout<<"bbb queue_front"<<queue_front<<endl; if(queue_front.compare("")==0){ common_log_file("INFO","tid="+uuid_str_tmp+" line="+convertIntlong(global_line)+" size="+convertIntlong(ssize)+" queue_front=EMPT Y_2 vol_id="+convertIntlong(mod_q)+" kid="+convertInt(kid),global_log_file); pthread_mutex_unlock(&map_fileio_mutex); continue; } pthread_mutex_unlock(&map_fileio_mutex); }else{ //common_log_file("ERROR","ssize>0 but empty=" ,global_log_file); pthread_mutex_unlock(&map_fileio_mutex); continue; } }catch(std::exception& e){ string ex=e.what(); cerr <<e.what()<<endl; common_log_file("ERROR","EXCEPTION="+ex,global_log_file); continue; } common_log_file("INFO","tid="+uuid_str_tmp+" line="+convertIntlong(global_line)+" size="+convertIntlong(ssize)+" queue_front="+queue_front+" opration=front kid="+convertInt(kid),global_log_file); cout << "the front queue is :"<<queue_front<<" kid:"<<kid<<endl; if(queue_front.length()>1){ int ret =do_command(kid,uuid_str_tmp,queue_front,mod_q); if(ret <0){ cerr<<uuid_str_tmp<<":"<<ret<<endl; } continue; } continue; } return 0; } void *work_thread(void *args) { int32_t kid=((struct workapp_id *)args)->kid; thread_epoll_handle_me(kid); common_log_file("ERROR","thread_epoll_handle_me=out kid="+convertInt(kid),global_log_file); pthread_exit(NULL); } int32_t MASHBD_WORK::WORK_thread::dome () { //this.cs_tok=global_thread_number; for(int32_t i=0;i <cs_tok;i++){ create_thread(&work_thread,i); } return 0; }