Linux C++实现多线程同步的四种方式(超级详细)

背景问题:在特定的应用场景下,多线程不进行同步会造成什么问题?

通过多线程模拟多窗口售票为例:

include <iostream>

include<pthread.h>

include<stdio.h>

include<stdlib.h>

include<string.h>

include<unistd.h>

using namespace std;

int ticket_sum=20;

void *sell_ticket(void *arg)

{

for(int i=0; i<20; i++)

{

if(ticket_sum>0)

{

sleep(1);

cout<<"sell the "<<20-ticket_sum+1<<"th"<<endl;

ticket_sum--;

}

}

return 0;

}

int main()

{

int flag;

pthread_t tids[4];

for(int i=0; i<4; i++)

{

flag=pthread_create(&tids[i],NULL,&sell_ticket,NULL);

if(flag)

{

cout<<"pthread create error ,flag="<<flag<<endl;

return flag;

}

}

sleep(20);

void *ans;

for(int i=0; i<4; i++)

{

flag=pthread_join(tids[i],&ans);

if(flag)

{

cout<<"tid="<<tids[i]<<"join erro flag="<<flag<<endl;

return flag;

}

cout<<"ans="<<ans<<endl;

}

return 0;

}

分析:总票数只有20张,却卖出了23张,是非常明显的超买超卖问题,而造成这个问题的根本原因就是同时发生的各个线程都可以对ticket_sum进行读取和写入!

ps:

1.在并发情况下,指令执行的先后顺序由内核决定,同一个线程内部,指令按照先后顺序执行,但不同线程之间的指令很难说清楚是哪一个先执行,如果运行的结果依赖于不同线程执行的先后的话,那么就会形成竞争条件,在这样的情况下,计算的结果很难预知,所以应该尽量避免竞争条件的形成

2.最常见的解决竞争条件的方法是将原先分离的两个指令构成一个不可分割的原子操作,而其他任务不能插入到原子操作中!

3.对多线程来说,同步指的是在一定时间内只允许某一个线程访问某个资源,而在此时间内,不允许其他线程访问该资源!

4.线程同步的常见方法:互斥锁,条件变量,读写锁,信号量

一.互斥锁

本质就是一个特殊的全局变量,拥有lock和unlock两种状态,unlock的互斥锁可以由某个线程获得,一旦获得,这个互斥锁会锁上变成lock状态,此后只有该线程由权力打开该锁,其他线程想要获得互斥锁,必须得到互斥锁再次被打开之后

采用互斥锁来同步资源:

include <iostream>

include<pthread.h>

include<stdio.h>

include<stdlib.h>

include<string.h>

include<unistd.h>

using namespace std;

int ticket_sum=20;

pthread_mutex_t mutex_x=PTHREAD_MUTEX_INITIALIZER;//static init mutex

void *sell_ticket(void *arg)

{

for(int i=0; i<20; i++)

{

pthread_mutex_lock(&mutex_x);//atomic opreation through mutex lock

if(ticket_sum>0)

{

sleep(1);

cout<<"sell the "<<20-ticket_sum+1<<"th"<<endl;

ticket_sum--;

}

pthread_mutex_unlock(&mutex_x);

}

return 0;

}

int main()

{

int flag;

pthread_t tids[4];

for(int i=0; i<4; i++)

{

flag=pthread_create(&tids[i],NULL,&sell_ticket,NULL);

if(flag)

{

cout<<"pthread create error ,flag="<<flag<<endl;

return flag;

}

}

sleep(20);

void *ans;

for(int i=0; i<4; i++)

{

flag=pthread_join(tids[i],&ans);

if(flag)

{

cout<<"tid="<<tids[i]<<"join erro flag="<<flag<<endl;

return flag;

}

cout<<"ans="<<ans<<endl;

}

return 0;

}

分析:通过为售票的核心代码段加互斥锁使得其变成了一个原子性操作!不会被其他线程影响

1.互斥锁的初始化

互斥锁的初始化分为静态初始化和动态初始化

静态:pthread_mutex_t mutex_x=PTHREAD_MUTEX_INITIALIZER;//static init mutex

动态:pthread_mutex_init函数

ps:互斥锁静态初始化和动态初始化的区别?

待补充。。。。

2.互斥锁的相关属性及分类

pthread_mutexattr_init(pthread_mutexattr_t attr);
//销毁互斥锁属性
pthread_mutexattr_destroy(pthread_mutexattr_t attr);
//用于获取互斥锁属性
int pthread_mutexattr_getpshared(const pthread_mutexattr_t *restrict attr , int *restrict pshared);
//用于设置互斥锁属性
int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr , int pshared);

attr表示互斥锁的属性

pshared表示互斥锁的共享属性,由两种取值:

1)PTHREAD_PROCESS_PRIVATE:锁只能用于一个进程内部的两个线程进行互斥(默认情况)

2)PTHREAD_PROCESS_SHARED:锁可用于两个不同进程中的线程进行互斥,使用时还需要在进程共享内存中分配互斥锁,然后为该互斥锁指定属性就可以了

互斥锁的分类:

int pthread_mutexattr_gettype(const pthread_mutexattr_t *restrict attr , int *restrict type);
//设置互斥锁类型
int pthread_mutexattr_settype(const pthread_mutexattr_t *restrict attr , int type);

参数type表示互斥锁的类型,总共有以下四种类型:

1.PTHREAD_MUTEX_NOMAL:标准互斥锁,第一次上锁成功,第二次上锁会失败并阻塞

2.PTHREAD_MUTEX_RECURSIVE:递归互斥锁,第一次上锁成功,第二次上锁还是会成功,可以理解为内部有一个计数器,每加一次锁计数器加1,解锁减1

3.PTHREAD_MUTEX_ERRORCHECK:检查互斥锁,第一次上锁会成功,第二次上锁出错返回错误信息,不会阻塞

4.PTHREAD_MUTEX_DEFAULT:默认互斥锁,第一次上锁会成功,第二次上锁会失败

3,测试加锁函数

int pthread_mutex_lock(&mutex):测试加锁函数在锁已经被占据时返回EBUSY而不是挂起等待,当然,如果锁没有被占领的话可以获得锁

为了清楚的看到两个线程争用资源的情况,我们使得其中一个函数使用测试加锁函数进行加锁,而另外一个使用正常的加锁函数进行加锁

include <iostream>

include<pthread.h>

include<stdio.h>

include<stdlib.h>

include<string.h>

include<unistd.h>

include<errno.h>

using namespace std;

int ticket_sum=20;

pthread_mutex_t mutex_x=PTHREAD_MUTEX_INITIALIZER;//static init mutex

void *sell_ticket_1(void *arg)

{

for(int i=0; i<20; i++)

{

pthread_mutex_lock(&mutex_x);

if(ticket_sum>0)

{

sleep(1);

cout<<"thread_1 sell the "<<20-ticket_sum+1<<"th ticket"<<endl;

ticket_sum--;

}

sleep(1);

pthread_mutex_unlock(&mutex_x);

sleep(1);

}

return 0;

}

void *sell_ticket_2(void *arg)

{

int flag;

for(int i=0; i<10; i++)

{

flag=pthread_mutex_trylock(&mutex_x);

if(flag==EBUSY)

{

cout<<"sell_ticket_2:the variable is locked by sell_ticket_1"<<endl;

}

else if(flag==0)

{

if(ticket_sum>0)

{

sleep(1);

cout<<"thread_2 sell the "<<20-ticket_sum+1<<"th tickets"<<endl;

ticket_sum--;

}

pthread_mutex_unlock(&mutex_x);

}

sleep(1);

}

return 0;

}

int main()

{

int flag;

pthread_t tids[2];

flag=pthread_create(&tids[0],NULL,&sell_ticket_1,NULL);

if(flag)

{

cout<<"pthread create error ,flag="<<flag<<endl;

return flag;

}

flag=pthread_create(&tids[1],NULL,&sell_ticket_2,NULL);

if(flag)

{

cout<<"pthread create error ,flag="<<flag<<endl;

return flag;

}

void *ans;

sleep(30);

flag=pthread_join(tids[0],&ans);

if(flag)

{

cout<<"tid="<<tids[0]<<"join erro flag="<<flag<<endl;

return flag;

}

else

{

cout<<"ans="<<ans<<endl;

}

flag=pthread_join(tids[1],&ans);

if(flag)

{

cout<<"tid="<<tids[1]<<"join erro flag="<<flag<<endl;

return flag;

}

else

{

cout<<"ans="<<ans<<endl;

}

return 0;

}

分析:通过测试加锁函数我们可以清晰的看到两个线程争用资源的情况

二.条件变量

互斥量不是万能的,比如某个线程正在等待共享数据内某个条件出现,可可能需要重复对数据对象加锁和解锁(轮询),但是这样轮询非常耗费时间和资源,而且效率非常低,所以互斥锁不太适合这种情况

我们需要这样一种方法:当线程在等待满足某些条件时使线程进入睡眠状态,一旦条件满足,就换线因等待满足特定条件而睡眠的线程

如果我们能够实现这样一种方法,程序的效率无疑会大大提高,而这种方法正是条件变量!

样例:

include <iostream>

include<pthread.h>

include<stdio.h>

include<stdlib.h>

include<string.h>

include<unistd.h>

include<errno.h>

using namespace std;

pthread_cond_t qready=PTHREAD_COND_INITIALIZER; //cond

pthread_mutex_t qlock=PTHREAD_MUTEX_INITIALIZER; //mutex

int x=10,y=20;

void *f1(void *arg)

{

cout<<"f1 start"<<endl;

pthread_mutex_lock(&qlock);

while(x<y)

{

pthread_cond_wait(&qready,&qlock);

}

pthread_mutex_unlock(&qlock);

sleep(3);

cout<<"f1 end"<<endl;

return 0;

}

void *f2(void *arg)

{

cout<<"f2 start"<<endl;

pthread_mutex_lock(&qlock);

x=20;

y=10;

cout<<"has a change,x="<<x<<" y="<<y<<endl;

pthread_mutex_unlock(&qlock);

if(x>y)

{

pthread_cond_signal(&qready);

}

cout<<"f2 end"<<endl;

return 0;

}

int main()

{

pthread_t tids[2];

int flag;

flag=pthread_create(&tids[0],NULL,f1,NULL);

if(flag)

{

cout<<"pthread 1 create error "<<endl;

return flag;

}

sleep(2);

flag=pthread_create(&tids[1],NULL,f2,NULL);

if(flag)

{

cout<<"pthread 2 create erro "<<endl;

return flag;

}

sleep(5);

return 0;

}

分析:线程1不满足条件被阻塞,然后线程2运行,改变了条件,线程2发行条件改变了通知线程1运行,然线程1不满足条件被阻塞,然后线程2运行,改变了条件,线程2发行条件改变了通知线程1运行,然后线程2结束,然后线程1继续运行,然后线程1结束,为了确保线程1先执行,在创建线程2之前我们sleep了2秒

ps:

1.条件变量通过运行线程阻塞和等待另一个线程发送信号的方法弥补互斥锁的不足,常常和互斥锁一起使用,使用时,条件变量被用来阻塞一个线程,当条件不满足时,线程往往解开响应的互斥锁并等待条件发生变化,一旦其他的某个线程改变了条件变量,它将通知响应的条件变量换线一个或多个正被此条件变量阻塞的线程,这些线程将重新锁定互斥锁并且重新测试条件是否满足

1.条件变量的相关函数

1)创建

静态方式:pthread_cond_t cond PTHREAD_COND_INITIALIZER

动态方式:int pthread_cond_init(&cond,NULL)

Linux thread 实现的条件变量不支持属性,所以NULL(cond_attr参数)

2)注销

int pthread_cond_destory(&cond)

只有没有线程在该条件变量上,该条件变量才能注销,否则返回EBUSY

因为Linux实现的条件变量没有分配什么资源,所以注销动作只包括检查是否有等待线程!(请参考条件变量的底层实现)

3)等待

条件等待:int pthread_cond_wait(&cond,&mutex)

计时等待:int pthread_cond_timewait(&cond,&mutex,time)

1.其中计时等待如果在给定时刻前条件没有被满足,则返回ETIMEOUT,结束等待

2.无论那种等待方式,都必须有一个互斥锁配合,以防止多个线程同时请求pthread_cond_wait形成竞争条件!

3.在调用pthread_cond_wait前必须由本线程加锁

4)激发

激发一个等待线程:pthread_cond_signal(&cond)

激发所有等待线程:pthread_cond_broadcast(&cond)

重要的是,pthread_cond_signal不会存在惊群效应,也就是是它最多给一个等待线程发信号,不会给所有线程发信号唤醒提他们,然后要求他们自己去争抢资源!

pthread_cond_signal会根据等待线程的优先级和等待时间来确定激发哪一个等待线程

下面看一个程序,找到程序存在的问题

include <iostream>

include<pthread.h>

include<stdio.h>

include<stdlib.h>

include<string.h>

include<unistd.h>

include<errno.h>

using namespace std;

pthread_cond_t taxi_cond=PTHREAD_COND_INITIALIZER; //taix arrive cond

pthread_mutex_t taxi_mutex=PTHREAD_MUTEX_INITIALIZER;// sync mutex

void *traveler_arrive(void *name)

{

cout<<"Traveler:"<<(char*)name<<" needs a taxi now!"<<endl;

pthread_mutex_lock(&taxi_mutex);

pthread_cond_wait(&taxi_cond,&taxi_mutex);

pthread_mutex_unlock(&taxi_mutex);

cout<<"Traveler:"<<(char*)name<<" now got a taxi!"<<endl;

pthread_exit((void*)0);

}

void *taxi_arrive(void *name)

{

cout<<"Taxi:"<<(char*)name<<" arriver."<<endl;

pthread_cond_signal(&taxi_cond);

pthread_exit((void*)0);

}

int main()

{

pthread_t tids[3];

int flag;

flag=pthread_create(&tids[0],NULL,taxi_arrive,(void*)("Jack"));

if(flag)

{

cout<<"pthread_create error:flag="<<flag<<endl;

return flag;

}

cout<<"time passing by"<<endl;

sleep(1);

flag=pthread_create(&tids[1],NULL,traveler_arrive,(void*)("Susan"));

if(flag)

{

cout<<"pthread_create error:flag="<<flag<<endl;

return flag;

}

cout<<"time passing by"<<endl;

sleep(1);

flag=pthread_create(&tids[2],NULL,taxi_arrive,(void*)("Mike"));

if(flag)

{

cout<<"pthread_create error:flag="<<flag<<endl;

return flag;

}

cout<<"time passing by"<<endl;

sleep(1);

void *ans;

for(int i=0; i<3; i++)

{

flag=pthread_join(tids[i],&ans);

if(flag)

{

cout<<"pthread_join error:flag="<<flag<<endl;

return flag;

}

cout<<"ans="<<ans<<endl;

}

return 0;

}

分析:程序由一个条件变量,用于提示乘客有出租车到达,还有一个同步锁,乘客到达之后就是等车(条件变量),出租车到达之后就是通知乘客,我们看到乘客Susan到达之后,并没有乘坐先到的Jack的车,而是等到Mike的车到了之后再乘坐Mike的车,Jack的车白白的闲置了,为什么会造成这种原因呢?分析一下代码:我们发现Jack出租车到达之后调用pthread_cond_signal(&taxi_cond)发现没有乘客,然后就直接结束线程了。。。。

正确的操作应该是:先到的Jack发现没有乘客,然后一直等待乘客,有乘客到了就直接走,而且我们应该统计一下乘客的数量

做如下改进:

1.增加乘客计数器,使得出租车在有乘客到达之后可以直接走,而不是又在原地等待别的乘客(僵死线程)

2.出租车到达函数加个while循环,没有乘客的时候一直等待,直到乘客到来

include <iostream>

include<pthread.h>

include<stdio.h>

include<stdlib.h>

include<string.h>

include<unistd.h>

include<errno.h>

using namespace std;

pthread_cond_t taxi_cond=PTHREAD_COND_INITIALIZER; //taix arrive cond

pthread_mutex_t taxi_mutex=PTHREAD_MUTEX_INITIALIZER;// sync mutex

void *traveler_arrive(void *name)

{

cout<<"Traveler:"<<(char*)name<<" needs a taxi now!"<<endl;

pthread_mutex_lock(&taxi_mutex);

pthread_cond_wait(&taxi_cond,&taxi_mutex);

pthread_mutex_unlock(&taxi_mutex);

cout<<"Traveler:"<<(char*)name<<" now got a taxi!"<<endl;

pthread_exit((void*)0);

}

void *taxi_arrive(void *name)

{

cout<<"Taxi:"<<(char*)name<<" arriver."<<endl;

pthread_exit((void*)0);

}

int main()

{

pthread_t tids[3];

int flag;

flag=pthread_create(&tids[0],NULL,taxi_arrive,(void*)("Jack"));

if(flag)

{

cout<<"pthread_create error:flag="<<flag<<endl;

return flag;

}

cout<<"time passing by"<<endl;

sleep(1);

flag=pthread_create(&tids[1],NULL,traveler_arrive,(void*)("Susan"));

if(flag)

{

cout<<"pthread_create error:flag="<<flag<<endl;

return flag;

}

cout<<"time passing by"<<endl;

sleep(1);

flag=pthread_create(&tids[2],NULL,taxi_arrive,(void*)("Mike"));

if(flag)

{

cout<<"pthread_create error:flag="<<flag<<endl;

return flag;

}

cout<<"time passing by"<<endl;

sleep(1);

void *ans;

for(int i=0; i<3; i++)

{

flag=pthread_join(tids[i],&ans);

if(flag)

{

cout<<"pthread_join error:flag="<<flag<<endl;

return flag;

}

cout<<"ans="<<ans<<endl;

}

return 0;

}

三.读写锁

可以多个线程同时读,但是不能多个线程同时写

1.读写锁比互斥锁更加具有适用性和并行性

2.读写锁最适用于对数据结构的读操作读操作次数多余写操作次数的场合!

3.锁处于读模式时可以线程共享,而锁处于写模式时只能独占,所以读写锁又叫做共享-独占锁

4.读写锁有两种策略:强读同步和强写同步

在强读同步中,总是给读者更高的优先权,只要写者没有进行写操作,读者就可以获得访问权限

在强写同步中,总是给写者更高的优先权,读者只能等到所有正在等待或者执行的写者完成后才能进行读

不同的系统采用不同的策略,比如航班订票系统使用强写同步,图书馆查阅系统采用强读同步

根据不同的业务场景,采用不同的策略

1)初始化的销毁读写锁

静态初始化:pthread_rwlock_t rwlock=PTHREAD_RWLOCK_INITIALIZER

动态初始化:int pthread_rwlock_init(rwlock,NULL),NULL代表读写锁采用默认属性

销毁读写锁:int pthread_rwlock_destory(rwlock)

在释放某个读写锁的资源之前,需要先通过pthread_rwlock_destory函数对读写锁进行清理。释放由pthread_rwlock_init函数分配的资源

如果你想要读写锁使用非默认属性,则attr不能为NULL,得给attr赋值

int pthread_rwlockattr_init(attr),给attr初始化

int pthread_rwlockattr_destory(attr),销毁attr

2)以写的方式获取锁,以读的方式获取锁,释放读写锁

int pthread_rwlock_rdlock(rwlock),以读的方式获取锁

int pthread_rwlock_wrlock(rwlock),以写的方式获取锁

int pthread_rwlock_unlock(rwlock),释放锁

上面两个获取锁的方式都是阻塞的函数,也就是说获取不到锁的话,调用线程不是立即返回,而是阻塞执行,在需要进行写操作的时候,这种阻塞式获取锁的方式是非常不好的,你想一下,我需要进行写操作,不但没有获取到锁,我还一直在这里等待,大大拖累效率

所以我们应该采用非阻塞的方式获取锁:

int pthread_rwlock_tryrdlock(rwlock)

int pthread_rwlock_trywrlock(rwlock)

读写锁的样例:

include <iostream>

include<pthread.h>

include<stdio.h>

include<stdlib.h>

include<string.h>

include<unistd.h>

include<errno.h>

using namespace std;

int num=5;

pthread_rwlock_t rwlock;

void *reader(void *arg)

{

pthread_rwlock_rdlock(&rwlock);

cout<<"reader "<<(long)arg<<" got the lock"<<endl;

pthread_rwlock_unlock(&rwlock);

return 0;

}

void *writer(void *arg)

{

pthread_rwlock_wrlock(&rwlock);

cout<<"writer "<<(long)arg<<" got the lock"<<endl;

pthread_rwlock_unlock(&rwlock);

return 0;

}

int main()

{

int flag;

long n=1,m=1;

pthread_t wid,rid;

pthread_attr_t attr;

flag=pthread_rwlock_init(&rwlock,NULL);

if(flag)

{

cout<<"rwlock init error"<<endl;

return flag;

}

pthread_attr_init(&attr);

pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);//thread sepatate

for(int i=0;i<num;i++)

{

if(i%3)

{

pthread_create(&rid,&attr,reader,(void *)n);

cout<<"create reader "<<n<<endl;

n++;

}else

{

pthread_create(&wid,&attr,writer,(void *)m);

cout<<"create writer "<<m<<endl;

m++;

}

}

sleep(5);//wait other done

return 0;

}

分析:3个读线程,2个写线程,读线程比写线程多

当读写锁是写状态时,在锁被解锁之前,所有试图对这个锁加锁的线程都会被阻塞

当读写锁是读状态时,在锁被解锁之前,所有视图以读模式对它进行加锁的线程都可以得到访问权,但是以写模式对它进行加锁的线程会被阻塞

所以读写锁默认是强读模式!

四.信号量

信号量(sem)和互斥锁的区别:互斥锁只允许一个线程进入临界区,而信号量允许多个线程进入临界区

1)信号量初始化

int sem_init(&sem,pshared,v)

pshared为0表示这个信号量是当前进程的局部信号量

pshared为1表示这个信号量可以在多个进程之间共享

v为信号量的初始值

成功返回0,失败返回-1

2)信号量值的加减

int sem_wait(&sem):以原子操作的方式将信号量的值减去1

int sem_post(&sem):以原子操作的方式将信号量的值加上1

3)对信号量进行清理

int sem_destory(&sem)

通过信号量模拟2个窗口,10个客人进行服务的过程

样例:

include <iostream>

include<pthread.h>

include<stdio.h>

include<stdlib.h>

include<string.h>

include<unistd.h>

include<errno.h>

include<semaphore.h>

using namespace std;

int num=10;

sem_t sem;

void *get_service(void *cid)

{

int id=*((int*)cid);

if(sem_wait(&sem)==0)

{

sleep(5);

cout<<"customer "<<id<<" get the service"<<endl;

cout<<"customer "<<id<<" done "<<endl;

sem_post(&sem);

}

return 0;

}

int main()

{

sem_init(&sem,0,2);

pthread_t customer[num];

int flag;

for(int i=0;i<num;i++)

{

int id=i;

flag=pthread_create(&customer[i],NULL,get_service,&id);

if(flag)

{

cout<<"pthread create error"<<endl;

return flag;

}else

{

cout<<"customer "<<i<<" arrived "<<endl;

}

sleep(1);

}

//wait all thread done

for(int j=0;j<num;j++)

{

pthread_join(customer[j],NULL);

}

sem_destroy(&sem);

return 0;

}

分析:信号量的值代表空闲的服务窗口,每个窗口一次只能服务一个人,有空闲窗口,开始服务前,信号量-1,服务完成后信号量+1

需要C/C++ Linux服务器开发学习资料加qun3223296726(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章