在并发编程中,管程(monitor)是一个同步构件,管程实现了同一时间点,最多只有一个线程可以执行管程的某个子程序。与那些通过修改数据结构实现互斥访问的并发程序设计相比,管程的实现很大程度上简化了程序设计。

管程可以确保一次只有一个进程执行管程中的程序,因此程序员不需要显式地编写同步代码,但是如果需要就某些特定条件上的同步,则需要定义一些条件结构(condition variable)来实现,并且对条件变量的操作仅有wait()和signal(),如下:

condition x, y;

x.wait();

...

x.signal();

调用x.wait()操作可能会使得一个进程挂起,直到另一个进程调用x.signal()操作。与信号量中的signal()操作相比,管程中如果在没有任何进程挂起的情况下调用signal()没有任何作用,而在信号量中,则必然会改变信号量的状态。

一个管程(mointor)的示意图如下所示:

 使用互斥量和条件变量实现线程同步控制

一个mointor中的程序运行前必须首先获取mutex,直至程序运行完成或者线程等待的某个条件发生时才释放mutex。当一个线程执行mointor中的一个子程序时,称为占用(occupy)该mointor,因此必须等到没有其他线程执行管程程序时方可调用管程程序,这是互斥保证。在管程的简单实现中,编译器为每个管程对象自动加入一把私有的mutex(lock),初始状态为unlock,管程中的每个对象入口处执行lock操作,出口处执行unlock操作。

因此设计monitor时至少必须包含mutex(lock) object(互斥量)和condition variables(条件变量)。一个条件变量可以看作是等待该条件发生的线程集合。

注:monitor也称为<线程安全对象/类/模块>。

 

条件变量

为何需要条件变量?

考虑如下一个busy waiting loop:

while not(P)

    do skip

如果仅有mutex,则线程必须等待P为真时才能继续执行。如此,将会导致其他线程无法进入临界区使得条件P为真,因此该管程可能发生死锁。

可以用条件变量解决。一个条件变量C可以看作是一个线程队列,其中存放的线程正在等待与之关联的条件变为真。当一个线程等待一个条件变量C时,其将mutex释放,然后其他线程就可以进入该管程中,通过改变C的值可以使得条件C满足,因此对条件变量C可以有如下操作:

(1)wait(c, m):线程调用该操作,等待条件C满足后继续执行,在等待过程中,释放mutex,因此此过程中,该线程不占用管程。

(2)signal(c):线程调用该操作表明此时条件C为真。

一个线程发生signal()后,至少有两个线程想要占用包含条件变量的管程:发出signal()操作的线程P,等待条件变量的线程Q,此时有两种选择:

1.非阻塞式条件变量:Q继续等待直到P完成。

2.阻塞式条件变量:P继续等待直到Q完成。

两种条件变量类型

阻塞式条件变量

也被称为霍尔风格(Hoare-style)管程,如下图所示:

 使用互斥量和条件变量实现线程同步控制

每个管程包含两个线程队列e,s,其中:

e:入口队列

s:发出signal的线程队列

对于每个条件变量C,有一个线程队列,用C.q表示,如上图的a.q、b.q,这些队列很多情况下可以实现为FIFO模式。

阻塞式条件变量实现如下:

 使用互斥量和条件变量实现线程同步控制

非阻塞式条件变量

也称为Mesa风格管程,如下图所示:

 使用互斥量和条件变量实现线程同步控制

该模型中,发出signal()操作的线程不会失去管程的占用权,被notified()的线程将会被移到队列e中,相较于阻塞式条件变量,该模型不需要队列s。例如Pthread中的条件变量就采用这种非阻塞模式,即发出signal()操作的线程优先级高于被notified()的线程,要使用这种条件变量:首先利用pthread_mutex_lock获取互斥锁,然后调用pthread_cond_wait在线程睡眠等待之前先释放互斥锁,在其被唤醒后再重新获取互斥锁。关于pthread条件变量如下会有详细介绍。

非阻塞条件变量实现如下:

 使用互斥量和条件变量实现线程同步控制

 

POSIX同步之互斥锁和条件变量的使用

如下为经典的有界缓冲区问题,可以用生产者/消费者模型描述,示意图如下:

 使用互斥量和条件变量实现线程同步控制

采用互斥量的生产者/消费者代码如下:

  1 [root@bogon unp]# cat producer_consumer_mutex.c
  2 #include <unistd.h>
  3 #include <sys/types.h>
  4 #include <pthread.h>
  5 #include <stdlib.h>
  6 #include <string.h>
  7 #include <errno.h>
  8 #include <stdio.h>
  9 
 10 #define CONSUMER_COUNT 1        /* 1个消费者线程 */
 11 #define PRODUCER_COUNT 3        /* 3个生产者线程 */
 12 #define BUFFERSIZE 10
 13 
 14 int g_buffer[BUFFERSIZE];
 15 
 16 unsigned short in = 0;
 17 unsigned short out = 0;
 18 
 19 pthread_mutex_t g_mutex;
 20 
 21 pthread_t g_thread[CONSUMER_COUNT + PRODUCER_COUNT];    /* 存放生产者和消费者的线程号 */
 22 
 23 void* consumer(void* arg)
 24 {
 25         int num = (int)arg;
 26         /* 不断消费 */
 27         while (1)
 28         {
 29                 pthread_mutex_lock(&g_mutex);
 30 
 31                 /* 打印仓库当前状态 */
 32                 int i;
 33                 for (i = 0; i < BUFFERSIZE; i++) 
 34                 {
 35                         if (g_buffer[i] == -1)
 36                                 printf("g_buffer[%d] = %s\n", i, "null");
 37                         else
 38                                 printf("g_buffer[%d] = %d\n", i, g_buffer[i]);
 39 
 40                         if (i == out)
 41                                 printf("g_buffer[%d]可以消费\n", i);
 42                 }
 43 
 44                 /* 消费产品 */
 45                 printf("thread %d 开始消费产品 %d\n", num, g_buffer[out]);
 46 sleep(4);       /* 消费一个产品需要4秒 */
 47                 g_buffer[out] = -1;
 48                 printf("消费完毕\n");
 49                 out = (out + 1) % BUFFERSIZE;
 50 
 51                 pthread_mutex_unlock(&g_mutex);
 52         }
 53 
 54         return NULL;
 55 }
 56 
 57 void* producer(void* arg)
 58 {
 59         int num = (int)arg;
 60         /* 不断生产 */
 61         while (1)
 62         {
 63                 pthread_mutex_lock(&g_mutex);
 64 
 65                 /* 打印仓库当前状态 */
 66                 int i;
 67                 for (i = 0; i < BUFFERSIZE; i++)
 68         {
 69                 if (g_buffer[i] == -1)
 70                 printf("g_buffer[%d] = %s\n", i, "null");
 71             else
 72                 printf("g_buffer[%d] = %d\n", i, g_buffer[i]);
 73   
 74             if (i == in)
 75                 printf("g_buffer[%d]可以生产\n", i);
 76         }
 77 
 78                 /* 生产产品 */
 79                 g_buffer[in]++;
 80                 printf("thread %d 开始生产产品 %d\n", num, g_buffer[in]);
 81                 sleep(2);       /* 生产一个产品需要2秒 */
 82                 printf("生产完毕\n");
 83                 in = (in + 1) % BUFFERSIZE;
 84 
 85                 pthread_mutex_unlock(&g_mutex);
 86         }
 87 
 88         return NULL;
 89 }
 90 
 91 int main(void)
 92 {
 93         /* 初始化仓库 */
 94         int i;
 95         for (i = 0; i < BUFFERSIZE; i++)
 96                 g_buffer[i] = -1;
 97 
 98         /* 创建消费者线程,线程号为:[0, CONSUMER_COUNT) */
 99         for (i = 0; i < CONSUMER_COUNT; i++)
100         {
101                 pthread_create(&g_thread[i], NULL, consumer, (void*)i);
102         }
103 
104         /* 创建生产者线程,线程号为:[CONSUMER_COUNT, CONSUMER_COUNT + PRODUCER_COUNT) */
105         for (i = 0; i < PRODUCER_COUNT; i++)
106         {
107                 pthread_create(&g_thread[i + CONSUMER_COUNT], NULL, producer, (void*)(i + CONSUMER_COUNT));
108         }
109 
110         /* 等待创建的所有线程退出 */
111         for (i = 0; i < CONSUMER_COUNT + PRODUCER_COUNT; i++)
112         {
113                 pthread_join(g_thread[i], NULL);
114         }
115 
116         return 0;
117 }
118 
119 // output
120 ...
121 thread 2 开始生产产品 4
122 生产完毕
123 g_buffer[0] = 4
124 g_buffer[1] = 4
125 g_buffer[2] = 4
126 g_buffer[3] = 2
127 g_buffer[3]可以生产
128 g_buffer[4] = 2
129 g_buffer[5] = 1
130 g_buffer[6] = 1
131 g_buffer[7] = 0
132 g_buffer[8] = 0
133 g_buffer[9] = 4
134 thread 1 开始生产产品 3
135 生产完毕
136 g_buffer[0] = 4
137 g_buffer[1] = 4
138 g_buffer[2] = 4
139 g_buffer[3] = 3
140 g_buffer[4] = 2
141 g_buffer[5] = 1
142 g_buffer[6] = 1
143 g_buffer[7] = 0
144 g_buffer[8] = 0
145 g_buffer[9] = 4
146 g_buffer[9]可以消费
147 thread 0 开始消费产品 4
148 消费完毕
149 ...
View Code

相关文章: