首頁 > 軟體

Linux 多執行緒條件變數同步

2020-06-16 17:29:48

條件變數是執行緒同步的另一種方式,實際上,條件變數是號誌的底層實現,這也就意味著,使用條件變數可以擁有更大的自由度,同時也就需要更加小心的進行同步操作。條件變數使用的條件本身是需要使用互斥量進行保護的,執行緒在改變條件狀態之前必須首先鎖住互斥量,其他執行緒在獲得互斥量之前不會察覺到這種改變,因為互斥量必須在鎖定之後才能計算條件。

模型

#include<pthread.h>
pthread_t cond              //準備條件變數
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;     //初始化靜態的條件變數
pthread_cond_init()         //初始化一個動態的條件變數
pthread_cond_wait()         //等待條件變數變為真
pthread_cond_timedwait()    //等待條件變數變為真,等待有時間限制。
pthread_cond_signal()       //至少喚醒一個等待該條件的執行緒
pthread_cond_broadcast()    //喚醒等待該條件的所有執行緒
pthread_cond_destroy()      //銷毀一個條件變數

pthread_cond_init()

//初始化一個動態的條件變數
//成功返回0,失敗返回error number
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

cond:條件變數指標,這裡使用了restrict關鍵字
attr:條件變數屬性指標,預設屬性賦NULL

pthread_cond_wait() / pthread_cond_timedwait()

//等待條件變數為真。收到pthread_cond_broadcast()或pthread_cond_signal()就喚醒
//成功返回0,失敗返回error number
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

條件變數的使在多執行緒的程式中,因為各個執行緒都可以存取大部分的進程資源,所以我們為了保證公有資源的使用是可以控制的,在一個執行緒開始使用公有資源之前要嘗試獲取互斥鎖,在使用完畢之後要釋放互斥鎖,這樣就能保證每個公共資源在一個時刻都只能被一個執行緒使用,在一定程度上得到了控制,但這並不能解決同步的問題,考慮如下兩個執行緒:

Q=create_queue();
pthread_t mutex
//執行緒A,入隊
while(1){
    lock(mutex);
    in_queue(Q);
    unlock(mutex);
}

//執行緒B,出隊
while(1){
    lock(mutex);
    out_queue(Q);
    unlock(mutex);
}

上述程式碼可以實現兩個執行緒的互斥,即同一時刻只有一個執行緒在使用公有資源-佇列。但如果執行緒B獲取了鎖,但佇列中是空的,它的out_queue()也就是沒有意義的,所以我們這裡更需要一種方法將兩個執行緒進行同步:只有當佇列中有資料的時候才進行出隊。
我們設計這樣一種邏輯:

//執行緒B,出隊
while(1){
    lock(mutex);
    if(佇列是空,執行緒不應該執行){
        釋放鎖;
        continue;
    }
    out_queue(Q);
    unlock(mutex);
}

這個程式就解決了上述的問題,即便執行緒B搶到了互斥鎖,但是如果佇列是空的,他就釋放鎖讓兩個執行緒重新搶鎖,希望這次執行緒A能搶到並往裡放一個資料。
但這個邏輯還有一個問題,就是多執行緒並行的問題,很有可能發生的一種情況是:執行緒B搶到了鎖,發現沒有資料,釋放鎖->執行緒A立即搶到了鎖並往裡放了一個資料->執行緒B執行continue,顯然,這種情況下是不應該continue的,因為執行緒B想要的條件在釋放鎖之後立即就被滿足了,它錯過了條件。
So,我們想一種反過來的邏輯:

//執行緒B,出隊
while(1){
    lock(mutex);
    if(佇列是空,執行緒不應該執行){
        continue;
        釋放鎖;
    }
    out_queue(Q);
    unlock(mutex);
}

顯然這種方法有個致命的問題:一旦continue了,執行緒B自己獲得鎖就沒有被釋放,這樣執行緒A不可能搶到鎖,而B繼續加鎖就會形成死鎖!
Finaly,我們希望看到一個函數fcn,如果條件不滿足,能同時釋放鎖+停止執行執行緒,如果條件滿足,自己當時獲得的鎖還在

//執行緒B,出隊
while(1){
    lock(mutex);
    fcn(當前執行緒不應該執行,mutex)     //if(當前執行緒不應該執行){釋放鎖“同時” continue;}
    out_queue(Q);
    unlock(mutex);
}

OK,這個就是pthread_cond_wait()的原理了,只不過它把continue變成了"休眠"這種由OS負責的操作,可以大大的節約資源。
當然,執行緒執行的條件是很難當作引數傳入一個函數的,POSIX多執行緒的模型使用系統提供的"條件變數"+"我們自己定義的具體條件" 來確定一個執行緒是否應該執行接下來的內容。"條件變數"只有,所以一種典型的多執行緒同步的結構如下

//執行緒B,出隊
while(1){
    lock(mutex);
    while(條件不滿足)
        pthread_cond_wait(cond,mutex)
        //獲得互斥鎖可以同時保護while裡的條件和cond的判斷,二者總是用一把鎖保護,並一同釋放  
        //cond為假,就休眠同時釋放鎖,等待被cond為真喚醒,把自己獲得的鎖拿回來           
        //拿回自己的鎖再檢查執行緒執行條件,條件不滿足繼續迴圈,直到條件滿足跳出迴圈
        //這個函數是帶著"執行緒的執行條件為真"+"cond為真"走出迴圈的
        //這個函數返回後cond被重新設定為0
    out_queue(Q);
    unlock(mutex);
}

pthread_cond_braoadcast()/pthread_cond_signal()

//使條件變數為真並喚醒wait中的執行緒,前者喚醒所有wait的,後者喚醒一個
//成功返回0,失敗返回error number
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

pthread_cond_destroy()

//銷毀條件變數
//成功返回0,失敗返回error number
int pthread_cond_destroy(pthread_cond_t *cond);

例子-執行緒池


//thread.h
#ifndef __THREAD_H__
#define __THREAD_H__
#define THREAD_NUM 3
#define TASK_NUM 100


typedef struct{//每個節點的封裝格式,fcn是使用者自定義函數,arg是使用者自定義函數引數指標,保證通用性宣告為void
    void* (*fcn)(void* arg);
    void* arg;
}task_t;
typedef struct{        //使用者自定義函數的引數結構體
    int x;
}argfcn_t;

//#define LQ_DATA_T task_t*
#define LQ_DATA_T task_t

#endif  //__THREAD_H__

//lqueue.c
#include"thread.h"
#include"lqueue.h"
#include<stdlib.h>
...

//thread_pool.c
#include<stdio.h>   
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include"thread.h"
#include"lqueue.h"

//互斥量和條件變數
pthread_mutex_t lock;
pthread_cond_t cond;


//全域性的表
lqueue_t* Q;

//每個執行緒的任務,必須是這個形式的函數指標
void* do_task(void* p){   
    task_t data;
    int ret=0;
    while(1){
        pthread_mutex_lock(&lock);
        while(is_empty_lqueue(Q)){          //大家收到廣播,因為延遲,可能醒了好幾個,要判斷一下是不是自己
            pthread_cond_wait(&cond,&lock);     //先搶到鎖再醒
        }
        ret=out_lqueue(Q,&data);
        pthread_mutex_unlock(&lock);
        data.fcn(data.arg);
    }
}

//建立執行緒池
void create_pool(void){
    //初始化佇列
    Q=create_lqueue();
    //初始化互斥量
    pthread_mutex_init(&lock,NULL);
    //初始化條件變數
    pthread_cond_init(&cond,NULL);
    int i=THREAD_NUM;
    pthread_t tid[THREAD_NUM];
    while(i--)
        pthread_create(&tid[i],NULL,do_task,NULL);
}



//準備函數
void* fcn(void* parg){                //使用者自定義的需要執行緒執行的函數
    argfcn_t* i=(argfcn_t*)parg;
    printf("this is task1n");
    printf("task1:%dn",i->x);
}

//新增任務
void pool_add_task(void*(*pfcn)(void*parg),void*arg){
    task_t task;
    task.fcn=pfcn;
    task.arg=arg;

    in_lqueue(Q,task);
    pthread_cond_signal(&cond); //新增了一個任務,用signal更好
}

int main(int argc, const char *argv[])
{
    //建立執行緒池
    create_pool();

    //準備引數
    argfcn_t argfcn;
    argfcn.x=5;

    //新增任務
    pool_add_task(fcn,(void*)&argfcn);  
    pool_add_task(fcn,(void*)&argfcn);  
    pool_add_task(fcn,(void*)&argfcn);  
    pause();    
    return 0;
}

本文永久更新連結地址http://www.linuxidc.com/Linux/2016-11/136664.htm


IT145.com E-mail:sddin#qq.com