【问题标题】:Using shared memory for communication of reader and writer threads使用共享内存进行读写器线程的通信
【发布时间】:2018-06-13 20:47:52
【问题描述】:

我应该同步两个线程(此时这两个线程使用两个信号量来完成)。第一个线程应该以 4 个字节(4 个字符)的缓冲区块读取文本文件并将其复制到共享内存。另一个应该从共享内存中读取并将其写入另一个文件。我的程序在仅完成 16 个字符的过程后停止(因为它无法访问共享内存)。你能帮我解决他的问题吗?

请注意,我在 Mac 中进行编码,这就是信号量与默认 POSIX 略有不同的原因。

提前致谢:)

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <time.h>
#include <unistd.h>
#include <semaphore.h>
#include <pthread.h>
#include <fcntl.h>

#define ANSI_COLOR_RED     "\x1b[31m"
#define ANSI_COLOR_GREEN   "\x1b[32m"
#define ANSI_COLOR_BLUE    "\x1b[34m"
//#define ANSI_COLOR_YELLOW  "\x1b[33m"
//#define ANSI_COLOR_MAGENTA "\x1b[35m"
//#define ANSI_COLOR_CYAN    "\x1b[36m"
#define ANSI_COLOR_RESET   "\x1b[0m"


typedef sem_t Semaphore;

Semaphore * semList[2];

Semaphore *schedularSemaphore;


/*
* SEMAPHORE USAGE
*/
Semaphore *make_semaphore(int value){
    Semaphore *semaphore = (Semaphore *) malloc(sizeof(Semaphore));
    semaphore = sem_open("/semaphore", O_CREAT, 0644, value);
    sem_unlink("/semaphore");
    return semaphore;
}

void semaphore_wait(Semaphore *semaphore){
    sem_wait(semaphore);
}

void semaphore_signal(Semaphore *semaphore){
    sem_post(semaphore);
}
/*
* SEMAPHORE USAGE
*/

#define  NOT_READY  -1
#define  FILLED     0
#define  TAKEN      1

struct Memory {
     int  status;
     char  *data;
};

int fdIN, fdOUT;            /* Input and output file descriptors */

int BUF_SIZE = 4;   
ssize_t ret_in, ret_out;    /* Number of bytes returned by read() and write() */
char buffer[4];             /* Character buffer */


Semaphore * writerSem;
Semaphore * readerSem;

int readFileWriteToShMem(){
    key_t          ShmKEY;
    int            ShmID;
    struct Memory  *ShmPTR;



    int doRead = 1;
    while (doRead == 1) {


        fprintf(stdout, ANSI_COLOR_BLUE "HERE1" ANSI_COLOR_RESET "\n");
        ShmKEY = ftok(".", 'x');
        ShmID = shmget(ShmKEY, sizeof(struct Memory), IPC_CREAT | 0666);

        if (ShmID < 0) {
            fprintf(stdout, "*** shmget error (reader) ***\n");
            exit(1);
        }

        fprintf(stdout, ANSI_COLOR_BLUE "HERE2" ANSI_COLOR_RESET "\n");



        fprintf(stdout, ANSI_COLOR_BLUE "HERE3" ANSI_COLOR_RESET "\n");
        semaphore_wait(readerSem);


        ShmPTR = (struct Memory *) shmat(ShmID, NULL, 0);

        if ((int) ShmPTR == -1) {
            fprintf(stdout, ANSI_COLOR_RED "*** shmat error (reader) ***" ANSI_COLOR_RESET "\n");
            exit(1);
        }
        fprintf(stdout, ANSI_COLOR_BLUE "HERE4" ANSI_COLOR_RESET "\n");

        if((ret_in = read (fdIN, &buffer, BUF_SIZE)) > 0){
            fprintf(stdout, ANSI_COLOR_BLUE "HERE5" ANSI_COLOR_RESET "\n");

            fprintf(stdout, ANSI_COLOR_BLUE "Reader has received a shared memory of 4 bytes..." ANSI_COLOR_RESET "\n");
            fprintf(stdout, ANSI_COLOR_BLUE "Reader has attached the shared memory..." ANSI_COLOR_RESET "\n");

            ShmPTR->status = NOT_READY;
            ShmPTR->data = buffer;
            fprintf(stdout, ANSI_COLOR_BLUE "Reader has filled  \"%s\" to shared memory..." ANSI_COLOR_RESET "\n",
            ShmPTR->data);

            ShmPTR->status = FILLED;

            semaphore_signal(writerSem);

            while (ShmPTR->status != TAKEN){
                usleep(1);
            }


        } else {
            fprintf(stdout, ANSI_COLOR_BLUE "HERE6" ANSI_COLOR_RESET "\n");
            doRead = 0;
        }
    }



    fprintf(stdout, ANSI_COLOR_BLUE "Reader has detected the completion of its task..." ANSI_COLOR_RESET "\n");
    shmdt((void *) ShmPTR);
    fprintf(stdout, ANSI_COLOR_BLUE "Reader has detached its shared memory..." ANSI_COLOR_RESET "\n");
    shmctl(ShmID, IPC_RMID, NULL);
    fprintf(stdout, ANSI_COLOR_BLUE "Reader has removed its shared memory..." ANSI_COLOR_RESET "\n");
    fprintf(stdout, ANSI_COLOR_BLUE "Reader exits...\n");
    exit(0);
}



int writeFileReadFromShMem(){
    key_t          ShmKEY;
    int            ShmID;
    struct Memory  *ShmPTR;



    int doWrite = 1;

    while (doWrite){

        ShmKEY = ftok(".", 'x');
        ShmID = shmget(ShmKEY, sizeof(struct Memory), 0666);
        if (ShmID < 0) {
            fprintf(stdout, ANSI_COLOR_GREEN "*** shmget error (writer) ***" ANSI_COLOR_RESET "\n");
            exit(1);
        }

        semaphore_wait(writerSem);

        fprintf(stdout, ANSI_COLOR_GREEN "Writer has received a shared memory of four bytes..." ANSI_COLOR_RESET "\n");

        ShmPTR = (struct Memory *) shmat(ShmID, NULL, 0);
        if ((int) ShmPTR == -1) {
            fprintf(stdout, ANSI_COLOR_GREEN "*** shmat error (writer) ***" ANSI_COLOR_RESET "\n");
            exit(1);
        }
        fprintf(stdout, ANSI_COLOR_GREEN "Writer has attached the shared memory..." ANSI_COLOR_RESET "\n");

        while (ShmPTR->status != FILLED)
        ;
        ret_out = write (fdOUT, &buffer, (ssize_t) ret_in);
        write(fdOUT, "\n", 1);
        if(ret_out != ret_in){
            ///Write error 
            fprintf(stdout, ANSI_COLOR_RED "write error" ANSI_COLOR_RESET "\n");
            doWrite = 0;

        }

        fprintf(stdout, ANSI_COLOR_GREEN "Writer found the data is ready..." ANSI_COLOR_RESET "\n");
        fprintf(stdout, ANSI_COLOR_GREEN "Writer found \"%s\" in shared memory..." ANSI_COLOR_RESET "\n",
            ShmPTR->data);

        ShmPTR->status = TAKEN;
        fprintf(stdout, ANSI_COLOR_GREEN "Writer has informed Reader data have been taken..." ANSI_COLOR_RESET "\n");

        semaphore_signal(readerSem);
    }

    shmdt((void *) ShmPTR);
    fprintf(stdout, ANSI_COLOR_GREEN "Writer has detached its shared memory..." ANSI_COLOR_RESET "\n");
    fprintf(stdout, ANSI_COLOR_GREEN "Writer exits..." ANSI_COLOR_RESET "\n");
    exit(0);
}



int main(int argc, char* argv[]){


    fdIN = open("textin.txt", O_RDONLY, 0644);

    fdOUT = open("textout.txt", O_WRONLY | O_CREAT, 0644);


    if (fdIN == -1) {
        perror ("error while opening input file");
        return 2;
    }

    if(fdOUT == -1){
        perror ("error while opening output file");
        return 3;
    }

    pthread_t *readerThread, *writerThread;

    readerThread    = (pthread_t *)malloc(sizeof(*readerThread));
    writerThread    = (pthread_t *)malloc(sizeof(*writerThread));

    readerSem = make_semaphore(1);
    writerSem = make_semaphore(0);

    pthread_create(readerThread, NULL, (void*)readFileWriteToShMem, NULL);
    pthread_create(writerThread, NULL, (void*)writeFileReadFromShMem, NULL);


    pthread_join(*readerThread, NULL);
    pthread_join(*writerThread, NULL);

    semaphore_signal(readerSem);

    return 1;

}

【问题讨论】:

    标签: multithreading memory buffer shared


    【解决方案1】:

    这是最终的工作代码。无需在每次迭代时初始化和访问共享内存。

    #include <stdio.h>
    #include <stdlib.h>
    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/shm.h>
    #include <time.h>
    #include <unistd.h>
    #include <semaphore.h>
    #include <pthread.h>
    #include <fcntl.h>
    
    #define ANSI_COLOR_RED     "\x1b[31m"
    #define ANSI_COLOR_GREEN   "\x1b[32m"
    #define ANSI_COLOR_BLUE    "\x1b[34m"
    //#define ANSI_COLOR_YELLOW  "\x1b[33m"
    //#define ANSI_COLOR_MAGENTA "\x1b[35m"
    //#define ANSI_COLOR_CYAN    "\x1b[36m"
    #define ANSI_COLOR_RESET   "\x1b[0m"
    
    typedef sem_t Semaphore;
    
    /*
    * SEMAPHORE USAGE
    */
    Semaphore *make_semaphore(int value){
        Semaphore *semaphore = (Semaphore *) malloc(sizeof(Semaphore));
        semaphore = sem_open("/semaphore", O_CREAT, 0644, value);
        sem_unlink("/semaphore");
        return semaphore;
    }
    
    void semaphore_wait(Semaphore *semaphore){
        sem_wait(semaphore);
    }
    
    void semaphore_signal(Semaphore *semaphore){
        sem_post(semaphore);
    }
    /*
    * SEMAPHORE USAGE
    */
    
    #define  NOT_READY  -1
    #define  FILLED     0
    #define  TAKEN      1
    
    struct Memory {
         int  status;
         char  *data;
    };
    
    int fdIN, fdOUT;            /* Input and output file descriptors */
    int BUF_SIZE = 4;   
    ssize_t ret_in, ret_out;    /* Number of bytes returned by read() and write() */
    char buffer[4];             /* Character buffer */
    
    Semaphore * writerSem, *readerSem;
    
    key_t          ShmKEY;
    int            ShmID;
    struct Memory  *ShmPTR;
    
    int readFileWriteToShMem(){
    
        ShmKEY = ftok(".", 'x');
        ShmID = shmget(ShmKEY, sizeof(struct Memory), IPC_CREAT | 0666);
        ShmPTR = (struct Memory *) shmat(ShmID, NULL, 0);
    
        int doRead = 1;
        while (doRead == 1) {
    
            if (ShmID < 0) {
                fprintf(stdout, "*** shmget error (reader) ***\n");
                exit(1);
            }
    
            semaphore_wait(readerSem);
    
            if ((int) ShmPTR == -1) {
                fprintf(stdout, ANSI_COLOR_RED "*** shmat error (reader) ***" ANSI_COLOR_RESET "\n");
                exit(1);
            }
    
            if((ret_in = read (fdIN, &buffer, BUF_SIZE)) > 0){
    
                fprintf(stdout, ANSI_COLOR_BLUE "Reader has attached the shared memory and received 4 bytes..." ANSI_COLOR_RESET "\n");
    
                ShmPTR->status = NOT_READY;
                ShmPTR->data = buffer;
                fprintf(stdout, ANSI_COLOR_BLUE "Reader has filled  \"%s\" to shared memory..." ANSI_COLOR_RESET "\n", ShmPTR->data);
    
                ShmPTR->status = FILLED;
    
                semaphore_signal(writerSem);
    
                while (ShmPTR->status != TAKEN){
                    usleep(1);
                    fprintf(stdout, ANSI_COLOR_BLUE "Reader is sleeping..." ANSI_COLOR_RESET "\n");
                }
    
            } else {
                doRead = 0;
            }
        }
    
        fprintf(stdout, ANSI_COLOR_BLUE "Reader has detected the completion of its task..." ANSI_COLOR_RESET "\n");
        shmdt((void *) ShmPTR);
        fprintf(stdout, ANSI_COLOR_BLUE "eader has detected the completion of its task and detached its shared memory..." ANSI_COLOR_RESET "\n");
        shmctl(ShmID, IPC_RMID, NULL);
        fprintf(stdout, ANSI_COLOR_BLUE "Reader has removed its shared memory and now halts..." ANSI_COLOR_RESET "\n");
        exit(0);
    }
    
    int writeFileReadFromShMem(){
    
        int doWrite = 1;
    
        while (doWrite){
    
            if (ShmID < 0) {
                fprintf(stdout, ANSI_COLOR_GREEN "*** shmget error (writer) ***" ANSI_COLOR_RESET "\n");
                exit(1);
            }
    
            semaphore_wait(writerSem);
    
            if ((int) ShmPTR == -1) {
                fprintf(stdout, ANSI_COLOR_GREEN "*** shmat error (writer) ***" ANSI_COLOR_RESET "\n");
                exit(1);
            }
            fprintf(stdout, ANSI_COLOR_GREEN "Writer has attached the shared memory and received 4 bytes..." ANSI_COLOR_RESET "\n");
    
            while (ShmPTR->status != FILLED);
    
            ret_out = write (fdOUT, &buffer, (ssize_t) ret_in);
            write(fdOUT, "\n", 1);
            if(ret_out != ret_in){
                ///Write error 
                fprintf(stdout, ANSI_COLOR_RED "write error" ANSI_COLOR_RESET "\n");
                doWrite = 0;
    
            }
    
            fprintf(stdout, ANSI_COLOR_GREEN "Writer found \"%s\" in shared memory..." ANSI_COLOR_RESET "\n",
                ShmPTR->data);
    
            ShmPTR->status = TAKEN;
            fprintf(stdout, ANSI_COLOR_GREEN "Writer has informed Reader data have been taken..." ANSI_COLOR_RESET "\n");
    
            semaphore_signal(readerSem);
        }
    
        shmdt((void *) ShmPTR);
        fprintf(stdout, ANSI_COLOR_GREEN "Writer has detached its shared memory and now halts..." ANSI_COLOR_RESET "\n");
        exit(0);
    }
    
    int main(int argc, char* argv[]){
    
    
        fdIN = open("textin.txt", O_RDONLY, 0644);
        fdOUT = open("textout.txt", O_WRONLY | O_CREAT, 0644);
    
    
        if (fdIN == -1) {
            perror ("error while opening input file");
            return 2;
        }
    
        if(fdOUT == -1){
            perror ("error while opening output file");
            return 3;
        }
    
        pthread_t *readerThread, *writerThread;
    
        readerThread    = (pthread_t *)malloc(sizeof(*readerThread));
        writerThread    = (pthread_t *)malloc(sizeof(*writerThread));
    
        readerSem = make_semaphore(1);
        writerSem = make_semaphore(0);
    
        pthread_create(readerThread, NULL, (void*)readFileWriteToShMem, NULL);
        pthread_create(writerThread, NULL, (void*)writeFileReadFromShMem, NULL);
    
        pthread_join(*readerThread, NULL);
        pthread_join(*writerThread, NULL);
    
        semaphore_signal(readerSem);
    
        return 1;
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-07-23
      • 1970-01-01
      • 1970-01-01
      • 2010-10-10
      • 1970-01-01
      • 1970-01-01
      • 2011-11-06
      • 2015-01-19
      相关资源
      最近更新 更多