信号量的使用(进程内,进程间)

本文从WordPress迁移而来, 查看全部WordPress迁移文章

主要实践一下,利用信号量来实现互斥与同步,在线程间,进程间的使用不同

一:信号量分为有名信号量,无名信号量,有一篇写得不错的博客

http://www.cnblogs.com/LubinLew/p/POSIX-semaphores.html

另外一篇关于信号量,锁,的文章

http://www.cnblogs.com/fuyunbiyi/p/3475602.html

简单来说,有名信号量由sem_open打开,其第一个参数是一个字符串,正是该信号量的名字;有名信号量由内核持续,正是因此多个进程间才能利用它;由于是内核持续,一个进程结束了,信号量并不会被关闭删除,因而最后一个退出的进程,要负责关闭有名信号量并且将其从内核中删除
无名信号量由sem_init打开,不需要名字,进程持续,进程结束了,信号量就没了,因此无名信号量往往在进程内使用,因此多用于线程间;sem_init()的第二个参数,解释是,0表示进程间不共享,1表示进程间共享;实践结果是1,多个进程间也无法共享,有些博客也说这个1是没用的;实际上还是有用的,看怎么用罢了,最后会使用共享内存的方式,用sem_init创建一个多进程间共享的信号量,实现同步与互斥

注意:以下所有程序编译时添加 -lpthread

二:多线程实现生产者消费者问题,修改信号量的初始值,达到“一放一取”的协同工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/*
empty,full,同于同步
mutex用于互斥
*/
#include <cstdio>
#include <cstring>
#include <cstdlib>
#include <time.h>
#include <semaphore.h>
#include <pthread.h>
#define BUFFERSIZE 10

int buffer[BUFFERSIZE],bufferSize;
int inPointer,outPointer;
sem_t empty,full,mutex;

// 生产者线程函数
void *Producer(void *name) {
for (int ith = 0; ith < 10; ith++) {
sem_wait(&empty);
sem_wait(&mutex);

int data = rand() % 1000 + 1;
buffer[inPointer] = data;
printf("%s 在缓冲区%d 放入数据 %d\n",(char*)name, inPointer, data);
inPointer = (inPointer + 1) % bufferSize;

sem_post(&mutex);
sem_post(&full);
}
}

// 消费者线程函数
void *Consumer(void *name) {
for (int ith = 0; ith < 10; ith++) {
sem_wait(&full);
sem_wait(&mutex);

int data = buffer[outPointer];
printf(" %s 在缓冲区%d 取出数据 %d\n",(char*)name, outPointer, data);
outPointer = (outPointer + 1) % bufferSize;

sem_post(&mutex);
sem_post(&empty);
}
}

int main() {
// 初始化缓冲区
memset(buffer, -1, sizeof(buffer));
bufferSize = 10;
inPointer = outPointer = 0;

// 初始化信号量
//一般情况下,empty应该初始化为buffsize
//但如果你想看到生产者,消费者“一放一取”地协同工作
//你可以将empty初始化为1
sem_init(&empty, 0, bufferSize);
//sem_init(&empty, 0, 1);
sem_init(&full, 0, 0);
sem_init(&mutex, 0, 1);

// 创建线程
char proName[] = "生产者1号";
char conName[] = "消费者1号";
pthread_t proThread, conThread;
int pt = pthread_create(&proThread, NULL, Producer, proName);
int ct = pthread_create(&conThread, NULL, Consumer, conName);
pthread_join(proThread, NULL);
pthread_join(conThread, NULL);

// 销毁信号量(释放资源)
sem_destroy(&empty);
sem_destroy(&full);
sem_destroy(&mutex);

return 0;

}

三:创建有名信号量,父进程fork出一个子进程,两者轮流打印自己的pid

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <unistd.h>
#include <cstdio>
#include <cstdio>
#include <cstring>
#include <semaphore.h>
#include <fcntl.h> //O_CREATE O_EXCL
#include <sys/stat.h>
#include <sys/wait.h>
#include <time.h>

#define OPEN_FLAG O_CREAT
#define OPEN_MODE 00777
#define INIT_V 0

int main() {
char mutex1_name[] = "PS_mutex1";
char mutex2_name[] = "PS_mutex2";
pid_t chipid = fork();

// 创建有名信号量
sem_t *mutex1 = sem_open(mutex1_name, OPEN_FLAG, OPEN_MODE, INIT_V);
sem_t *mutex2 = sem_open(mutex2_name, OPEN_FLAG, OPEN_MODE, INIT_V);

if (chipid < 0) {
puts("fork error");
return 0;
}
else if (chipid == 0) {
for (int i = 0; i < 10; i++) {
sem_wait(mutex2);
printf("(%d).child pid = %d\n", i,getpid());
sem_post(mutex1);
}
}
else {
for (int i = 0; i < 10; i++) {
sem_post(mutex2);
sem_wait(mutex1);
printf("(%d).father pid = %d\n", i,getpid());
sleep(1);
}
int stat;
wait(&stat); //父进程等待子进程结束防止子进程变为僵死进程,同时保证了父进程是最后结束的进程
sem_close(mutex1); // 从进程内关闭该信号量
sem_unlink(mutex1_name); // 从内核中删除该信号量,这才是真正的删除
sem_close(mutex2);
sem_unlink(mutex2_name);
}
return 0;
}

四:编写两个程序(这次不采用fork),使它们利用有名信号量,轮流写文件

这连个程序并没有考虑太多的错误处理,只是为了演示,所以先运行程序一,再运行程序二,否则会出错;因为程序一是创建有名信号量的,程序二只是打开

  • ss1.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <unistd.h>
#include <cstdio>
#include <cstdio>
#include <cstring>
#include <semaphore.h>
#include <fcntl.h> //O_CREATE O_EXCL
#include <sys/stat.h>
#include <sys/wait.h>
#include <time.h>

int main() {
char file_name[] = "share_file";
char mutex_name1[] = "psmutex1";
char mutex_name2[] = "psmutex2";

//用sem_open创建有名信号量,由内核持续,O_CREAT表示创建
sem_t *mutex1 = sem_open(mutex_name1, O_CREAT, 0644, 0);
sem_t *mutex2 = sem_open(mutex_name2, O_CREAT, 0644, 0);

for (int i = 0; i < 5; i++) {
sem_post(mutex2);
sem_wait(mutex1);
FILE *fp = fopen(file_name, "a+");
fprintf(fp, "(%d).semshare1 pid = %d\n", i,getpid());
fclose(fp);
}
sleep(3);
//关闭信号量,只是在进程层面关系,内核仍然持有该信号量
sem_close(mutex1);
sem_close(mutex2);
//unlink才是从内核中将该信号量删除
sem_unlink(mutex_name1);
sem_unlink(mutex_name2);
return 0;
}
  • ss2.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <unistd.h>
#include <cstdio>
#include <cstdio>
#include <cstring>
#include <semaphore.h>
#include <fcntl.h> //O_CREATE O_EXCL
#include <sys/stat.h>
#include <sys/wait.h>
#include <time.h>

int main() {
char file_name[] = "share_file";
char mutex_name1[] = "psmutex1";
char mutex_name2[] = "psmutex2";
// 打开已有的信号量
sem_t *mutex1 = sem_open(mutex_name1, O_EXCL);
sem_t *mutex2 = sem_open(mutex_name2, O_EXCL);

for (int i = 0; i < 5; i++) {
sem_wait(mutex2);
//printf("semshare2 pid = %d\n", getpid());
FILE *fp = fopen(file_name, "a+");
fprintf(fp, "(%d).semshare2 pid = %d\n", i,getpid());
fclose(fp);
sem_post(mutex1);
}
return 0;
}
  • 查看文件结果

五:使用共享内存

shm_open, ftruncate, mmap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include <unistd.h>
#include <semaphore.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#include <time.h>
using namespace std;

#define OFLAG O_CREAT|O_RDWR
#define MODE 00777

struct shm_ds { // 共享存储的数据结构
sem_t sem1;
sem_t sem2;
};

int main() {
char shm_name[] = "myshm";
// 创建一个共享存储对象,实际是一个文件,在/dev/shm下可以找到
int shm_fd = shm_open(shm_name, OFLAG, MODE);
if (shm_fd < 0) {
perror("shm_open error");
return 0;
}
// 切分文件到一个需要的大小,这里选取st_size=sizeof(shm_ds);
ftruncate(shm_fd, sizeof(shm_ds));
// 将刚才的共享文件映射到内存
shm_ds *p= (shm_ds *)mmap(NULL, sizeof(shm_ds), PROT_READ|PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (p == NULL) {
perror("mmap error");
return 0;
}
// 初始化信号量,注意第二个参数为1,多个进程间可共享该信号量
// 可以尝试将 第二个参数设置为0,观察一下会发生什么
sem_init(&(p->sem1), 1, 0);
sem_init(&(p->sem2), 1, 0);

pid_t child_pid = fork();
if (child_pid < 0) {
perror("fork error");
return 0;
}
else if (child_pid == 0) {
for (int i = 0; i < 5; i++) {
sem_wait(&(p->sem2));
printf("(%d). child pid=%d\n",i, getpid());
sem_post(&(p->sem1));
}
}
else {
for (int i = 0; i < 5; i++) {
sem_post(&(p->sem2));
sem_wait(&(p->sem1));
printf("(%d). father pid=%d\n",i, getpid());
sleep(1);
}
// 父进程等待子进程结束,这样防止将死进程且保证父进程最后结束
int stat;
wait(&stat);
// 一定不要删除掉那个共享文件(除非你有意保留)
// 如果不删除,程序结束后在 /dev/shm 下可以找到 "myshm" 这个文件;
// 下次还要创建 "myshm" 将失败
shm_unlink(shm_name);
}
}

附加:使用线程锁实现线程间的互斥

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/*
本程序使用线程锁(互斥锁),保证
tmp=sv, tmo+=1, sv=tmp;
是原子操作

互斥量分为下面三种:
快速型(PTHREAD_MUTEX_FAST_NP)。这种类型也是默认的类型。该线程的行为正如上面所说的。
递归型(PTHREAD_MUTEX_RECURSIVE_NP)。如果遇到我们上面所提到的死锁情况,同一线程循环给互斥量上锁,那么系统将会知道该上锁行为来自同一线程,那么就会同意线程给该互斥量上锁。
错误检测型(PTHREAD_MUTEX_ERRORCHECK_NP)。如果该互斥量已经被上锁,那么后续的上锁将会失败而不会阻塞,pthread_mutex_lock()操作将会返回EDEADLK。
*/

#include <cstdio>
#include <pthread.h>
#define LL long long

LL sharedValue;
pthread_mutex_t mutex; // = PTHREAD_MUTEX_INITIALIZER;

void *threadFunc(void *arg) {
LL tmp = 0LL;
for (int i = 0; i < 100000; i++) {
pthread_mutex_lock(&mutex);
tmp = sharedValue;
tmp += 1LL;
sharedValue = tmp;
pthread_mutex_unlock(&mutex);
}
}

int main() {
sharedValue = 0LL;

pthread_t p1,p2,p3;
// mutex = PTHREAD_MUTEX_INITIALIZER;
// 可以做到把一个互斥锁初始化为快速型
pthread_mutex_init(&mutex,PTHREAD_MUTEX_FAST_NP);
int pc1 = pthread_create(&p1, NULL, threadFunc, NULL);
int pc2 = pthread_create(&p2, NULL, threadFunc, NULL);
int pc3 = pthread_create(&p3, NULL, threadFunc, NULL);
pthread_join(p1,NULL);
pthread_join(p2,NULL);
pthread_join(p3,NULL);
// 销毁锁,即释放它所占有的资源
pthread_mutex_destroy(&mutex);

printf("sharedValue = %lld\n", sharedValue);
return 0;
}