2021-05-12 14:32:11
Linux 多執行緒條件變數同步
條件變數是執行緒同步的另一種方式,實際上,條件變數是號誌的底層實現,這也就意味著,使用條件變數可以擁有更大的自由度,同時也就需要更加小心的進行同步操作。條件變數使用的條件本身是需要使用互斥量進行保護的,執行緒在改變條件狀態之前必須首先鎖住互斥量,其他執行緒在獲得互斥量之前不會察覺到這種改變,因為互斥量必須在鎖定之後才能計算條件。
模型
#include<pthread.h>
pthread_t cond //準備條件變數
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; //初始化靜態的條件變數
pthread_cond_init() //初始化一個動態的條件變數
pthread_cond_wait() //等待條件變數變為真
pthread_cond_timedwait() //等待條件變數變為真,等待有時間限制。
pthread_cond_signal() //至少喚醒一個等待該條件的執行緒
pthread_cond_broadcast() //喚醒等待該條件的所有執行緒
pthread_cond_destroy() //銷毀一個條件變數
pthread_cond_init()
//初始化一個動態的條件變數
//成功返回0,失敗返回error number
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
cond:條件變數指標,這裡使用了restrict關鍵字
attr:條件變數屬性指標,預設屬性賦NULL
pthread_cond_wait() / pthread_cond_timedwait()
//等待條件變數為真。收到pthread_cond_broadcast()或pthread_cond_signal()就喚醒
//成功返回0,失敗返回error number
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
條件變數的使在多執行緒的程式中,因為各個執行緒都可以存取大部分的進程資源,所以我們為了保證公有資源的使用是可以控制的,在一個執行緒開始使用公有資源之前要嘗試獲取互斥鎖,在使用完畢之後要釋放互斥鎖,這樣就能保證每個公共資源在一個時刻都只能被一個執行緒使用,在一定程度上得到了控制,但這並不能解決同步的問題,考慮如下兩個執行緒:
Q=create_queue();
pthread_t mutex
//執行緒A,入隊
while(1){
lock(mutex);
in_queue(Q);
unlock(mutex);
}
//執行緒B,出隊
while(1){
lock(mutex);
out_queue(Q);
unlock(mutex);
}
上述程式碼可以實現兩個執行緒的互斥,即同一時刻只有一個執行緒在使用公有資源-佇列。但如果執行緒B獲取了鎖,但佇列中是空的,它的out_queue()也就是沒有意義的,所以我們這裡更需要一種方法將兩個執行緒進行同步:只有當佇列中有資料的時候才進行出隊。
我們設計這樣一種邏輯:
//執行緒B,出隊
while(1){
lock(mutex);
if(佇列是空,執行緒不應該執行){
釋放鎖;
continue;
}
out_queue(Q);
unlock(mutex);
}
這個程式就解決了上述的問題,即便執行緒B搶到了互斥鎖,但是如果佇列是空的,他就釋放鎖讓兩個執行緒重新搶鎖,希望這次執行緒A能搶到並往裡放一個資料。
但這個邏輯還有一個問題,就是多執行緒並行的問題,很有可能發生的一種情況是:執行緒B搶到了鎖,發現沒有資料,釋放鎖->執行緒A立即搶到了鎖並往裡放了一個資料->執行緒B執行continue,顯然,這種情況下是不應該continue的,因為執行緒B想要的條件在釋放鎖之後立即就被滿足了,它錯過了條件。
So,我們想一種反過來的邏輯:
//執行緒B,出隊
while(1){
lock(mutex);
if(佇列是空,執行緒不應該執行){
continue;
釋放鎖;
}
out_queue(Q);
unlock(mutex);
}
顯然這種方法有個致命的問題:一旦continue了,執行緒B自己獲得鎖就沒有被釋放,這樣執行緒A不可能搶到鎖,而B繼續加鎖就會形成死鎖!
Finaly,我們希望看到一個函數fcn,如果條件不滿足,能同時釋放鎖+停止執行執行緒,如果條件滿足,自己當時獲得的鎖還在
//執行緒B,出隊
while(1){
lock(mutex);
fcn(當前執行緒不應該執行,mutex) //if(當前執行緒不應該執行){釋放鎖“同時” continue;}
out_queue(Q);
unlock(mutex);
}
OK,這個就是pthread_cond_wait()的原理了,只不過它把continue變成了"休眠"這種由OS負責的操作,可以大大的節約資源。
當然,執行緒執行的條件是很難當作引數傳入一個函數的,POSIX多執行緒的模型使用系統提供的"條件變數"+"我們自己定義的具體條件" 來確定一個執行緒是否應該執行接下來的內容。"條件變數"只有真和假,所以一種典型的多執行緒同步的結構如下
//執行緒B,出隊
while(1){
lock(mutex);
while(條件不滿足)
pthread_cond_wait(cond,mutex)
//獲得互斥鎖可以同時保護while裡的條件和cond的判斷,二者總是用一把鎖保護,並一同釋放
//cond為假,就休眠同時釋放鎖,等待被cond為真喚醒,把自己獲得的鎖拿回來
//拿回自己的鎖再檢查執行緒執行條件,條件不滿足繼續迴圈,直到條件滿足跳出迴圈
//這個函數是帶著"執行緒的執行條件為真"+"cond為真"走出迴圈的
//這個函數返回後cond被重新設定為0
out_queue(Q);
unlock(mutex);
}
pthread_cond_braoadcast()/pthread_cond_signal()
//使條件變數為真並喚醒wait中的執行緒,前者喚醒所有wait的,後者喚醒一個
//成功返回0,失敗返回error number
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
pthread_cond_destroy()
//銷毀條件變數
//成功返回0,失敗返回error number
int pthread_cond_destroy(pthread_cond_t *cond);
例子-執行緒池
//thread.h
#ifndef __THREAD_H__
#define __THREAD_H__
#define THREAD_NUM 3
#define TASK_NUM 100
typedef struct{//每個節點的封裝格式,fcn是使用者自定義函數,arg是使用者自定義函數引數指標,保證通用性宣告為void
void* (*fcn)(void* arg);
void* arg;
}task_t;
typedef struct{ //使用者自定義函數的引數結構體
int x;
}argfcn_t;
//#define LQ_DATA_T task_t*
#define LQ_DATA_T task_t
#endif //__THREAD_H__
//lqueue.c
#include"thread.h"
#include"lqueue.h"
#include<stdlib.h>
...
//thread_pool.c
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include"thread.h"
#include"lqueue.h"
//互斥量和條件變數
pthread_mutex_t lock;
pthread_cond_t cond;
//全域性的表
lqueue_t* Q;
//每個執行緒的任務,必須是這個形式的函數指標
void* do_task(void* p){
task_t data;
int ret=0;
while(1){
pthread_mutex_lock(&lock);
while(is_empty_lqueue(Q)){ //大家收到廣播,因為延遲,可能醒了好幾個,要判斷一下是不是自己
pthread_cond_wait(&cond,&lock); //先搶到鎖再醒
}
ret=out_lqueue(Q,&data);
pthread_mutex_unlock(&lock);
data.fcn(data.arg);
}
}
//建立執行緒池
void create_pool(void){
//初始化佇列
Q=create_lqueue();
//初始化互斥量
pthread_mutex_init(&lock,NULL);
//初始化條件變數
pthread_cond_init(&cond,NULL);
int i=THREAD_NUM;
pthread_t tid[THREAD_NUM];
while(i--)
pthread_create(&tid[i],NULL,do_task,NULL);
}
//準備函數
void* fcn(void* parg){ //使用者自定義的需要執行緒執行的函數
argfcn_t* i=(argfcn_t*)parg;
printf("this is task1n");
printf("task1:%dn",i->x);
}
//新增任務
void pool_add_task(void*(*pfcn)(void*parg),void*arg){
task_t task;
task.fcn=pfcn;
task.arg=arg;
in_lqueue(Q,task);
pthread_cond_signal(&cond); //新增了一個任務,用signal更好
}
int main(int argc, const char *argv[])
{
//建立執行緒池
create_pool();
//準備引數
argfcn_t argfcn;
argfcn.x=5;
//新增任務
pool_add_task(fcn,(void*)&argfcn);
pool_add_task(fcn,(void*)&argfcn);
pool_add_task(fcn,(void*)&argfcn);
pause();
return 0;
}
本文永久更新連結地址:http://www.linuxidc.com/Linux/2016-11/136664.htm
相關文章