编绎:
g++  -g   -lpthread -luuid  2.cpp commfunc.cpp threadinit.cpp   -o move_mi_org

 说明:

    并发调用迁移用户工具脚本c++ 基于邮件系统工具的调用.由于NNDD 工具一次调用需要1S左右,有几亿用户需要迁移,则何年马月才能搞完。需要并发,叫研发搞则是无可能的,没有kpi啊。

    1)2为mail.cpp  commfunc.cpp 为函数通用文件  threadinit .cpp 为线程文件。

作用: 迁移用户从一台机器到另一个机器

 ./move_mi_org 
./move_mi_org file_name  global_thread_number local_flag target_minodeid target_mivol_num target_orgid
file_name:号码文件
global_thread_number:处理线程数量
local_flag:是否在本地mi 1:是[会执行mv操作] 0:不是
target_minodeid:目标mi的nodeid
target_mivol_num:目标mi挂载卷总数
target_orgid:目标ORG
example as below:::::::::::::::
./move_mi_org filename 10 0 175 8 1
-----------------------------------------xiao-------------------------------
2.cpp 


#include <iostream>
#include <cstdio>
#include <stdint.h>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <ctype.h>
#include <stdarg.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <sys/types.h>
#include <sched.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netdb.h>
#include <dirent.h>
#include <syslog.h>
#include <sys/time.h>
#include <net/if.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdexcept>
#include <queue>
#include <string>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/resource.h>
#include <sys/ioctl.h>
#include <utility>
#include <map>
#include <fstream>
#include <sstream>
#include "threadinit.hpp"
#include "commfunc.hpp"
#include "GetTime.hpp"
//#include <mutex>
//#include <condition_variable>
#define BUF 15

using namespace std;
queue<string> global_qe;
int32_t global_thread_number;
ofstream global_log_file;
ofstream global_result_file;
ofstream result_folder_log_file;
int32_t is_log=1;
string global_startday;
string global_endday;
int32_t global_select_type;
bool global_folder_detail;
char global_sz_delist_file[1024]={0};


pthread_mutex_t map_fileio_mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t log_mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t log_result_mutex =PTHREAD_MUTEX_INITIALIZER;
//std::condition_variable data_con; 
//mutable std::mutex mut;  
pthread_cond_t g_cond =PTHREAD_COND_INITIALIZER ;
bool g_local_flag=false;
bool g_md5flag=true;
struct sigaction oldact;
unsigned long long global_line;
string target_minodeid;
int32_t target_mivol_num;
string target_orgid;
string bak_uin;

GenTime gettime;

int32_t operate_kernel_sigact()
{
        struct sigaction act;
        sigemptyset(&act.sa_mask);
        act.sa_handler=SIG_IGN;
        act.sa_flags |= SA_NOCLDWAIT;
        if(sigaction(SIGCHLD,&act,&oldact) < 0){
                return -2;
        }
        return 0;
}

int32_t operate_kernel_sigset()
{
        sigset_t setnew,setold;
        sigfillset(&setnew);
        sigdelset(&setnew,SIGCHLD);
        sigprocmask(SIG_BLOCK, &setnew, NULL);
        return 0;
}


void killfunc(int sig){

cerr <<"sig  happen"<<endl;

}
void handler(int sig)
{
    printf("capture a SIGALRM signal %d \n",sig);
}
void help(char *s ){
        cout <<s<<" file_name  global_thread_number local_flag target_minodeid target_mivol_num target_orgid"<<endl;
    cout <<"file_name:号码文件"<<endl;
    cout <<"global_thread_number:处理线程数量"<<endl;
    cout <<"local_flag:是否在本地mi 1:是[会执行mv操作] 0:不是"<<endl;
    cout <<"target_minodeid:目标mi的nodeid"<<endl;
    cout <<"target_mivol_num:目标mi挂载卷总数"<<endl;
        cout <<"target_orgid:目标ORG"<<endl;
        cout <<"example as below:::::::::::::::"<<endl;
        cout <<s<<" filename 10 0 175 8 1"<<endl;
    cout <<"-----------------------------------------xiao-------------------------------"<<endl;
        exit(1);
}
int main(int argc,char* argv[]) {
        if(argc != 7){
                        help(argv[0]);
    }
        /*create daemon process.

                        signal(SIGHUP,SIG_IGN);
                        pid_t daemon_pid=fork();
                        if(daemon_pid==-1){
                                        cerr<<"Error to create daemon process ."<<endl;
                                        exit(1);
                        }
                        if(daemon_pid>0){
                                        exit(0);
                        }
                        if(daemon_pid==0){
                                        setsid();
                        umask(0);
                        }
                daemon_pid=fork();
                        if(daemon_pid==-1){
                                        cerr<<"Error to create daemon process ."<<endl;
                                        exit(1);
                        }
                        if(daemon_pid>0){
                                        exit(0);
                        }
                operate_kernel_sigact();
        operate_kernel_sigset();
       */
                struct rlimit limit;
                limit.rlim_cur = RLIM_INFINITY;
                limit.rlim_max = RLIM_INFINITY;
                if(setrlimit(RLIMIT_CORE, &limit)<0){
                        cerr<<"Error to set rlimit"<<endl;
                }
                limit.rlim_cur = 102400;
                limit.rlim_max = 102400;
                if(setrlimit(RLIMIT_NOFILE, &limit)<0){
                        cerr<<"Error to set rlimit"<<endl;
                }
                limit.rlim_cur = RLIM_INFINITY;
                limit.rlim_max = RLIM_INFINITY;
                if(setrlimit(RLIMIT_AS, &limit)<0){
                        cerr<<"Error to set rlimit"<<endl;
                }
                limit.rlim_cur = 10485760;
                limit.rlim_max = 10485760;
                if(setrlimit(RLIMIT_STACK, &limit)<0){
                        cerr<<"Error to set rlimit"<<endl;
                }



                // a.out file_number thread_number
        char buf[BUF];
        int32_t fd_open;
        char* file_name=argv[1];
                global_thread_number=atoi(argv[2]);
                //int32_t data_queue=500;
                int32_t data_queue=1;
                int32_t local_flag=atoi(argv[3]);
                target_minodeid=argv[4];
                target_mivol_num=atoi(argv[5]);
            target_orgid=argv[6];
                for(int32_t i=0; i<argc;i++){
                        cout <<"argv["<<i<<"]:"<<argv[i]<<endl;

                }
                struct stat64 sst;
                struct stat st;
        bak_uin="/home/move_all/bak_uin/";
        if(stat(bak_uin.c_str(),&st) <0){
            cerr <<"error:"<<bak_uin<<endl;
           if( mkdir((bak_uin+"dat/").c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH)== -1){
                cerr<<"create dir:"<<bak_uin<<" failed"<<endl;
                exit(1);
            }
        }
                if(!(S_ISDIR(st.st_mode))){
            cerr <<"errordir:"<<bak_uin<<endl;
            exit(1);
        }
                if (local_flag == 0) {
           g_local_flag=false;

                }else if( local_flag == 1 ){
                        g_local_flag=true;
                }else{
                 cerr<<"local_flag error "<<endl;
                 exit(1);
                }


                //exit(0);
                static  char bufff[1024]={0};
                int32_t count=readlink("/proc/self/exe",bufff,sizeof(bufff));
        if(count <0 || count >=(int)sizeof(bufff)){
                memset(bufff, 0, sizeof(buf));

        }
        bufff[count]='\0';
                char *p=NULL;
                p=strrchr(bufff,'/');
                *p='\0';

                sprintf(global_sz_delist_file,"%s/%s",bufff,"del_sender_list");
                cout << global_sz_delist_file<<endl;
                char *today_date=gettime.get_ymd2_time();
                string tmplog_var=bak_uin+"tmplog."+string(today_date);
                string result_file_var=bak_uin+"result_file."+string(today_date);
                global_log_file.open(tmplog_var.c_str(),ios_base::app);
                if(!global_log_file){
                cerr<<"Error log file path or some errors occured"<<endl;
                cerr<<strerror(errno)<<endl;
                exit(1);
                }
                global_result_file.open(result_file_var.c_str(),ios_base::app);
                if(!global_result_file){
                cerr<<"global_result_filelog file path or some errors occured"<<endl;
                exit(1);
                }

                char* tmp_file="tmp_file";
        cout <<file_name<<endl;
        //fd_open=open(file_name,O_RDONLY);
        FILE * pNumberFile;
        pNumberFile=fopen(file_name,"r");
                if (pNumberFile == NULL) {
                        perror ("Error opening file");
                        exit(1);
                }
                char* tmp;
        string str,str_que;
        unsigned long long line=0;
       // if(fd_open<0){
         //       cout <<"error"<<endl;
        //}
        unsigned long long x_single_read=0;
                unsigned long long x_read=0;
        //
        char buff[1024];
         memset(buff,'\0',1024);
int fd_rwtmp_open=open(tmp_file,O_RDWR|O_CREAT,S_IWUSR|S_IRUSR );
        cout <<fd_rwtmp_open<<endl;
        read(fd_rwtmp_open,buff,sizeof(char)*1024);
        cout <<"aaaaa:"<<buff<<endl;
        close(fd_rwtmp_open);
        std::vector<std::string> vct;
        SplitStringIntoVector(buff,"@",vct);
        for (int i=0; i<vct.size();++i){
                cout <<i<<":"<<vct[i]<<endl;
        }

                MASHBD_WORK::WORK_thread  mw(global_thread_number);
                mw.dome();
        if(vct.size()==3){
                /*if(lseek(fd_open,atoll(vct[1].c_str()),SEEK_SET)<0){
                        cerr<<"lseek"<<strerror(errno)<<endl;
                }*/
                if(fseek(pNumberFile,atoll(vct[1].c_str()),SEEK_SET) != 0){
                        cerr<<"fseek error"<<strerror(errno)<<endl;
                        exit(1);
                }
                line=atoll(vct[0].c_str()) ;
                global_line=line;
                x_read=x_read+atoll(vct[1].c_str());
                unsigned long long number= atoll(vct[2].c_str());
                string recode=convertIntlong(line+1)+"@"+convertIntlong(x_read)+"@"+convertIntlong(number);
                common_log_file("INFO","CONTINU_START_TMP_FILE="+recode,global_log_file);
        }



                while(fgets(buf,sizeof(buf),pNumberFile) != NULL && !feof(pNumberFile)){

                        /* day rotate*/
                        char *now_date=gettime.get_ymd2_time();
                        if(strcasecmp(today_date,now_date)!=0){
                                delete [] today_date;
                                today_date=now_date;
                                global_log_file.close();
                                global_result_file.close();
                                //string tmplog_var="tmplog."+string(today_date);
                                //string result_file_var="result_file."+string(today_date);
                                string tmplog_var=bak_uin+"tmplog."+string(today_date);
                                string result_file_var=bak_uin+"result_file."+string(today_date);
                                global_log_file.open(tmplog_var.c_str(),ios_base::app);
                                global_result_file.open(result_file_var.c_str(),ios_base::app);
                                if(!global_log_file){
                                        cerr<<"Error to open result file to exchage day write results"<<endl;
                                        continue;
                                }
                                if(!global_result_file){
                                        cerr<<"Error to open result file to exchage day write results"<<endl;
                                        continue;
                                }
                        }
                        x_read=x_read+strlen(buf);
                        buf[strlen(buf)-1]='\0';
                //      cout <<"line:"<<line+1<<" len:"<<strlen(buf)<<" content:"<<buf<<endl;
                        string tmpbuf=buf;
                        int fd_w_open;
                        while(1){
             if(global_qe.size()>data_queue){
                //cerr<<"warn queue more than "<<data_queue<<" "<<"size:"<<global_qe.size()<<" loopwait"<<endl;
                usleep(5000);
                continue;
                }else{
                                // pthread_mutex_lock(&map_fileio_mutex);
                //std::unique_lock<std::mutex> lk(mut);                
                global_qe.push(tmpbuf);
                                //data_con.notify_one();  
                        //      pthread_cond_signal(&g_cond);
                        //      pthread_mutex_unlock(&map_fileio_mutex);
                                fd_w_open=open(tmp_file,O_WRONLY|O_CREAT|O_TRUNC,S_IRWXU | S_IXGRP | S_IRGRP | S_IROTH | S_IXOTH);
                string recode=convertIntlong(line+1)+"@"+convertIntlong(x_read)+"@"+tmpbuf;
                common_log_file("INFO","INSERT_TMP_FILE="+recode,global_log_file);
                unsigned long long x_single_write=write(fd_w_open,recode.c_str(),recode.length());
                if(x_single_write<0){
                        cerr<<"write error"<<endl;
                    exit(1);
                }
                global_line=line+1;
                                close(fd_w_open);
                                cout <<"write_recode:"<<recode<<" fd_w_open:"<<fd_w_open<<endl;
                                break;
                }
                        }
                        cout <<"line:"<<line+1<<" len:"<<strlen(buf)<<" content:"<<buf<<endl;
                        ++line;
                }
                delete [] today_date;
    sleep(60);
        cout <<"out<<"<<endl;
    common_log_file("WARN","MAIN FINISH BEFORE OUT CCCCCCCCCCCCCCCCCCCCCCCCCCCC",global_log_file);
        return 0;
}
cat threadinit.cpp
//#include "HttpTcp.hpp"
#include  "threadinit.hpp"
#include <iostream>
#include <cstdio>
#include <ctime>
#include <string>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <vector>
#include <map>
#include <unistd.h>
#include <pthread.h>
#include <queue>
#include <sstream>
#include <sys/epoll.h>
#include <sys/stat.h> 
#include <sys/types.h>
#include "commfunc.hpp"
#define BUF 65536

using namespace std;
struct workapp_id{
        int32_t kid;
};
int32_t do_command(int32_t kid,string uuid_str_tmp,string queue_front,unsigned mod_q){
        int32_t g_select_count =1;
        int32_t ret=1;
        int32_t g_flag;
        string system_str="";
        string cmdname="";
        bool ret_flag=false;
                
                 system_str="/home/richmail/bin/NNDD--move-user-to-miid " +queue_front+" \"miNodeId="+target_minodeid+"&miNodeVolId="+convertIntlong(mod_q)+"&orgid="+target_orgid+"\"";
                                 cmdname="--move-user-to-miid";
                                ret_flag=true;


                                        FILE *fstream=NULL;
                                        char buff[1024];
                                        memset(buff,0,sizeof(buff));
                                        string logout="/home/richmail/bin/NNDDNAME "+queue_front+" &";
                                        system(logout.c_str());
                    cout<<"cmd="<<system_str<<endl;
                                //      return 0;
                                        if(NULL==(fstream=popen(system_str.c_str(),"r")))
                                        {
                                        fprintf(stderr,"tid=%s execute command failed: %s",uuid_str_tmp.c_str(),strerror(errno));
                                        common_log_file("ERROR","tid="+uuid_str_tmp+" popenfstream="+string(strerror(errno))+" error="+convertInt(errno)+" cmd="+system_str+" queue_front="+queue_front+" kid="+convertInt(kid),global_log_file);
                                        ret= -1;
                                        }
                                        common_log_file("INFO","tid="+uuid_str_tmp+ " cmd="+system_str+" kid="+convertInt(kid),global_log_file);
                                        string tmp_count="";
                                        while(NULL!=fgets(buff, sizeof(buff), fstream)&& !feof(fstream))
                                        {
                                                string tmpb=queue_front+" "+buff;
                                                tmp_count=tmp_count+buff;
                                                common_log_file("INFO","tid="+uuid_str_tmp+" result= "+trim_string(tmpb)+" vol_id="+convertIntlong(mod_q)+" kid="+convertInt(kid),global_log_file);
                                                //result_log_file(tmpb.c_str(),global_result_file);
                                        }
                                        if(tmp_count.compare("")==0 ){

                                                string queue_front1=queue_front+" "+cmdname +" nothing putout\r\n";
                                                                result_log_file(queue_front1.c_str(),global_result_file);
                                                pclose(fstream);
                                                return ret ;
                                        }
                                        //src_mi_vol_id=1| src_mi_path=01/52/8/l960967432.dat| dst_res_id=1
                                        pclose(fstream);

                                        std::size_t found1 = tmp_count.find("src_mi_path=");
                                        std::size_t found2 = tmp_count.find("| dst_res_id=");
                                        std::size_t found3 = tmp_count.find("success");
                                        bool suc=false;
                                        if(found3!=std::string::npos){
                                                suc=true;
                                        }else{
                                                result_log_file((queue_front+" NONE NONE RICHMAIL_ERROR\n").c_str(),global_result_file);
                                        }

                                        if(g_local_flag == 1){
                                                if (found1!=std::string::npos && found2!=std::string::npos && found3!=std::string::npos){

                                                        string src_mi_path=tmp_count.substr(found1+12,found2-found1-12);
                                                        std::size_t found = src_mi_path.find_last_of("/");
                                                        string dat=src_mi_path.substr(found+1);

                                                        cerr<<"src_mi_path:"<<src_mi_path<<" only-dat:"<<dat<<endl;
                                                        // mv src_mi_path
                                                        string  BAK_UIN_PATH=bak_uin+"dat/"+dat;
                                                        string  info_path="/nfsdata/mi/midata_"+src_mi_path;
                                        /*              if(rename(info_path.c_str(),BAK_UIN_PATH.c_str())<0){
                                                                cerr<<"rename "<<info_path<<","<<BAK_UIN_PATH<<" error: "<<strerror(errno)<<endl;
                                                                common_log_file("INFO","t+convertInt(kid),global_log_file);
                                                                return ret;
                                                        }
                                        */
                                                        struct stat st;
                                                        if((lstat(info_path.c_str(),&st)!=0 )) {
                                                                if(suc){
                                                                        result_log_file((queue_front+" "+info_path+" "+BAK_UIN_PATH+" NOT_LOCALMIUSER_SUCC\n").c_str(),global_result_file);
                                                                        common_log_file("INFO","tid="+uuid_str_tmp+" number="+queue_front+" is not localmiuser",global_log_file);
                                                                    return 1;
                                                                }
                                                                cerr<<"stat64 file:"<<info_path<<" error:"<<strerror(errno)<<endl;
                                                                common_log_file("ERROR","tid="+uuid_str_tmp+" number="+queue_front+" Failed to lsstat:"+info_path+" ERROR="+strerror(errno)+" kid="+convertInt(kid),global_log_file);
                                                                return -1;
                                                        }
                                                        if(!S_ISREG(st.st_mode)){
                                                                cerr<<"st.st_mode file:"<<info_path<<" error:"<<strerror(errno)<<endl;
                                                                common_log_file("ERROR","tid="+uuid_str_tmp+" number="+queue_front+" Failed to st.mode:"+info_path+" ERROR="+strerror(errno)+" kid="+convertInt(kid),global_log_file);
                                                                return -1;
                                                        }
                                                        int32_t fd_r_open=open(info_path.c_str(),O_RDONLY);
                                                        int32_t fd_w_open=open(BAK_UIN_PATH.c_str(),O_WRONLY|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IROTH );
                                                        if(fd_r_open <0){
                                                                close(fd_r_open);
                                                                close(fd_w_open);
                                                                cerr<<"failed to open src_file:"<<info_path<<" error:"<<strerror(errno)<<endl;
                                                                common_log_file("ERROR","tid="+uuid_str_tmp+" number="+queue_front+" Failed open file src_mi_path="+src_mi_path+" ERROR="+strerror(errno)+" kid="+convertInt(kid),global_log_file);
                                                                return -1;
                                                        }
                                                        if(fd_w_open <0){
                                                                close(fd_r_open);
                                close(fd_w_open);
                                cerr<<"failed to open bak_uin_file:"<<BAK_UIN_PATH<<" error:"<<strerror(errno)<<endl;
                                common_log_file("ERROR","tid="+uuid_str_tmp+" number="+queue_front+" Failed open file BAK_UIN_PATH="+BAK_UIN_PATH+" ERROR="+strerror(errno)+" kid="+convertInt(kid),global_log_file);
                                                                //return -1;
                                                                exit(1);
                            }
                                                        ssize_t x_single_read=0;
                                                        ssize_t x_single_write=0;
                                                        unsigned long long write_size=0;
                                                        char buf[BUF];

                                                        while(1){
                                                                memset(buf,'\0',BUF);
                                                                x_single_read=read(fd_r_open,buf,sizeof(char)*BUF);
                                                                if(x_single_read<0){
                                                                        cerr<<"read file return<0 for file"<<strerror(errno)<<endl;
                                                                        common_log_file("ERROR","tid="+uuid_str_tmp+" number="+queue_front+" Failed Read  return<0 file src_mi_path="+src_mi_path+" ERROR="+strerror(errno)+" kid="+convertInt(kid),global_log_file);
                                                                        close(fd_r_open);
                                        close(fd_w_open);
                                                                        return  -1;
                                                                }

                                                                if(x_single_read==0){ break;}
                                                                x_single_write=write(fd_w_open,buf,sizeof(char)*x_single_read);
                                                                if(x_single_write<0){
                                                                        cerr<<"write file return<0 for file:"<<BAK_UIN_PATH<<" error:"<<strerror(errno)<<endl;
                                                                         common_log_file("ERROR","tid="+uuid_str_tmp+" number="+queue_front+" Failed write  return<0 file BAK_UIN_PATH="+BAK_UIN_PATH+" ERROR="+strerror(errno)+" kid="+convertInt(kid),global_log_file);
                                                                        close(fd_r_open);
                                        close(fd_w_open);
                                                                        return -1;
                                                                }
                                                                write_size=write_size+x_single_write;
                                                        }
                                                        close(fd_r_open);
                                                        close(fd_w_open);
                                                        if(write_size != st.st_size){

                                                                common_log_file("ERROR","tid="+uuid_str_tmp+" number="+queue_front+" write_size != st.size"+" w_f="+BAK_UIN_PATH+" w_size="+convertIntlong(write_size)+" r_file="+info_path+" st.size="+convertIntlong(st.st_size),global_log_file);
                                                                return -1;
                                                        }
                                                        //      result_log_file((queue_front+" "+info_path+" "+BAK_UIN_PATH+" LOCALMIUSER_SUCC\n").c_str(),global_result_file);
                                                                common_log_file("INFO","tid="+uuid_str_tmp+" number="+queue_front+" SUCC write_size=st.size"+" w_f="+BAK_UIN_PATH+" w_size="+convertIntlong(write_size)+" r_file="+info_path+" st.size="+convertIntlong(st.st_size),global_log_file);
                                                        string tmp_str="";
                            char buff_tmp[1024];
                                                        if(g_md5flag){
                                                        string md5sum_str="md5sum "+info_path+" "+BAK_UIN_PATH;
                                                        FILE *fstream_tmp=NULL;
                                                        if(NULL==(fstream_tmp=popen(md5sum_str.c_str(),"r"))){
                                                                fprintf(stderr,"tid=%s execute command failed: %s",uuid_str_tmp.c_str(),strerror(errno));
                                                                common_log_file("ERROR","tid="+uuid_str_tmp+" md5sum="+md5sum_str+" kid="+convertInt(kid),global_log_file);
                                                                return -1;
                                                        }
                                                        memset(buff_tmp,0,sizeof(buff_tmp));
                                                        while(NULL!=fgets(buff_tmp, sizeof(buff_tmp), fstream_tmp)&& !feof(fstream_tmp)){
                                                                //tmp_str=tmp_str+buff_tmp;
                                                                buff_tmp[strlen(buff_tmp)-1]='\0';
                                                            tmp_str=tmp_str+buff_tmp+" ";
                                                                //common_log_file("INFO","t+convertInt(kid),global_log_file);
                                                        }
                                                        pclose(fstream_tmp);
                                                        common_log_file("INFO","tid="+uuid_str_tmp+" md5sum="+tmp_str+" kid="+convertInt(kid),global_log_file);
                                                        //result_log_file((queue_front+" "+info_path+" "+BAK_UIN_PATH+" "+tmp_str+" LOCALMIUSER_SUCC\n").c_str(),global_result_file);
                                                        }
                                                        //result_log_file((queue_front+" "+info_path+" "+BAK_UIN_PATH+" LOCALMIUSER_SUCC\n").c_str(),global_result_file);
                                                        //unlink 
                                                    if(unlink(info_path.c_str())<0){
                                                                cerr<<" unlink "<<info_path<<" error:"<<strerror(errno)<<endl;
                                                                common_log_file("ERROR","tid="+uuid_str_tmp+" unlink="+info_path+" error:"+strerror(errno)+" kid="+convertInt(kid),global_log_file);
                                                                result_log_file((queue_front+" "+info_path+" "+BAK_UIN_PATH+" "+tmp_str+" LOCALMIUSER_SUCC NOT UNLINK\n").c_str(),global_result_file);
                                                                return -1;
                                                        }
                                                        common_log_file("INFO","tid="+uuid_str_tmp+" unlink="+info_path+" SUCC kid="+convertInt(kid),global_log_file);
                                                        result_log_file((queue_front+" "+info_path+" "+BAK_UIN_PATH+" "+tmp_str+" LOCALMIUSER_SUCC Y UNLINK\n").c_str(),global_result_file);
 


                                                }
                                        }else{
                                                if(suc){
                                                        result_log_file((queue_front+" NOT_LOCALMIUSER_SUCC\n").c_str(),global_result_file);
                                                }
                                        }
                        //              pclose(fstream);

        return ret;

}
int32_t MASHBD_THREAD_TOP::ThreadTop::create_thread(void *(*funcme)(void *args),int32_t tran_id)
{
        pthread_t *files=new pthread_t[1];
        pthread_attr_t *filed=new pthread_attr_t[1];
        ////////new add
        size_t stacksize;
        struct rlimit rlim;
        pthread_attr_init(filed);
        pthread_attr_getstacksize(filed, &stacksize);
        getrlimit(RLIMIT_STACK, &rlim);
        //cout<<"Thread current stack size: "<<(size_t)rlim.rlim_cur<<endl;
        //cout<<"Thread real stack size: "<<stacksize<<endl;
        stacksize=(size_t)rlim.rlim_cur;
        pthread_attr_setstacksize(filed, stacksize);
        ///end
        pthread_attr_setdetachstate(filed,PTHREAD_CREATE_DETACHED);
        if(tran_id >=0){
                struct workapp_id * wid=(struct workapp_id *)malloc(sizeof(struct workapp_id));
                wid->kid=tran_id;
                if(pthread_create(files,filed,funcme,wid)!=0){
                        common_log_file("ThreadCreate","Here create thread failed",global_log_file);
                }

        }
        else{
                if(pthread_create(files,filed,funcme,NULL)!=0){
                        common_log_file("ThreadCreate","Here create thread failed",global_log_file);
                }
        }
        return 0;
}

int32_t thread_epoll_handle_me(int32_t kid){
        //string queue_front ;

        while(1){

                string queue_front ;
                string tmp;
                char tmp_buf[50];
                string uuid_str;
                string uuid_str_tmp;
                for (int32_t i=0; i<4;i++){
        snprintf(tmp_buf,50,"%02X",(rand()%255)/15);
        tmp.append(tmp_buf);
        }
                uuid_str=get_uuid();
        uuid_str_tmp=uuid_str+"-"+tmp;

                unsigned long long ssize=0;
                unsigned mod_q=0;
                        try{
                                pthread_mutex_lock(&map_fileio_mutex);
                                //pthread_cond_wait(&g_cond,&map_fileio_mutex); 
                //bool iret =global_qe.empty();
                                if(!global_qe.empty()){
                                        srand( (unsigned)time( NULL ) );
                    mod_q=((rand()+target_mivol_num)% target_mivol_num)+1 ;
                                        if (mod_q <1 || mod_q > target_mivol_num ){
                                                cerr<<" mod_q <1 or > "<< target_mivol_num<<endl;
                                                common_log_file("ERROR","mod_q <1 or >"+convertIntlong(mod_q),global_log_file);
                                                exit(1);
                                        }
                                        /////pthread_cond_wait(&g_cond,&map_fileio_mutex); 
                                        queue_front=global_qe.front();
                    ssize=global_qe.size();
                                        cout <<ssize<<" do pop****************** qis:"<<queue_front<<" target_mivol_rand:"<<mod_q<<endl;
                                        global_qe.pop();
                                        pthread_cond_signal(&g_cond);
                                        common_log_file("INFO","tid="+uuid_str_tmp+" size="+convertIntlong(ssize)+" oldqueue_front="+queue_front+" opration=pop vol_id="+convertIntlong(mod_q)+" kid="+convertInt(kid),global_log_file);
                                                cout<<"bbb  queue_front"<<queue_front<<endl;
                                        if(queue_front.compare("")==0){

                                                common_log_file("INFO","tid="+uuid_str_tmp+" line="+convertIntlong(global_line)+" size="+convertIntlong(ssize)+" queue_front=EMPT  Y_2 vol_id="+convertIntlong(mod_q)+" kid="+convertInt(kid),global_log_file);
                                                pthread_mutex_unlock(&map_fileio_mutex);
                                                continue;

                                        }
                                         pthread_mutex_unlock(&map_fileio_mutex);
                                }else{

                                        //common_log_file("ERROR","ssize>0 but empty=" ,global_log_file);
                                        pthread_mutex_unlock(&map_fileio_mutex);
                                        continue;
                                }
                        }catch(std::exception& e){
                                        string ex=e.what();
                                        cerr <<e.what()<<endl;
                                        common_log_file("ERROR","EXCEPTION="+ex,global_log_file);
                                        continue;
                        }

                        common_log_file("INFO","tid="+uuid_str_tmp+" line="+convertIntlong(global_line)+" size="+convertIntlong(ssize)+" queue_front="+queue_front+" opration=front kid="+convertInt(kid),global_log_file);
                        cout << "the front queue is :"<<queue_front<<" kid:"<<kid<<endl;




                                if(queue_front.length()>1){
                                        int ret =do_command(kid,uuid_str_tmp,queue_front,mod_q);
                                        if(ret <0){
                                                cerr<<uuid_str_tmp<<":"<<ret<<endl;
                                        }
                                        continue;
                                }

                continue;


        }
return 0;
}

void *work_thread(void *args)
{
        int32_t kid=((struct workapp_id *)args)->kid;
        thread_epoll_handle_me(kid);
        common_log_file("ERROR","thread_epoll_handle_me=out kid="+convertInt(kid),global_log_file);
        pthread_exit(NULL);
}

int32_t MASHBD_WORK::WORK_thread::dome ()
{
        //this.cs_tok=global_thread_number;
        for(int32_t i=0;i <cs_tok;i++){
                create_thread(&work_thread,i);
        }
        return 0;
}

 

相关文章:

  • 2022-12-23
  • 2021-10-08
  • 2021-12-05
  • 2021-08-14
  • 2022-12-23
  • 2022-12-23
  • 2021-07-14
  • 2021-10-10
猜你喜欢
  • 2021-12-21
  • 2022-12-23
  • 2022-12-23
  • 2021-10-18
  • 2022-01-27
相关资源
相似解决方案