一、简介
共享内存为在多个进程之间共享和传递数据提供了一种有效的方式。
但它本身并未提供同步机制。
在实际编程中,可以使用
  信号量,
  传递消息(使用管道或IPC消息),
  生成信号,
  条件变量,
等方法来提供读写之间的有效的同步机制。
 
本例程序使用信号量进行同步,
主要是因为它方便,使用广泛,且独立于进程。
 
本例程序实现了,
生产者进程:
  每次读取YUV输入文件的一帧,
  然后将其写到共享内存中。
消费者进程:
  每次从共享内存中读到一帧,
  处理后,
  将其写到输出文件。
两个进程间使用信号量来保证同步处理每一帧。
 
本例程序很好地示范了共享内存和信号量的机制,
对于实际程序的开发很有意义。
 
二、生产者进程
common.h
用来设置一些测试用的基本参数。
  • /*
  •  * \File
  •  * common.h
  •  */
  • #ifndef __COMMON_H__
  • #define __COMMON_H__
  • #define TEST_FILE "coastguard_cif.yuv"
  • #define OUTPUT_FILE "coastguard_cif_trsd.yuv"
  • #define FYUV_WIDTH 352
  • #define FYUV_HEIGHT 288
  • #endif
  • 共享内存相关的头文件。
    shm_com.h
  • /*
  •  * \File
  •  * shm_com.h
  •  * \Brief
  •  */
  • #ifndef __SHM_COM_H__
  • #define __SHM_COM_H__
  • #define SHM_SEED 1001
  • #define MAX_SHM_SIZE 2048*2048*3 
  • typedef struct shared_use_st
  • {
  •   int end_flag;              //用来标记进程间的内存共享是否结束: 0, 未结束; 1, 结束
  •   char shm_sp[MAX_SHM_SIZE]; //共享内存的空间
  • }shared_use_st;
  • #endif
  •  
    信号量的头文件
    semaphore.h
  • /*
  •  * \File
  •  * semaphore.h
  •  * \Brief
  •  * semaphore operation
  •  */
  • #ifndef __SEMAPHORE_H__
  • #define __SEMAPHORE_H__
  • #define SEM_SEED 1000
  • union semun
  • {
  •   int val;
  •   struct semid_ds *buf;
  •   unsigned short *array;
  • };
  • int set_semvalue(int sem_id, int value);
  • void del_semvalue(int sem_id);
  • int semaphore_p(int sem_id);
  • int semaphore_v(int sem_id);
  • #endif
  •  
    帧处理的头文件
    frame.h
  • /*
  •  * \File
  •  * frame.h
  •  * \Brief
  •  *
  •  */
  • #ifndef __FRAME_H__
  • #define __FRAME_H__
  • #include "shm_com.h"
  • #define PIX_VALUE 1
  • typedef enum 
  • {
  •   YUV420,
  •   YUV422,
  •   YUV444,
  •   RGB
  • }frame_type_e;
  • typedef struct 
  • {
  •   int frm_w; // width
  •   int frm_h; // height
  •   frame_type_e frm_type; 
  •   int frm_size;
  •   char *frm_comps;
  • }frame_t, *frame_p;
  • int init_frame(frame_p frame, int width, int height, frame_type_e type);
  • int free_frame(frame_p frame);
  • int read_frame_from_file(frame_p frame, FILE* fp);
  • int write_frame_into_file(FILE* fp, frame_p frame);
  • int read_frame_from_shm(frame_p frame, shared_use_st* shm);
  • int write_frame_into_shm(shared_use_st* shm, frame_p frame);
  • int crop_frame(frame_p frame, int l_offset, int t_offset, int c_w, int c_h);
  • #endif
  • 生产者进程的基本流程:
    打开输入文件并初始化帧;
    创建并初始化共享内存和信号量;
    然后每次读取一帧,
      用信号量获取共享内存的权限后,
      将读取的帧写入共享内存,
      再释放共享内存的权限。
    当处理完所有的帧后,
    设置结束标志,
    并释放相关的资源。
    producer.c
  • /*
  •  * \File
  •  * producer.c
  •  * \Brief
  •  * Test shared-memory and message-queue
  •  */
  • #include <stdio.h>
  • #include <stdlib.h>
  • #include <string.h>
  • #include <errno.h>
  • #include <unistd.h>
  • #include <sys/shm.h>
  • #include <sys/sem.h>
  • #include "common.h"
  • #include "semaphore.h"
  • #include "shm_com.h"
  • #include "frame.h"
  • int main(char argc, char* argv[])
  • {
  •   FILE* fp_in = NULL;
  •   frame_t frame;
  •   int frame_cnt = 0;
  •   int sem_id; // semaphore id
  •   int shm_id; // shared-memory id
  •   void* shared_memory = (void*)0;
  •   shared_use_st* shared_stuff;
  •   /* Open input file */
  •   if ((fp_in = fopen(TEST_FILE, "rb")) < 0 )
  •   {
  •     printf("Open input file failed: %s\n", TEST_FILE);
  •     exit(EXIT_FAILURE);
  •   }
  •   /* Init frame */
  •   init_frame(&frame, FYUV_WIDTH, FYUV_HEIGHT, YUV420);
  •   printf("FRAME: w = %d, h = %d\n", frame.frm_w, frame.frm_h);
  •   /* Create and init semaphore */
  •   sem_id = semget((key_t)SEM_SEED, 1, 0666 | IPC_CREAT);
  •   if (sem_id == -1)
  •   {
  •     fprintf(stderr, "semget failed.\n");
  •     exit(EXIT_FAILURE);
  •   }
  •   /* Init shared-memory */
  •   shm_id = shmget((key_t)SHM_SEED, sizeof(struct shared_use_st), 0666 | IPC_CREAT);
  •   if (shm_id == -1)
  •   {
  •     fprintf(stderr, "shmget failed.\n");
  •     exit(EXIT_FAILURE);
  •   }
  •   shared_memory = shmat(shm_id, (void*)0, 0);
  •   if (shared_memory == (void*)-1)
  •   {
  •     fprintf(stderr, "shmat failed.\n");
  •     exit(EXIT_FAILURE);
  •   }
  •   shared_stuff = (struct shared_use_st*)shared_memory;
  •   shared_stuff->end_flag = 0;
  •   printf("FRAME_CNT: %d\n", frame_cnt);
  •   set_semvalue(sem_id, 1);
  •   while (read_frame_from_file(&frame, fp_in) == 0)
  •   {
  •     semaphore_p(sem_id);
  •     /* Write it into shared memory */
  •     write_frame_into_shm(shared_stuff, &frame);
  •     shared_stuff->end_flag = 0;
  •  
  •     semaphore_v(sem_id);
  •     frame_cnt++;
  •     printf("FRAME_CNT: %d\n", frame_cnt);
  •   }
  •   semaphore_p(sem_id);
  •   shared_stuff->end_flag = 1;
  •   semaphore_v(sem_id);
  •   /* over */
  •   printf("\nProducer over!\n");
  •   fclose(fp_in);
  •   free_frame(&frame);
  •   del_semvalue(sem_id);
  •   if (shmdt(shared_memory) == -1)
  •   {
  •     fprintf(stderr, "shmdt failed.\n");
  •     exit(EXIT_FAILURE);
  •   }
  •   exit(EXIT_SUCCESS);
  • }
  •  
    三、消费者进程
    消费者进程的基本流程:
    打开输出文件并初始化帧;
    获取共享内存和信号量;
    每次
      得到共享内存的权限后,
      从共享内存中读取一帧并获得结束标志
      进行帧处理,
      释放共享内存的权限。
    直到结束标志为真。
     
    最后释放相关的资源。
    consumer.c
  • /*
  •  * \File
  •  * consumer.c
  •  * \Brief
  •  * 
  •  */
  • #include <stdio.h>
  • #include <stdlib.h>
  • #include <string.h>
  • #include <errno.h>
  • #include <unistd.h>
  • #include <sys/shm.h>
  • #include <sys/sem.h>
  • #include "common.h"
  • #include "semaphore.h"
  • #include "shm_com.h"
  • #include "frame.h"
  • int main(char argc, char *argv[])
  • {
  •   FILE* fp_out = NULL;
  •   frame_t frame;
  •   int frame_cnt = 0;
  •   int sem_id; // semaphore id
  •   int shm_id; // shared-memory id
  •   void* shared_memory = (void*)0;
  •   shared_use_st* shared_stuff;
  •   int end_flag = 0;
  •   /* Open output file */
  •   if ((fp_out = fopen(OUTPUT_FILE, "wb")) < 0 )
  •   {
  •     printf("Open output file failed: %s\n", OUTPUT_FILE);
  •     exit(EXIT_FAILURE);
  •   }
  •   /* Init frame */
  •   init_frame(&frame, FYUV_WIDTH, FYUV_HEIGHT, YUV420);    
  •   printf("FRAME: w = %d, h = %d\n", frame.frm_w, frame.frm_h);
  •   /* Create semaphore */
  •   sem_id = semget((key_t)SEM_SEED, 1, 0666 | IPC_CREAT);
  •   if (sem_id == -1)
  •   {
  •     fprintf(stderr, "semget failed.\n");
  •     exit(EXIT_FAILURE);
  •   }
  •   /* Init shared-memory */
  •   shm_id = shmget((key_t)SHM_SEED, sizeof(struct shared_use_st), 0666 | IPC_CREAT);
  •   if (shm_id == -1)
  •   {
  •     fprintf(stderr, "shmget failed.\n");
  •     exit(EXIT_FAILURE);
  •   }
  •   shared_memory = shmat(shm_id, (void*)0, 0);
  •   if (shared_memory == (void*)-1)
  •   {
  •     fprintf(stderr, "shmat failed.\n");
  •     exit(EXIT_FAILURE);
  •   }
  •   shared_stuff = (struct shared_use_st*)shared_memory;
  •   printf("FRAME_CNT: %d\n", frame_cnt);
  •   /* 
  •    * 必须先置0,
  •    * 否则会因生产者进程的异常退出未释放信号量而导致程序出错 
  •    */
  •   set_semvalue(sem_id, 0);
  •   do
  •   {
  •     semaphore_p(sem_id); 
  •     /* Read frame from shared-memory */
  •     read_frame_from_shm(&frame, shared_stuff);    
  •     end_flag = shared_stuff->end_flag;
  •     crop_frame(&frame, 10, 10, 40, 40);
  •     write_frame_into_file(fp_out, &frame);
  •     semaphore_v(sem_id);
  •     frame_cnt++;
  •     printf("FRAME_CNT: %d\n", frame_cnt);
  •   } while(!end_flag);
  •   /* Over */
  •   printf("\nConsumer over!\n");
  •   fclose(fp_out);
  •   free_frame(&frame);
  •   if (shmdt(shared_memory) == -1)
  •   {
  •     fprintf(stderr, "shmdt failed.\n");
  •     exit(EXIT_FAILURE);
  •   }
  •   exit(EXIT_SUCCESS);
  • }
  •  
    四、信号量函数和帧处理函数
    信号量函数
    semaphore.c
  • /*
  •  * \File
  •  * semaphore.c
  •  * \Breif
  •  * 
  •  */
  • #include <stdio.h>
  • #include <stdlib.h>
  • #include <unistd.h>
  • #include <sys/sem.h>
  • #include "semaphore.h"
  • /* init semaphore by semctl */
  • int set_semvalue(int sem_id, int value)
  • {
  •   union semun sem_union;
  •     
  •   sem_union.val = value;
  •   if (semctl(sem_id, 0, SETVAL, sem_union) == -1)
  •     return 1;
  •   return 0;
  • }
  • /* delete semaphore by sectl */
  • void del_semvalue(int sem_id)
  • {
  •   union semun sem_union;
  •   if (semctl(sem_id, 0, IPC_RMID, sem_union) == -1)
  •     fprintf(stderr, "Failed to delete semaphore\n");
  • }
  • /* P(v) */
  • int semaphore_p(int sem_id)
  • {
  •   struct sembuf sem_b;
  •   sem_b.sem_num = 0;
  •   sem_b.sem_op = -1; /* P(v) */
  •   sem_b.sem_flg = SEM_UNDO;
  •   if (semop(sem_id, &sem_b, 1) == -1)
  •   {
  •     fprintf(stderr, "semaphore_p failed\n");
  •     return 1;
  •   }
  •   return 0;
  • }
  • /* V(v) */
  • int semaphore_v(int sem_id)
  • {
  •   struct sembuf sem_b;
  •   sem_b.sem_num = 0;
  •   sem_b.sem_op = 1; // V(v)
  •   sem_b.sem_flg = SEM_UNDO;
  •   if (semop(sem_id, &sem_b, 1) == -1)
  •   {
  •     fprintf(stderr, "semaphore_v failed\n");
  •     return 1;
  •   }
  •   return 0;
  • }
  • 帧处理函数
    frame.c
  • /*
  •  * \File
  •  * frame.c
  •  */
  • #include <stdio.h>
  • #include <stdlib.h>
  • #include <unistd.h>
  • #include <memory.h>
  • #include <assert.h>
  • #include <errno.h>
  • #include "frame.h"
  •  
  • /*
  •  * \Brief
  •  * init frame
  •  */
  • int init_frame(frame_p frame, int width, int height, frame_type_e type)
  • {
  •   assert(frame != NULL);
  •         
  •   frame->frm_w = width;
  •   frame->frm_h = height;
  •   frame->frm_type = type;
  •   switch(frame->frm_type)
  •   {
  •   case YUV420:
  •     frame->frm_size = (frame->frm_w * frame->frm_h * 3) / 2;
  •     break;    
  •   case YUV422:
  •     frame->frm_size = frame->frm_w * frame->frm_h * 2; 
  •     break;
  •   case YUV444:
  •     frame->frm_size = frame->frm_w * frame->frm_h * 3; 
  •     break;    
  •   case RGB:
  •     frame->frm_size = frame->frm_w * frame->frm_h * 3;
  •     break;
  •   default:
  •     fprintf(stderr, "frame type is invalid.");
  •     return 1; 
  •   }
  •   if ((frame->frm_comps = (char*)calloc(frame->frm_size, sizeof(char))) == NULL)
  •   {
  •     fprintf(stderr, "calloc failed.");
  •     return 1;
  •   } 
  •   return 0;
  • }
  • /*
  •  * \Brief
  •  * init frame
  •  */
  • int free_frame(frame_p frame)
  • {
  •   assert(frame != NULL);
  •   free(frame->frm_comps);
  •   return 0;
  • }
  •   
  • /*
  •  * \Brief
  •  * read a frame from file
  •  */
  • int read_frame_from_file(frame_p frame, FILE* fp)
  • {
  •   int ret;
  •   assert(frame != NULL && fp != NULL);
  •   if (ret = (fread(frame->frm_comps, sizeof(char), frame->frm_size, fp)) 
  •      != frame->frm_size)
  •   {
  •     fprintf(stderr, "read_frame_from_file failed. %d\n", ret);
  •     return 1;
  •   }
  •   return 0;
  • /*
  •  * \Brief
  •  * write a frame into file
  •  */
  • int write_frame_into_file(FILE* fp, frame_p frame)
  • {
  •   assert(frame != NULL && fp != NULL);
  •   if (fwrite(frame->frm_comps, sizeof(char), frame->frm_size, fp)
  •       != frame->frm_size)
  •   {
  •     fprintf(stderr, "write_frame_into_file failed.\n");
  •     return 1;
  •   }
  •   return 0;
  • /*
  •  * \Brief
  •  * read a frame from shared-memory
  •  */
  • int read_frame_from_shm(frame_p frame, shared_use_st* shm)
  • {
  •   assert(frame != NULL && shm != NULL);
  •   memcpy((char*)frame->frm_comps, (char*)shm->shm_sp, frame->frm_size);
  •   return 0;
  •  
  • /*
  •  * \Brief
  •  * write a frame into shared-memory
  •  */
  • int write_frame_into_shm(shared_use_st* shm, frame_p frame) 
  • {
  •   assert(frame != NULL && shm != NULL);
  •   memcpy((char*)shm->shm_sp, (char*)frame->frm_comps, frame->frm_size);
  •   return 0;
  • /*
  •  * \Brief
  •  * crop frame
  •  */
  • int crop_frame(frame_p frame, int l_offset, int t_offset, int c_w, int c_h)
  • {
  •   char* crop_loc;
  •   int index_h;
  •   assert(frame != NULL);
  •   if ((l_offset + c_w) > frame->frm_w)
  •   {
  •     printf("Crop width is out of range.\n");
  •     return 1;
  •   }
  •   if ((t_offset + c_h) > frame->frm_h)
  •   {
  •     printf("Crop height is out of range.\n");
  •     return 1;
  •   }
  •   crop_loc = frame->frm_comps + (t_offset * frame->frm_w) + l_offset; 
  •   for (index_h = 0; index_h < c_h; index_h++)
  •   {
  •     memset(crop_loc, PIX_VALUE, c_w);
  •     crop_loc += frame->frm_w;
  •   }
  •   return 0;
  • }
  •  
    五、编译与运行
     
    makefile.producer
  • OBJECTS = producer.o semaphore.o frame.o
  • CC = gcc
  • CFLAG = -g -Wa 
  • producer : $(OBJECTS)
  •   $(CC) $(CFLAG) -o producer $(OBJECTS)
  • producer.o: common.h semaphore.h frame.h shm_com.h
  • semaphore.o:semaphore.h
  • frame.o: frame.h
  • .PHONY:clean
  • clean:
  •   rm producer $(OBJECTS)
  •  
    makefile.consumer
  • OBJECTS = consumer.o semaphore.o frame.o
  • CC = gcc
  • CFLAG = -g -Wa
  • consumer : $(OBJECTS)
  •   $(CC) $(CFLAG) -o consumer $(OBJECTS)
  • consumer.o: common.h semaphore.h frame.h shm_com.h
  • semaphore.o:semaphore.h
  • frame.o:frame.h
  • .PHONY:clean
  • clean:
  •   rm consumer $(OBJECTS)
  • 编译与运行:
  • $make -f makefile.producer
  • $make -f makefile.consumer
  • $producer &
  • $consumer
  • 相关文章: