2021-05-12 14:32:11
Linux核心情景分析之訊息佇列
2020-06-16 17:29:49
早期的Unix通訊只有管道與信號,管道的缺點:
所載送的資訊是無格式的位元組流,不知道分界線在哪,也沒通訊規範,另外缺乏控制手段,比如保溫優先順序,管道機制的大小只有1頁,管道很容易寫滿而讀取沒有及時,傳送者只能休眠,強化了管道機制同步要求.另外管道機制開銷不少,尤其是當傳送資訊量很少時,平均每個位元組所耗費的代價非常高
Linux核心為系統IPC提供了一個統一的系統呼叫ipc()
int ipc(unsignedint call,int firtst,int second,int third,void*ptr,int firth);
其中第一個引數為具體的操作碼
#define SEMOP 1
#define SEMGET 2
#define SEMCTL 3
#define MSGSND 11
#define MSGRCV 12
#define MSGGET 13
#define MSGCTL 14
#define SHMAT 21
#define SHMDT 22
#define SHMGET 23
#define SHMCTL 24
操作碼SEM開頭的都是號誌設定,MSG開頭則是訊息佇列,"SHM"開頭則是共用記憶體區設定,其他引數因具體操作而不同,不過為了方便使用,c語言庫函數分別提供了semget,msgget,msgsnd等庫函數,這些庫函數把其呼叫統一成了系統呼叫ipc.
核心入口為sys_ipc()
訊息佇列並沒有納入檔案系統範疇,所以並不佔用開啟檔案號,核心有個全域性的資料結構msg_ids.專門用來管理報文佇列
static struct ipc_ids msg_ids;
struct ipc_ids {
int size;
int in_use;
int max_id;
unsignedshort seq;
unsignedshort seq_max;
struct semaphore sem;
spinlock_t ary;
struct ipc_id* entries;//指向一個結構陣列
};
//ipc_id如下定義
struct ipc_id {
struct kern_ipc_perm* p;
};
kern_ipc_perm結構如下定義
/* used by in-kernel data structures */
struct kern_ipc_perm
{
key_t key;//建值
uid_t uid;
gid_t gid;
uid_t cuid;
gid_t cgid;
mode_t mode;
unsignedlong seq;
};
每個報文佇列都有個佇列頭,msg_queue資料結構
/* one msq_queue structure for each present queue on the system */
struct msg_queue {
struct kern_ipc_perm q_perm;
time_t q_stime;/* last msgsnd time */
time_t q_rtime;/* last msgrcv time */
time_t q_ctime;/* last change time */
unsignedlong q_cbytes;/* current number of bytes on queue */
unsignedlong q_qnum;/* number of messages in queue */
unsignedlong q_qbytes;/* max number of bytes on queue */
pid_t q_lspid;/* pid of last msgsnd */
pid_t q_lrpid;/* last receive pid */
struct list_head q_messages;
struct list_head q_receivers;
struct list_head q_senders;
};
每個msg_queue資料結構唯一對應著一個報文佇列,這些資料結構以及結構之間的關係可以如下總結:
全域性ipc_ids資料結構的msg_ids是系統中所有報文佇列的總根,總跟的指標entries指向一個ipc_id結構陣列,陣列中的每個元素都是ipc_id資料結構,結構中有個指標p指向一個kern_ipc_perm資料結構,由於kern_ipc_perm資料結構是報文佇列頭msg_queue的第一個成分,陣列元素的指標p實際指向一個報文對哦,陣列大小已經決定了可以建立的報文佇列數目
訊息佇列的建立或獲取(msgget)
//可以用於2個目的,通過給定的key建立佇列,通過給定的key查詢已存在佇列
asmlinkage long sys_msgget (key_t key,int msgflg)
{
int id, ret =-EPERM;
struct msg_queue *msq;//佇列
down(&msg_ids.sem);
if(key == IPC_PRIVATE)//自己私用,無條件建立一個報文佇列
ret = newque(key, msgflg);//根據key與msgflg
elseif((id = ipc_findkey(&msg_ids, key))==-1){/* key沒有找到key not used */
if(!(msgflg & IPC_CREAT))//沒找到,但沒設定IPC_CREAT那就返回錯誤
ret =-ENOENT;
else//設定了就建立報文佇列
ret = newque(key, msgflg);
}elseif(msgflg & IPC_CREAT && msgflg & IPC_EXCL){//同時設定了IPC_CREAT與IPC_EXCL返回錯誤
ret =-EEXIST;
}else{
msq = msg_lock(id);
if(msq==NULL)
BUG();
if(ipcperms(&msq->q_perm, msgflg))//檢查存取許可權是否符合規則
ret =-EACCES;
else
ret = msg_buildid(id, msq->q_perm.seq);//將陣列下標轉換一體化的標識號
msg_unlock(id);
}
up(&msg_ids.sem);
return ret;//返回標識號
}
檢視具體的佇列建立函數newque()
staticint newque (key_t key,int msgflg)
{
int id;
struct msg_queue *msq;//佇列頭
msq =(struct msg_queue *) kmalloc (sizeof(*msq), GFP_KERNEL);//分配結構
if(!msq)
return-ENOMEM;
id = ipc_addid(&msg_ids,&msq->q_perm, msg_ctlmni);//分配一個標識號
if(id ==-1){
kfree(msq);
return-ENOSPC;
}//以下是報文佇列頭各種初始化
msq->q_perm.mode =(msgflg & S_IRWXUGO);
msq->q_perm.key = key;//key值
msq->q_stime = msq->q_rtime =0;
msq->q_ctime = CURRENT_TIME;
msq->q_cbytes = msq->q_qnum =0;
msq->q_qbytes = msg_ctlmnb;
msq->q_lspid = msq->q_lrpid =0;
INIT_LIST_HEAD(&msq->q_messages);
INIT_LIST_HEAD(&msq->q_receivers);
INIT_LIST_HEAD(&msq->q_senders);
msg_unlock(id);
//將標識號轉換為一個一體化的標識號,因為實際分配的id實際是陣列下標會重複使用
return msg_buildid(id,msq->q_perm.seq);
- }
每個已建立的報文佇列由一個標識號來代表,報文佇列的標識號是全域性的,所以必須把這全域性範圍內的唯一性,標識號由ipc_addid()分配,
/**
* ipc_addid - add an IPC identifier
* @ids: IPC identifier set
* @new: new IPC permission set
* @size: new size limit for the id array
*
* Add an entry 'new' to the IPC arrays. The permissions object is
* initialised and the first free entry is set up and the id assigned
* is returned. The list is returned in a locked state on success.
* On failure the list is not locked and -1 is returned.
*/
//全域性佇列管理結構 新建立的佇列頭的這個結構
int ipc_addid(struct ipc_ids* ids,struct kern_ipc_perm*new,int size)
{
int id;
size = grow_ary(ids,size);//增加管理陣列的大小
for(id =0; id < size; id++){
if(ids->entries[id].p == NULL)//總根看管理陣列的哪個為空
goto found;
}
return-1;
found:
ids->in_use++;//已使用++
if(id > ids->max_id)//最大的key值
ids->max_id = id;
new->cuid =new->uid = current->euid;//uid賦值
new->gid =new->cgid = current->egid;//gid賦值
new->seq = ids->seq++;//序列號
if(ids->seq > ids->seq_max)//超過了最大的限定
ids->seq =0;//從0開始繼續
spin_lock(&ids->ary);
ids->entries[id].p =new;//掛鉤,新建立的報文頭與總跟掛鉤成功
return id;
}
庫函數msgsnd-報文傳送
參與通訊雙方通過mssget建立了一個報文佇列後或取得該佇列的標識號以後就可以向該佇列傳送或接收報文了
傳送格式為msgbuf
/* message buffer for msgsnd and msgrcv calls */
struct msgbuf {
long mtype;/* type of message */
char mtext[1];/* message text */
};
核心儲存訊息佇列的格式
/* one msg_msg structure for each message */
struct msg_msg {
struct list_head m_list;
long m_type;//型別
int m_ts;/*長度 message text size */
struct msg_msgseg* next;
/* the actual message follows immediately */
};
struct msg_msgseg {
struct msg_msgseg* next;
/* the next part of the message follows immediately */
};
分配一頁,第一個除去msg_msg其他的用於儲存訊息,如果不夠繼續分配一頁除去msg_msg_seq以此類推
//標識號 傳送格式 //大小 //標誌位
asmlinkage long sys_msgsnd (int msqid,struct msgbuf *msgp,size_t msgsz,int msgflg)
{
struct msg_queue *msq;//佇列頭
struct msg_msg *msg;//核心儲存資訊的格式
long mtype;
int err;
//訊息不可以超過8k
if(msgsz > msg_ctlmax ||(long) msgsz <0|| msqid <0)
return-EINVAL;
if(get_user(mtype,&msgp->mtype))//從使用者空間拷貝到核心
return-EFAULT;
if(mtype <1)
return-EINVAL;
msg = load_msg(msgp->mtext, msgsz);//分配緩衝區儲存訊息(從使用者拷貝到核心)
if(IS_ERR(msg))
return PTR_ERR(msg);
msg->m_type = mtype;//訊息型別
msg->m_ts = msgsz;//訊息大小
msq = msg_lock(msqid);//根據給定的標號msg_msg找到相應的報文佇列,將其資料結構上鎖
err=-EINVAL;
if(msq==NULL)
goto out_free;
retry:
err=-EIDRM;
if(msg_checkid(msq,msqid))//驗證下id號
goto out_unlock_free;
err=-EACCES;
if(ipcperms(&msq->q_perm, S_IWUGO))//檢查是否有許可權向這個佇列傳送報文
goto out_unlock_free;
//當前報文大小+當前佇列統計的位元組數超過了報文佇列的總容量.或者報文的個數超過了限制,那就不可以傳送了
if(msgsz + msq->q_cbytes > msq->q_qbytes ||
1+ msq->q_qnum > msq->q_qbytes){
struct msg_sender s;
if(msgflg&IPC_NOWAIT){//是否等待,不等待直接退出
err=-EAGAIN;
goto out_unlock_free;
}
ss_add(msq,&s);//掛載到報文佇列q_sender鏈,這樣可以通過此鏈找到休眠正在等待傳送的進程
msg_unlock(msqid);
schedule();//排程
current->state= TASK_RUNNING;
msq = msg_lock(msqid);
err =-EIDRM;
if(msq==NULL)
goto out_free;
ss_del(&s);//刪除
if(signal_pending(current)){
err=-EINTR;
goto out_unlock_free;
}
goto retry;//重新執行一遍
}
if(!pipelined_send(msq,msg)){//如果有相關進程正在讀這個報文就不用放入佇列了
/* noone is waiting for this message, enqueue it */
list_add_tail(&msg->m_list,&msq->q_messages);//鏈入佇列
msq->q_cbytes += msgsz;//總數++
msq->q_qnum++;//數目++
atomic_add(msgsz,&msg_bytes);
atomic_inc(&msg_hdrs);
}
err =0;
msg = NULL;
msq->q_lspid = current->pid;
msq->q_stime = CURRENT_TIME;
out_unlock_free:
msg_unlock(msqid);
out_free:
if(msg!=NULL)
free_msg(msg);
return err;
}
load_msg用於訊息分割,在核心儲存,見上圖
//報文的源
staticstruct msg_msg* load_msg(void* src,int len)
{
struct msg_msg* msg;
struct msg_msgseg** pseg;
int err;
int alen;
alen = len;
if(alen > DATALEN_MSG)//一頁減去msg結構大小
alen = DATALEN_MSG;
msg =(struct msg_msg *) kmalloc (sizeof(*msg)+ alen, GFP_KERNEL);//一個報文的頭
if(msg==NULL)
return ERR_PTR(-ENOMEM);
msg->next = NULL;
if(copy_from_user(msg+1, src, alen)){//從msg的尾部開始拷貝
err =-EFAULT;
goto out_err;
}
len -= alen;//剩餘長度
src =((char*)src)+alen;//剩餘源頭
pseg =&msg->next;//指向下一個頁
while(len >0){//還有剩餘長度
struct msg_msgseg* seg;
alen = len;
if(alen > DATALEN_SEG)//是否超過page-msgseg
alen = DATALEN_SEG;
seg =(struct msg_msgseg *) kmalloc (sizeof(*seg)+ alen, GFP_KERNEL);//獲取一頁
if(seg==NULL){
err=-ENOMEM;
goto out_err;
}
*pseg = seg;//連結
seg->next = NULL;
if(copy_from_user (seg+1, src, alen)){//繼續拷貝
err =-EFAULT;
goto out_err;
}
pseg =&seg->next;
len -= alen;
src =((char*)src)+alen;
}
return msg;
out_err:
free_msg(msg);
return ERR_PTR(err);
}
/* one msg_sender for each sleeping sender */
struct msg_sender {
struct list_head list;
struct task_struct* tsk;
};
ss_add(msq, &s);//掛載到報文佇列q_sender鏈,這樣可以通過此鏈找到休眠正在等待傳送的進程
staticinlinevoid ss_add(struct msg_queue* msq,struct msg_sender* mss)
{
mss->tsk=current;//獲取當前佇列的進程執行的進程
current->state=TASK_INTERRUPTIBLE;//設定為可中斷睡眠狀態
list_add_tail(&mss->list,&msq->q_senders);//新增到隊尾
}
如果有相關進程正在讀這個報文就不用放入佇列了
intinline pipelined_send(struct msg_queue* msq,struct msg_msg* msg)
{
struct list_head* tmp;
tmp = msq->q_receivers.next;//聚集正在睡眠等待接收的讀進程
while(tmp !=&msq->q_receivers){//表示有
struct msg_receiver* msr;
msr = list_entry(tmp,struct msg_receiver,r_list);
tmp = tmp->next;
if(testmsg(msg,msr->r_msgtype,msr->r_mode)){//型別是否匹配
list_del(&msr->r_list);
if(msr->r_maxsize < msg->m_ts){//讀的緩衝區是否夠用
msr->r_msg = ERR_PTR(-E2BIG);
wake_up_process(msr->r_tsk);//不夠用則將進程喚醒,讓其出錯返回
}else{
msr->r_msg = msg;//有的話,直接讀取
msq->q_lspid = msr->r_tsk->pid;
msq->q_rtime = CURRENT_TIME;
wake_up_process(msr->r_tsk);
return1;
}
}
}
return0;
}
訊息佇列接收msgrcv
//標識號 使用者空間緩衝區 大小
asmlinkage long sys_msgrcv (int msqid,struct msgbuf *msgp,size_t msgsz,
long msgtyp,int msgflg)//msgtyp訊息型別,
{
struct msg_queue *msq;//佇列頭
struct msg_receiver msr_d;//接收的需要睡眠的進程
struct list_head* tmp;
struct msg_msg* msg,*found_msg;
int err;
int mode;
if(msqid <0||(long) msgsz <0)
return-EINVAL;
mode = convert_mode(&msgtyp,msgflg);
msq = msg_lock(msqid);//根據報文佇列標識號,找到具體佇列
if(msq==NULL)
return-EINVAL;
retry:
err=-EACCES;
if(ipcperms (&msq->q_perm, S_IRUGO))//檢測是否具有許可權
goto out_unlock;
tmp = msq->q_messages.next;
found_msg=NULL;
while(tmp !=&msq->q_messages){//遍歷完
msg = list_entry(tmp,struct msg_msg,m_list);//根據佇列當前項找到其指標
if(testmsg(msg,msgtyp,mode)){
found_msg = msg;//查詢到了訊息
if(mode == SEARCH_LESSEQUAL && msg->m_type !=1){
found_msg=msg;
msgtyp=msg->m_type-1;//將type減到比這個報文的型別值更小,看能否找到更小的
}else{
found_msg=msg;
break;
}
}
tmp = tmp->next;
}
if(found_msg){
msg=found_msg;
if((msgsz < msg->m_ts)&&!(msgflg & MSG_NOERROR)){//如果接收大小小於報文大小,出錯
err=-E2BIG;
goto out_unlock;
}
list_del(&msg->m_list);//否則將該報文從佇列脫鏈
msq->q_qnum--;
msq->q_rtime = CURRENT_TIME;
msq->q_lrpid = current->pid;
msq->q_cbytes -= msg->m_ts;
atomic_sub(msg->m_ts,&msg_bytes);
atomic_dec(&msg_hdrs);
ss_wakeup(&msq->q_senders,0);//將傳送的睡眠等待進程全部喚醒,因為拿出了一個報文
msg_unlock(msqid);
out_success:
msgsz =(msgsz > msg->m_ts)? msg->m_ts : msgsz;
if(put_user (msg->m_type,&msgp->mtype)||//實際接收的報文型別,通過put_user送回使用者空間
store_msg(msgp->mtext, msg, msgsz)){//將實際接收到的複製到使用者空間
msgsz =-EFAULT;
}
free_msg(msg);//釋放核心快取
return msgsz;
}else//報文佇列還沒有報文可供接收
{
struct msg_queue *t;
/* no message waiting. Prepare for pipelined
* receive.
*/
if(msgflg & IPC_NOWAIT){//不等待直接返回錯誤
err=-ENOMSG;
goto out_unlock;
}
list_add_tail(&msr_d.r_list,&msq->q_receivers);//鏈入等待接收佇列進程
msr_d.r_tsk = current;//儲存各種資訊
msr_d.r_msgtype = msgtyp;
msr_d.r_mode = mode;
if(msgflg & MSG_NOERROR)
msr_d.r_maxsize = INT_MAX;
else
msr_d.r_maxsize = msgsz;
msr_d.r_msg = ERR_PTR(-EAGAIN);
current->state = TASK_INTERRUPTIBLE;
msg_unlock(msqid);//解鎖並排程
//當前進程一旦睡下,以下需要等待進程通過pipelined_send()向其傳送報文,並且選擇這個進程作為接收進程才會被喚醒
schedule();
current->state = TASK_RUNNING;
msg =(struct msg_msg*) msr_d.r_msg;
if(!IS_ERR(msg))//表示已經成功接收
goto out_success;
//以下是因為緩衝區太小,喚醒了睡眠進程依舊無法接收,而是被信號喚醒的錯誤處理
t = msg_lock(msqid);//對報文加鎖,隱藏著等待,可能被其他進程搶先鎖住該佇列
if(t==NULL)
msqid=-1;
msg =(struct msg_msg*)msr_d.r_msg;
if(!IS_ERR(msg)){//在鎖住佇列之前,還有可能接收到其他進程pipelined_send發來的報文
/* our message arived while we waited for
* the spinlock. Process it.
*///所以還需要檢查下是否成功接收到報文
if(msqid!=-1)
msg_unlock(msqid);
goto out_success;
}
err = PTR_ERR(msg);
if(err ==-EAGAIN){//要將本進程的msg_receiver結構拖鏈,並且看是否有信號處理
if(msqid==-1)
BUG();
list_del(&msr_d.r_list);
if(signal_pending(current))//如果沒有信號處理,則跳轉到retry重新開始
err=-EINTR;
else
goto retry;
}
}
out_unlock:
if(msqid!=-1)
msg_unlock(msqid);
return err;
}
msgctl()報文機制的控制與設定
管道的缺點在於缺乏對管道的控制手段,也缺乏獲取其狀態的資訊(例如,有多少個位元組已經在管道中等待讀取)的手段,msgctl就是為報文佇列提供了這樣的手段
long sys_msgctl(int msqid,int cmd,struct msqid_ds*buf);
cmd為具體的操作碼
/*
* Control commands used with semctl, msgctl and shmctl
* see also specific commands in sem.h, msg.h and shm.h
*/
#define IPC_RMID 0/* remove resource關閉報文佇列 */
#define IPC_SET 1/* set ipc_perm options 改變ipc設施的相關狀態與屬性*/
#define IPC_STAT 2/* get ipc_perm options 獲取狀態*/
#define IPC_INFO 3/* see ipcs 統計資訊*/
以上命令碼並不是專門為報文佇列設定,也適用於sysv ipc的其他兩種機制,對於具體的機制還可能補充其他專用命令.
msg專用命令
/* ipcs ctl commands */
#define MSG_STAT 11
#define MSG_INFO 12
呼叫引數buf是一個msgid_ds結構指標,此結構適用於ipc_stat與ipc_set.
/* Obsolete, used only for backwards compatibility and libc5 compiles */
struct msqid_ds {
struct ipc_perm msg_perm;
struct msg *msg_first;/* first message on queue,unused */
struct msg *msg_last;/* last message in queue,unused */
__kernel_time_t msg_stime;/* last msgsnd time */
__kernel_time_t msg_rtime;/* last msgrcv time */
__kernel_time_t msg_ctime;/* last change time */
unsignedlong msg_lcbytes;/* Reuse junk fields for 32 bit */
unsignedlong msg_lqbytes;/* ditto */
unsignedshort msg_cbytes;/* current number of bytes on queue */
unsignedshort msg_qnum;/* number of messages in queue */
unsignedshort msg_qbytes;/* max number of bytes on queue */
__kernel_ipc_pid_t msg_lspid;/* pid of last msgsnd */
__kernel_ipc_pid_t msg_lrpid;/* last receive pid */
};
當命令為ipc_info時,指向一個msginfo結構
/* buffer for msgctl calls IPC_INFO, MSG_INFO */
struct msginfo {
int msgpool;
int msgmap;
int msgmax;
int msgmnb;
int msgmni;
int msgssz;
int msgtql;
unsignedshort msgseg;
};
asmlinkage long sys_msgctl (int msqid,int cmd,struct msqid_ds *buf)
{
int err, version;
struct msg_queue *msq;
struct msq_setbuf setbuf;
struct kern_ipc_perm *ipcp;
if(msqid <0|| cmd <0)
return-EINVAL;
version = ipc_parse_version(&cmd);//判斷是64位元版本還是32位元版本
switch(cmd){//根據不同型別選擇不同操作
case IPC_INFO://合在一起
case MSG_INFO:
{
struct msginfo msginfo;
int max_id;
if(!buf)
return-EFAULT;
/* We must not return kernel stack data.
* due to padding, it's not enough
* to set all member fields.
*/
memset(&msginfo,0,sizeof(msginfo));
msginfo.msgmni = msg_ctlmni;
msginfo.msgmax = msg_ctlmax;
msginfo.msgmnb = msg_ctlmnb;
msginfo.msgssz = MSGSSZ;
msginfo.msgseg = MSGSEG;
down(&msg_ids.sem);
if(cmd == MSG_INFO){
msginfo.msgpool = msg_ids.in_use;
msginfo.msgmap = atomic_read(&msg_hdrs);
msginfo.msgtql = atomic_read(&msg_bytes);
}else{
msginfo.msgmap = MSGMAP;
msginfo.msgpool = MSGPOOL;
msginfo.msgtql = MSGTQL;
}
max_id = msg_ids.max_id;
up(&msg_ids.sem);
if(copy_to_user (buf,&msginfo,sizeof(struct msginfo)))//從核心拷貝到使用者空間
return-EFAULT;
return(max_id <0)?0: max_id;
}
case MSG_STAT://stat狀態相關操作
case IPC_STAT:
{
struct msqid64_ds tbuf;
int success_return;
if(!buf)
return-EFAULT;
if(cmd == MSG_STAT && msqid > msg_ids.size)
return-EINVAL;
memset(&tbuf,0,sizeof(tbuf));
msq = msg_lock(msqid);
if(msq == NULL)
return-EINVAL;
if(cmd == MSG_STAT){
success_return = msg_buildid(msqid, msq->q_perm.seq);
}else{
err =-EIDRM;
if(msg_checkid(msq,msqid))//id檢查
goto out_unlock;
success_return =0;
}
err =-EACCES;
if(ipcperms (&msq->q_perm, S_IRUGO))//許可權判斷
goto out_unlock;
kernel_to_ipc64_perm(&msq->q_perm,&tbuf.msg_perm);
tbuf.msg_stime = msq->q_stime;
tbuf.msg_rtime = msq->q_rtime;
tbuf.msg_ctime = msq->q_ctime;
tbuf.msg_cbytes = msq->q_cbytes;
tbuf.msg_qnum = msq->q_qnum;
tbuf.msg_qbytes = msq->q_qbytes;
tbuf.msg_lspid = msq->q_lspid;
tbuf.msg_lrpid = msq->q_lrpid;
msg_unlock(msqid);
if(copy_msqid_to_user(buf,&tbuf, version))//從核心拷貝到使用者
return-EFAULT;
return success_return;
}
case IPC_SET://set命令操作
if(!buf)
return-EFAULT;
if(copy_msqid_from_user (&setbuf, buf, version))//從使用者拷貝到核心
return-EFAULT;
break;
case IPC_RMID:
break;
default:
return-EINVAL;
}
down(&msg_ids.sem);
msq = msg_lock(msqid);
err=-EINVAL;
if(msq == NULL)
goto out_up;
err =-EIDRM;
if(msg_checkid(msq,msqid))
goto out_unlock_up;
ipcp =&msq->q_perm;
err =-EPERM;
if(current->euid != ipcp->cuid &&
current->euid != ipcp->uid &&!capable(CAP_SYS_ADMIN))//是否有許可權操作
/* We _could_ check for CAP_CHOWN above, but we don't */
goto out_unlock_up;
switch(cmd){
case IPC_SET:
{
if(setbuf.qbytes > msg_ctlmnb &&!capable(CAP_SYS_RESOURCE))
goto out_unlock_up;
msq->q_qbytes = setbuf.qbytes;
ipcp->uid = setbuf.uid;
ipcp->gid = setbuf.gid;
ipcp->mode =(ipcp->mode &~S_IRWXUGO)|
(S_IRWXUGO & setbuf.mode);
msq->q_ctime = CURRENT_TIME;
/* sleeping receivers might be excluded by
* stricter permissions.
*/
expunge_all(msq,-EAGAIN);//使所有正在等待此佇列接收報文的進程都出錯返回
/* sleeping senders might be able to send
* due to a larger queue size.
*/
ss_wakeup(&msq->q_senders,0);//將所有正在等待此佇列傳送報文的進程都喚醒,進行新一輪嚐試
msg_unlock(msqid);
break;
}
case IPC_RMID:
freeque (msqid);//將所有正在等待的傳送還是接收的進程全部喚醒,讓其出錯返回
break;
}
err =0;
out_up:
up(&msg_ids.sem);
return err;
out_unlock_up:
msg_unlock(msqid);
goto out_up;
out_unlock:
msg_unlock(msqid);
return err;
}
ipc_rmid對應操作:freeque (msqid); //將所有正在等待的傳送還是接收的進程全部喚醒,讓其出錯返回
staticvoid freeque (int id)
{
struct msg_queue *msq;
struct list_head *tmp;
msq = msg_rmid(id);
expunge_all(msq,-EIDRM);//將所有讀寫的從連結串列拖鏈,讓其出錯返回
ss_wakeup(&msq->q_senders,1);//喚醒
msg_unlock(id);
tmp = msq->q_messages.next;
while(tmp !=&msq->q_messages){//將所有報文釋放
struct msg_msg* msg = list_entry(tmp,struct msg_msg,m_list);
tmp = tmp->next;
atomic_dec(&msg_hdrs);
free_msg(msg);
}
atomic_sub(msq->q_cbytes,&msg_bytes);
kfree(msq);//是否佇列頭
}
相關文章