【问题标题】:How to (properly) use robust pthreads for process synchronization?如何(正确)使用健壮的 pthread 进行进程同步?
【发布时间】:2011-10-20 16:33:39
【问题描述】:

我们在生产系统中存在错误,其中进程在持有共享内存互斥锁时出现段错误。我们希望它在死亡时释放锁。我们使用 sem_wait()/sem_post(),但是做功课,我发现这个 API 不允许这样的行为:

http://www.usenetmessages.com/view.php?c=computer&g=1074&id=78029&p=0

文章说,答案是使用强大的 pthreads API。我找到了以下关于此主题的文章:

http://www.embedded-linux.co.uk/tutorial/mutex_mutandis

但是,在实现了以下代码之后,我的行为不可靠,也就是说,我是否应该告诉进程 3,例如,进行段错误,代码工作得很好。其他进程唤醒,识别出一个进程在持有互斥锁时死亡,然后恢复。但是,我应该告诉进程 0 死亡,还是应该删除第 63 行的睡眠调用,一旦失败的进程自行终止,其他进程就不会唤醒。我做错了吗?

#include <stdio.h>
#include <stdlib.h>
#include <features.h>
#define __USE_POSIX
#include <signal.h>
#include <sys/types.h>
#include <unistd.h>
#define __USE_MISC
#include <sys/mman.h>
#include <fcntl.h>
#include <errno.h>
#define __USE_GNU   /* Necessario para usar a API PTHREAD_MUTEX_ROBUST_NP */
#define __USE_UNIX98 /* Necessario para usar a funcao pthread_mutexattr_settype */
#include <pthread.h>
#include <sys/wait.h>
static void *shrd;

static int child_main(int slot, int segfault) {
    pthread_mutex_t   *lock = (pthread_mutex_t *) shrd;
    int                err;

    if ( 0 != (err=pthread_mutex_lock(lock)) ) {
        switch(err) {
        case EINVAL:
            printf("Lock invalido no filho [%d]\n", slot);
            goto excecao; 

        case EDEADLK:
            printf("O filho [%d] tentou travar um lock que jah possui.\n", slot);
            break;

        case EOWNERDEAD:
            printf("Filho [%d] foi informado que o processo que estava com o lock morreu.\n", slot);
            if ( 0 == pthread_mutex_consistent_np(lock) ) {
                printf("Filho [%d] retornou o lock para um estado consistente.\n", slot);
            } else {
                fprintf(stderr, "Nao foi possivel retornar o lock a um estado consistente.\n");
                goto desistir;
            }

            if ( 0 != (err=pthread_mutex_lock(lock)) ) {
                fprintf(stderr, "Apos recuperar o estado do lock, nao foi possivel trava-lo: %d\n", err);
                goto desistir;
            }


        case ENOTRECOVERABLE:
            printf("O filho [%d] foi informado de que o lock estah permanentemente em estado inconsistente.\n", slot);
            goto desistir;

        default:
            printf("Erro desconhecido ao tentar travar o lock no filho [%d]: [%d]\n", slot, err);
            goto excecao; 
        }
    }

    printf("Filho [%d] adquiriu o lock.\n", slot);

    if ( segfault == slot ) {
        printf("Matando o PID [%d] com SIGSEGV.\n", getpid());
        kill(getpid(), SIGSEGV); 
    } else {
        sleep(1);
    }

    if ( 0 != (err = pthread_mutex_unlock(lock)) ) {
        switch (err) {
        case EPERM:
            printf("O filho [%d] tentou liberar o lock, mas nao o possui.\n", slot);
            break;

        default:
            fprintf(stderr, "Erro inesperado ao liberar o lock do filho [%d]: [%d]\n", slot, err);
        }
    } else {
        printf("Filho [%d] retornou o lock.\n", slot);
    }

    return 0;

excecao:
    fprintf(stderr, "Programa terminado devido excecao.\n");
    return 1;

desistir:
    fprintf(stderr, "A execucao do sistema nao deve prosseguir. Abortando todos os processos.\n");
    kill(0, SIGTERM);

    /* unreachable */
    return 1;
}

int main(int argc, const char * const argv[]) {
    pid_t               filhos[10];
    int                 status;
    pid_t               p;
    int                 segfault = -1;
    pthread_mutexattr_t attrs;

    if ( argc > 1 ) {
        segfault = atoi(argv[1]);
        if ( segfault < 0 || segfault > 9 )
            segfault = -1;
    }

    if ( (shrd = mmap(NULL, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0)) == MAP_FAILED ) {
        perror("Erro ao criar shrd mem:\n");
        exit(1);
    }

    pthread_mutexattr_init         (&attrs);
    pthread_mutexattr_settype      (&attrs, PTHREAD_MUTEX_RECURSIVE_NP);
    pthread_mutexattr_setrobust_np (&attrs, PTHREAD_MUTEX_ROBUST_NP);
    pthread_mutexattr_setpshared   (&attrs, PTHREAD_PROCESS_SHARED);
    /* 
        Devido a um BUG na glibc 2.5 (que eh a usada pelo CentOS 5,
        a unica forma de fazer os mutexes robustos funcionarem eh
        setando o protocolo para PTHREAD_PRIO_INHERIT:
        http://sourceware.org/ml/libc-help/2010-04/msg00028.html
    */
    pthread_mutexattr_setprotocol  (&attrs, PTHREAD_PRIO_INHERIT);
    pthread_mutex_init             ((pthread_mutex_t*) shrd, &attrs);
    pthread_mutexattr_destroy      (&attrs);

    for (size_t i=0; i<sizeof(filhos)/sizeof(pid_t); ++i) {
        if ( (filhos[i]=fork()) == 0 ) {
            return child_main((int) i, segfault);
        } else {
            if ( filhos[i] < 0 ) {
                fprintf(stderr, "Erro ao criar o filho [%zu]. Abortando.\n", i);
                exit(1);
            }
        }
    }

    for (size_t i=0; i<sizeof(filhos)/sizeof(pid_t); ++i) {
        do {
            p = waitpid(filhos[i], &status, 0);
        } while (p != -1);
    }

    printf("Pai encerrou a sua execucao.\n");

    return 0;
}

顺便说一句:我正在 CentOS 5 上编译,64 位:

$ uname -rm
2.6.18-194.el5 x86_64
glibc-2.5-49
gcc-4.1.2-48.el5

(对不起,代码上的句子和cmets都是葡萄牙语,我的母语。)

【问题讨论】:

    标签: linux pthreads robust


    【解决方案1】:

    我尝试了其他一些方法,即: 1. 使用 POSIX 屏障 2. 在 forking() 时让父级持有锁,并在每个子级递增一个计数器后释放它。

    第一种方法根本不起作用,但我发布了我使用的源代码,因为我可能在使用 API 时犯了一些错误:

    在 child_main 上:

    pthread_barrier_t *barr = (pthread_barrier_t *) ((char *) shrd + sizeof(pthread_mutex_t));
    ...
    int rc = pthread_barrier_wait(barr);
    if(rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD)
    {
       printf("Nao foi possivel esperar na barreira.\n");
       exit(-1);
    }
    

    主线:

    pthread_barrierattr_t   barr_attrs;
    pthread_barrier_t      *barr;
    ...
    initialize(pthread_barrierattr_init,       &barr_attrs);
    initialize(pthread_barrierattr_setpshared, &barr_attrs, PTHREAD_PROCESS_SHARED);
    barr = (pthread_barrier_t *) ((char *) shrd + sizeof(pthread_mutex_t));
    
    if ( (init_result = pthread_barrier_init(barr, &barr_attrs, 10)) != 0 ) {
       printf("Nao foi possivel iniciar a barreira.\n");
       exit(EXIT_FAILURE);
    }
    

    Initialize是一个宏,定义为:

     #define initialize(func, ...) \
     do { \
        init_result = func(__VA_ARGS__); \
        if ( 0 != init_result ) { \
          stored_errno = errno; \
          func_name = #func; \
          goto erro_criacao_semaforo; \
        } \
     } while(0);
    

    第二种方法似乎有效:

    在 child_main 上:

    int               *contador = (int *) ((char *) shrd + sizeof(pthread_mutex_t) + sizeof(int));
    ...
    int *n = (int *)(lock+1);
    ...
    if ( 0 != (err=pthread_mutex_lock(lock)) ) {
    ...
    

    主线:

    volatile int           *n; // Cada filho iniciado incrementa esta variavel. 
                               // Qdo ela chega em 10, liberamos o lock.
    ...
    n          = (int *) ((char *) shrd + sizeof(pthread_mutex_t));
    ...
    pthread_mutex_lock(mutex);
    for (i=0; i<sizeof(filhos)/sizeof(pid_t); ++i) {
    ... // the fork goes here.
    }
    
    while (*n != 10); // Isto garante que todos os filhos cheguem ao lock.
    pthread_mutex_unlock(mutex);
    

    但是一旦我添加了一个随机的睡眠时间,所以它们变得不同步,我又遇到了一个死锁:

    在 child_main 上:

    int                num_sorteado;
    struct timespec    dessincronizador = { 1, 0 };
    
    int *n = (int *)(lock+1);
    
    num_sorteado = 1 + (int) (999999.0 * (rand() / (RAND_MAX + 1.0)));
    dessincronizador.tv_nsec = num_sorteado;
    nanosleep(&dessincronizador, NULL);
    
    if ( 0 != (err=pthread_mutex_lock(lock)) ) {
    ...
    

    遗憾的是,似乎没有可靠的方法来了解进程在持有锁时死亡,所以解决我们问题的最佳方法是捕获正在死亡进程的信号并引发 kill(0, SIGTERM) 到让其他进程也死掉。

    【讨论】:

    • 这是一个答案,还是试图扩展您的问题?
    【解决方案2】:

    您的EOWNERDEAD 块在ENOTRECOVERABLE 块之前错过了break。 此外,根据pthread_mutex_lock 手册页,在第一次调用pthread_mutex_lock() 之后,即使返回EOWNERDEAD,调用者也会持有锁。因此,您不应该在EOWNERDEAD 的块内再次调用它。

    【讨论】:

    • 感谢@caruccio 先生的回复。实际上,代码中存在 ENOTRECOVERABLE 之前的中断。在复制到站点之前,我可能在删除夹板标记时将其删除。正如您所指出的,在 pthread_mutex_consistent_np() 之后删除锁似乎在与原始代码相同的情况下起作用。我在那里有它,因为 mutex_mutandis 文章说它是必需的。但是,在将互斥体返回到一致状态后,不管有没有锁,事实是,如果没有第 63 行的 sleep,或者传递 0 作为参数,程序在段错误后挂起。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-07-08
    • 2010-12-04
    • 2017-07-26
    • 1970-01-01
    • 2013-09-17
    • 1970-01-01
    • 2022-09-27
    相关资源
    最近更新 更多