2021-05-12 14:32:11
Linux進程間通訊-訊息佇列(mqueue)深入理解
前面兩篇文章分解介紹了匿名管道和命名管道方式的進程間通訊,本文將介紹Linux訊息佇列(posix)的通訊機制和特點。
1、訊息佇列
訊息佇列的實現分為兩種,一種為System V的訊息佇列,一種是Posix訊息佇列;這篇文章將主要圍繞Posix訊息佇列介紹;
訊息佇列可以認為是一個訊息連結串列,某個進程往一個訊息佇列中寫入訊息之前,不需要另外某個進程在該佇列上等待訊息的達到,這一點與管道和FIFO相反。Posix訊息佇列與System V訊息佇列的區別如下:
(1) 對Posix訊息佇列的讀總是返回最高優先順序的最早訊息,對System V訊息佇列的讀則可以返回任意指定優先順序的訊息。
(2)當往一個空佇列放置一個訊息時,Posix訊息佇列允許產生一個信號或啟動一個執行緒,System V訊息佇列則不提供類似的機制。
2、訊息佇列的基本操作
2.1 開啟一個訊息佇列
#include <mqueue.h>
typedef int mqd_t;
mqd_t mq_open(const char *name, int oflag, ... /* mode_t mode, struct mq_attr *attr */);
返回: 成功時為訊息佇列描述字,出錯時為-1。
功能: 建立一個新的訊息佇列或開啟一個已存在的訊息的佇列。
2.2 關閉一個訊息佇列
#include <mqueue.h>
int mq_close(mqd_t mqdes);
返回: 成功時為0,出錯時為-1。
功能: 關閉已開啟的訊息佇列。
2.3 刪除一個訊息佇列
#include <mqueue.h>
int mq_unlink(const char *name)
返回: 成功時為0,出錯時為-1
功能: 從系統中刪除訊息佇列。
這三個函數操作的程式碼如下:
#include <mqueue.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
int main(int argc, char* argv[])
{
int flag = O_RDWR | O_CREAT | O_EXCL;
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
mqd_t mqid = mq_open("/mq_test", flag, mode,NULL);
if (mqid == -1)
{
printf("mqueue create failed!n");
return 1;
}
else
{
printf("mqueue create success!n");
}
mq_close(mqid); return 0;
}
#include <mqueue.h>
#include <unistd.h>
int main(int argc, char* argv[])
{
mq_unlink("/mq_test");
return 0;
}
注意:編譯posix mqueue時,要連線執行時庫(runtime library),既-lrt選項,執行結果如下:
關於mqueue更多詳細內容可以使用:man mq_overview命令檢視,裡面有一條需要注意的是,Linux下的Posix訊息佇列是在vfs中建立的,可以用
mount -t mqueue none /dev/mqueue
將訊息佇列掛在在/dev/mqueue目錄下,便於檢視。
2.4 mq_close()和mq_unlink()
mq_close()的功能是關閉訊息佇列的檔案描述符,但訊息佇列並不從系統中刪除,要刪除一個訊息佇列,必須呼叫mq_unlink();這與檔案系統的unlink()機制是一樣的。
3、訊息佇列的屬性
#include <mqueue.h>
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *attr);
均返回:成功時為0, 出錯時為-1
每個訊息佇列有四個屬性:
struct mq_attr
{
long mq_flags; /* message queue flag : 0, O_NONBLOCK */
long mq_maxmsg; /* max number of messages allowed on queue*/
long mq_msgsize; /* max size of a message (in bytes)*/
long mq_curmsgs; /* number of messages currently on queue */
};
4、訊息收發
#include <mqueue.h>
int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);
返回:成功時為0,出錯為-1
ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop);
返回:成功時為訊息中的位元組數,出錯為-1
mqsend程式碼如下:
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <sys/types.h>
int main(int argc, char* argv[])
{
int flag = O_RDWR;
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
mqd_t mqid = mq_open("/mq_test",flag,mode,NULL);
if (mqid == -1)
{
printf("open mqueue failed!n");
return 1;
}
char *buf = "hello, i am sender!";
mq_send(mqid,buf,strlen(buf),20);
mq_close(mqid);
return 0;
}
mqrecv程式碼如下:
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <sys/types.h>
int main(int argc, char* argv[])
{
int flag = O_RDWR;
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
mqd_t mqid = mq_open("/mq_test",flag,mode,NULL);
if (mqid == -1)
{
printf("open mqueue failed!n");
return 1;
}
struct mq_attr attr;
mq_getattr(mqid,&attr);
char buf[256] = {0};
int priority = 0;
mq_receive(mqid,buf,attr.mq_msgsize,&priority);
printf("%sn",buf);
mq_close(mqid);
return 0;
}
執行結果如下:
首先我們執行三次send,然後執行四次recv,可見recv的前三次是可以收到訊息佇列裡的三個訊息的,當執行第四次的時,系統訊息佇列裡為空,recv就會阻塞;關於非阻塞式mqueue見下文。
5、mq_notify函數
如前文介紹,poxis訊息佇列執行非同步通知,以告知何時有一個訊息放置到某個空訊息佇列中,這種通知有兩種方式可以選擇:
(1)產生一個信號
(2)建立一個執行緒來執行一個指定的函數
這種通知通過mq_notify() 函數建立。該函數為指定的訊息訊息佇列建立或刪除非同步事件通知,
#include <mqueue.h>
int mq_notify(mqd_t mqdes, const struct sigevent* notification);
(1)如果notification引數為非空,那麼當前進程希望在有一個訊息到達所指定的先前為空的對列時得到通知。
(2)如果notification引數為空,而且當前進程被註冊為接收指定佇列的通知,那麼已存在的註冊將被復原。
(3)任意時刻只有一個進程可以被註冊為接收某個給定佇列的通知。
(4)當有一個訊息到達先前為空的訊息佇列,而且已有一個進程被註冊為接收該佇列的通知時,只有在沒有任何執行緒阻塞在該佇列的mq_receive呼叫中的前提下,通知才會發出。即說明,在mq_receive呼叫中的阻塞比任何通知的註冊都優先。
(5)當前通知被傳送給它的註冊進程時,其註冊即被復原。該進程必須再次呼叫mq_notify以重新註冊。
sigevent結構如下:
union sigval{
int sival_int; /*integer value*/
void *sival_ptr; /*pointer value*/
};
struct sigevent{
int sigev_notify; /*SIGEV_{NONE, SIGNAL, THREAD}*/
int sigev_signo; /*signal number if SIGEV_SIGNAL*/
union sigval sigev_value;
void (*sigev_notify_function)(union sigval);
pthread_attr_t *sigev_notify_attributes;
};
5.1 mq_notify() 使用信號處理程式
一個正確的使用非阻塞mq_receive的信號通知的例子:
#include <unistd.h>
#include <stdio.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <signal.h>
#include <stdlib.h>
#include <errno.h>
void sig_usr1(int );
volatile sig_atomic_t mqflag;
int main(int argc, char* argv[])
{
mqd_t mqid = 0;
void *buff;
struct mq_attr attr;
struct sigevent sigev;
sigset_t zeromask,newmask,oldmask;
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
mqid = mq_open("/mq_test",O_RDONLY | O_NONBLOCK,mode,NULL); // 非阻塞式開啟mqueue
mq_getattr(mqid,&attr);
buff = malloc(attr.mq_msgsize);
sigemptyset(&zeromask);
sigemptyset(&newmask);
sigemptyset(&oldmask);
sigaddset(&newmask,SIGUSR1); // 初始化信號集
signal(SIGUSR1,sig_usr1); // 信號處理程式
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
int n = mq_notify(mqid,&sigev); // 啟用通知
for (;;)
{
sigprocmask(SIG_BLOCK,&newmask,&oldmask);
while(mqflag == 0)
sigsuspend(&zeromask);
mqflag = 0;
ssize_t n;
mq_notify(mqid, &sigev); // 重新註冊
while( (n = mq_receive(mqid,buff,attr.mq_msgsize,NULL)) >=0)
printf("SIGUSR1 received, read %ld bytes.n",(long)n); //讀取訊息
if(errno != EAGAIN)
printf("mq_receive errorn");
sigprocmask(SIG_UNBLOCK,&newmask,NULL);
}
return 0;
}
void sig_usr1(int signo)
{
mqflag = 1;
}
??裡為什麼使用的是非阻塞式mq_receive,為什麼不在信號處理程式中列印接收到的字元請參閱《unp 第二卷》
5.2 mq_notify() 使用執行緒處理程式
非同步事件通知的另一種方式是把sigev_notify設定成SIGEV_THREAD,這會建立一個新執行緒,該執行緒呼叫由sigev_notify_function指定的函數,所用的引數由sigev_value指定,新執行緒的屬性由sigev_notify_attributes指定,要指定執行緒的預設屬性的話,傳空指標。新執行緒是作為脫離執行緒建立的。
#include <unistd.h>
#include <stdio.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <signal.h>
#include <stdlib.h>
#include <errno.h>
#include <pthread.h>
mqd_t mqid = 0;
struct mq_attr attr;
struct sigevent sigev;
static void notify_thread(union sigval);
int main(int argc, char* argv[])
{
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
mqid = mq_open("/mq_test",O_RDONLY | O_NONBLOCK,mode,NULL);
mq_getattr(mqid,&attr);
sigev.sigev_notify = SIGEV_THREAD;
sigev.sigev_notify_function = notify_thread;
sigev.sigev_value.sival_ptr = NULL;
sigev.sigev_notify_attributes = NULL;
int n = mq_notify(mqid,&sigev);
for (;;)
pause();
return 0;
}
static void notify_thread(union sigval arg)
{
ssize_t n;
char* buff;
printf("notify_thread_started!n");
buff = malloc(attr.mq_msgsize);
mq_notify(mqid, &sigev);
while( (n = mq_receive(mqid,buff,attr.mq_msgsize,NULL)) >=0)
printf("SIGUSR1 received, read %ld bytes.n",(long)n);
if(errno != EAGAIN)
printf("mq_receive errorn");
free(buff);
pthread_exit(0);
}
一、進程間通訊-管道 https://www.linuxidc.com/Linux/2018-04/151680.htm
二、進程間通訊-命名管道 https://www.linuxidc.com/Linux/2018-04/151681.htm
三、進程間通訊-訊息佇列 https://www.linuxidc.com/Linux/2018-04/151682.htm
四、進程間通訊-共用記憶體 https://www.linuxidc.com/Linux/2018-04/151683.htm
本文永久更新連結地址:https://www.linuxidc.com/Linux/2018-04/151682.htm
相關文章