消息队列¶
约 2904 个字 365 行代码 1 张图片 预计阅读时间 14 分钟
System V消息队列¶
消息队列允许进程以消息的形式交换数据。尽管消息队列在某些方面与管道和 FIFO 类似,但它们之间仍然存在显著的差别。
- 用来引用消息队列的句柄是一个由 msgget()调用返回的标识符。这些标识符与 UNIX系统上大多数其他形式的 I/O 所使用的文件描述符是不同的。
- 通过消息队列进行的通信是面向消息的,即读者接收到由写者写入的整条消息。读取一条消息的一部分而让剩余部分遗留在队列中或一次读取多条消息都是不可能的。这一点与管道不通,管道提供的是一个无法进行区分的字节流(即使用管道时读者一次可以读取任意数量的字节数,不管写者写入的数据块的大小是什么)。
- 除了包含数据之外,每条消息还有一个用整数表示的类型。从消息队列中读取消息既可以按照先入先出的顺序,也可以根据类型来读取消息。
msgget()系统调用首先会在所有既有消息队列中搜索与指定的键对应的队列。如果找到了一个匹配的队列,那么就会返回该对象的标识符(除非在 msgflg 中同时指定了 IPC_CREAT和 IPC_EXCL,那样的话就返回一个错误)。如果没有找到匹配的队列并且在 msgflg 中指定了IPC_CREAT,那么就会创建一个新队列并返回该队列的标识符。
msgsnd发送消息¶
消息格式定义如下:
msgsnd的要点:
- 在使用 msgsnd()发送消息时并不存在 write()所具备的部分写的概念。这也是成功的msgsnd()只需要返回 0 而不是所发送的字节数的原因。
- 使用 msgsnd()发送消息必须要将消息结构中的 mtype 字段的值设为一个大于 0 的值。
- System V 消息队列会触发 “满” 的状态,本质是**队列使用的资源达到了系统 / 队列级别的限制**,此时调用
msgsnd()发送消息会阻塞(默认)或返回错误(非阻塞模式),队列满包括:① 系统最大消息队列个数,② 单个队列默认最大字节数,③ 单个队列中单个消息最大字节数;
可以通过ipcs -l查看所有System V IPC规格,例如:
root@3f930d3ecedd:/coding/example# ipcs -l
------ Messages Limits --------
max queues system wide = 32000
max size of message (bytes) = 8192
default max size of queue (bytes) = 16384
------ Shared Memory Limits --------
max number of segments = 4096
max seg size (kbytes) = 18014398509465599
max total shared memory (kbytes) = 18446744073709551612
min seg size (bytes) = 1
------ Semaphore Limits --------
max number of arrays = 32000
max semaphores per array = 32000
max semaphores system wide = 1024000000
max ops per semop call = 500
semaphore max value = 32767
msgrcv接受消息¶
msgrcv()系统调用从消息队列中读取(以及删除)一条消息并将其内容复制进 msgp 指向的缓冲区中。
msgp 缓冲区中 mtext 字段的最大可用空间是通过 maxmsgsz 参数来指定的。如果队列中待删除的消息体的大小超过了 maxmsgsz 字节,那么就不会从队列中删除消息,并且 msgrcv()会返回错误 E2BIG。
读取消息的顺序无需与消息被发送的一致。可以根据 mtype 字段的值来选择消息,而这个选择过程是由 msgtyp 参数来控制的,具体如下所述。
- 如果 msgtyp 等于 0,那么会删除队列中的第一条消息并将其返回给调用进程。
- 如果 msgtyp 大于 0,那么会将队列中第一条 mtype 等于 msgtyp 的消息删除并将其返回给调用进程。通过指定不同的 msgtyp 值,多个进程能够从同一个消息队列中读取消息而不会出现竞争读取同一条消息的情况。比较有用的一项技术是让各个进程选取与自己的进程 ID 匹配的消息。
- 如果 msgtyp 小于 0,那么就会将等待消息当成优先队列来处理。队列中 mtype 最小并且其值小于或等于 msgtyp 的绝对值的第一条消息会被删除并返回给调用进程。
关键函数¶
/* Get messages queue. */
extern int msgget (key_t __key, int __msgflg) __THROW;
/* Send message to message queue. */
extern int msgsnd (int __msqid, const void *__msgp, size_t __msgsz, int __msgflg);
/* Receive message from message queue. */
extern ssize_t msgrcv (int __msqid, void *__msgp, size_t __msgsz, long int __msgtyp, int __msgflg);
/* Message queue control operation. */
extern int msgctl (int __msqid, int __cmd, struct msqid_ds *__buf) __THROW;
示例代码¶
- case1: 消息队列实现基本进程间通信,msgrcv在不同进程会产生竞争
- case2: 基于不同msgtype向同一个队列发送和接受消息,不同msgtype不会相互阻塞
#include <stdio.h>
#include <stdlib.h>
#include <sys/msg.h>
#include <unistd.h>
#include <string.h>
#include "lib.h"
/**
* case1: 消息队列实现基本进程间通信,msgrcv在不同进程会产生竞争
*/
void msq_system_v_case1()
{
char *pathname = "/home/ubuntu";
int proj_id = 12345;
key_t key = ftok(pathname, proj_id);
for (int i = 0; i < 3; i++) {
if (fork() != 0) {
continue;
}
int msgid = msgget(key, IPC_CREAT | 0666);
if (msgid == -1) perror_exit("msgget");
struct msgbuf buf = {0};
if (i == 0) {
strcpy(buf.mtext, "hello,world");
buf.mtype = 1;
if (msgsnd(msgid, &buf, MSG_SIZE, 0) == -1) perror_exit("msgsnd");
printf("i=%d, pid=%d, msgid=%d send msg\n", i, getpid(), msgid);
}
if (i == 1) {
sleep(1);
// msg只会被消费一次
ssize_t len = msgrcv(msgid, &buf, MSG_SIZE, 1, 0);
if (len == -1) perror_exit("msgrcv");
printf("i=%d, pid=%d, msgid=%d send recv: %s\n", i, getpid(), msgid, buf.mtext);
strcpy(buf.mtext, "HELLO,world");
buf.mtype = 1;
// msg消费后其他进程msgrcv会阻塞,此处继续发送
if (msgsnd(msgid, &buf, strlen(buf.mtext), 0) == -1) perror_exit("msgsnd");
printf("i=%d, pid=%d, msgid=%d send msg\n", i, getpid(), msgid);
}
if (i == 2) {
sleep(2);
ssize_t len = msgrcv(msgid, &buf, MSG_SIZE, 1, 0);
if (len == -1) perror_exit("msgrcv");
printf("i=%d, pid=%d, msgid=%d recv: %s\n", i, getpid(), msgid, buf.mtext);
// 最后删除队列
if (msgctl(msgid, IPC_RMID, NULL) == -1) perror_exit("msgctl");
}
exit(EXIT_SUCCESS);
}
while (wait(NULL) != -1) {}
PRINT_SUCCESSFUL;
}
/**
* case2: 基于不同msgtype向同一个队列发送和接受消息,不同msgtype不会相互阻塞
*/
void msq_system_v_case2()
{
char *pathname = "/home/ubuntu";
int proj_id = 12345;
key_t key = ftok(pathname, proj_id);
int msgid = msgget(key, IPC_CREAT | IPC_EXCL | 0666);
if (msgid == -1) perror_exit("msgget");
struct msgbuf buf = {0};
for (int i = 0; i < 3; i++) {
pid_t pid = fork();
if (pid != 0) {
// 父进程按子进程pid向队列发送消息
sprintf(buf.mtext, "pid=%d, hello world", pid);
buf.mtype = pid;
if (msgsnd(msgid, &buf, MSG_SIZE, 0) == -1) perror_exit("msgsnd");
printf("i=%d, msgid=%d send msg to pid=%d\n", i, msgid, pid);
continue;
}
// 子进程按自身pid从队列读消息
sleep(1);
ssize_t len = msgrcv(msgid, &buf, MSG_SIZE, getpid(), 0);
if (len == -1) perror_exit("msgrcv");
printf("i=%d, pid=%d, msgid=%d recv: %s\n", i, getpid(), msgid, buf.mtext);
exit(EXIT_SUCCESS);
}
while (wait(NULL) != -1) {}
// 最后删除队列
if (msgctl(msgid, IPC_RMID, NULL) == -1) perror_exit("msgctl");
PRINT_SUCCESSFUL;
}
int main()
{
msq_system_v_case1();
msq_system_v_case2();
}
结果输出:
i=0, pid=5349, msgid=17 send msg
i=1, pid=5350, msgid=17 send recv: hello,world
i=1, pid=5350, msgid=17 send msg
i=2, pid=5351, msgid=17 recv: HELLO,world
[msq_system_v_case1] successful...
--------------------------------
i=0, msgid=18 send msg to pid=5361
i=1, msgid=18 send msg to pid=5362
i=2, msgid=18 send msg to pid=5363
i=0, pid=5361, msgid=18 recv: pid=5361, hello world
i=1, pid=5362, msgid=18 recv: pid=5362, hello world
i=2, pid=5363, msgid=18 recv: pid=5363, hello world
[msq_system_v_case2] successful...
--------------------------------
总结和缺点¶
UNIX 系统为同一系统上不同进程之间的数据传输提供了多种机制,既包括无分隔符的字节流形式(管道、FIFO 以及 UNIX domain 流 socket),也包括有分隔符的消息形式(System V消息队列、POSIX 消息队列以及 UNIX domain 数据报 socket)。
System V 消息队列的一个与众不同的特性是它能够为每个消息加上一个数字类型。应用程序可以使用这个完成两件事情:读取进程可以根据类型来选择消息或者它们可以采用一种优先队列策略以便优先读取高优先级的消息(即那些消息类型值更低的消息)。
但 System V 消息队列也存在几个缺点。
- 消息队列是通过标识符引用的,而不是像大多数其他 UNIX I/O 机制那样使用文件描述符。这意味着在第 63 章介绍的各种基于文件描述符的 I/O 技术(如 select()、poll()以及 epoll)将无法应用于消息队列上。此外,在程序中编写同时处理消息队列的输入和基于文件描述符的 I/O 机制的代码要比编写只处理文件描述符的代码更加复杂。
- 使用键而不是文件名来标识消息队列会增加额外的程序设计复杂性,同时还需要使用ipcs 和 ipcrm 来替换 ls 和 rm。ftok()函数通常能产生一个唯一的键,但却无法保证。使用 IPC_PRIVATE 键能确保产生唯一的队列标识符,但需要使这个标识符对需要用到它的其他进程可见。
-
消息队列是无连接的,内核不会像对待管道、FIFO 以及 socket 那样维护引用队列的进程数。因此就难以回答下列问题。
-
一个应用程序何时能够安全地删除一个消息队列?(不管是否有进程在后面某个时刻需要从队列中读取数据而过早地删除队列会导致数据丢失。)
-
应用程序如何确保不再使用的队列会被删除呢?
-
-
消息队列的总数、消息的大小以及单个队列的容量都是有限制的。这些限制都是可配置的,但如果一个应用程序超出了这些默认限制的范围,那么在安装应用程序的时候就需要完成一些额外的工作了。
POSIX消息队列¶
POSIX 消息队列与 System V 消息队列的相似之处在于数据的交换单位是整个消息,但它们之间仍然存在一些
显著的差异。
- POSIX 消息队列是引用计数的。只有当所有当前使用队列的进程都关闭了队列之后才会对队列进行标记以便删除。
- 每个 System V 消息都有一个整数类型,并且通过 msgrcv()可以以各种方式类选择消息。与之形成鲜明对比的是,POSIX 消息有一个关联的优先级,并且消息之间是严格按照优先级顺序排队的(以及接收)。
- POSIX 消息队列提供了一个特性允许在队列中的一条消息可用时异步地通知进程。
消息队列描述符和打开着的消息队列之间的关系与文件描述符和打开着的文件描述符之间的关系类似。消息队列描述符是一个进程级别的句柄,它引用了系统层面的打开着的消息队列描述表中的一个条目,而该条目则引用了一个消息队列对象。在 Linux 上,POSIX 消息队列被实现成了虚拟文件系统中的 i-node,并且消息队列描述符和打开着的消息队列描述分别被实现成了文件描述符和打开着的文件描述。

基本用法¶
POSIX 消息队列 API 中的主要函数如下。
- mq_open()函数创建一个新消息队列或打开一个既有队列,返回后续调用中会用到的消息队列描述符。
- mq_send()函数向队列写入一条消息。
- mq_receive()函数从队列中读取一条消息。
- mq_close()函数关闭进程之前打开的一个消息队列。
- mq_unlink()函数删除一个消息队列名并当所有进程关闭该队列时对队列进行标记以便删除。
- 每个消息队列都有一组关联的特性,其中一些特性可以在使用 mq_open()创建或打开队列时进行设置。获取和修改队列特性的工作则是由两个函数来完成的:mq_getattr()和mq_setattr()。
- mq_notify()函数允许一个进程向一个队列注册接收消息通知。在注册完之后,当一条消息可用时会通过发送一个信号或在一个单独的线程中调用一个函数来通知进程。
/* 创建或打开一个POSIX消息队列 */
extern mqd_t mq_open (const char *__name, int __oflag, ...)
/* Add message pointed by MSG_PTR to message queue MQDES. */
extern int mq_send (mqd_t __mqdes, const char *__msg_ptr, size_t __msg_len, unsigned int __msg_prio) __nonnull ((2));
/* Receive the oldest from highest priority messages in message queue MQDES. */
extern ssize_t mq_receive (mqd_t __mqdes, char *__msg_ptr, size_t __msg_len, unsigned int *__msg_prio) __nonnull ((2));
基本用法如下:
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <unistd.h>
#include <string.h>
#include "lib.h"
static char posix_msq_name[] = "/msq_demo";
/**
* 消息队列实现基本进程间通信,mq_open、mq_send、mq_receive以及mq_getattr基本用法
*/
void msq_posix_case1()
{
struct mq_attr attr;
attr.mq_maxmsg = 10; // 定义了使用mq_send()向消息队列添加消息的数量上限
attr.mq_msgsize = 1024; // 定义了加入消息队列的每条消息的大小的上限
/**
* mq_maxmsg 和 mq_msgsize 特性是在消息队列被创建时就确定下来的,并且之后也无法修改这两个特性
* 内核根据这两个值来确定消息队列所需的最大内存量。
*/
mqd_t mqd = mq_open(posix_msq_name, O_WRONLY | O_RDONLY | O_CREAT | O_EXCL, 0666, &attr);
for (int i = 0; i < 2; i++) {
pid_t pid = fork();
if (pid != 0) {
continue;
}
if (i == 0) {
mqd_t mqd = mq_open(posix_msq_name, O_WRONLY, 0666, NULL);
static char content[] = "hello,world";
printf("i=%d, pid=%d, mqd=%d, send msg:%s\n", i, getpid(), mqd, content);
/**
* msg_len 参数指定了 msg_ptr 指向的消息的长度,其值必须小于或等于队列的 mq_msgsize特性,
* 否则 mq_send()就会返回 EMSGSIZE 错误。长度为零的消息是允许的。
*/
if (mq_send(mqd, content, strlen(content) + 1, 0) == -1 ) perror_exit("mq_send");
}
if (i == 1) {
mqd_t mqd = mq_open(posix_msq_name, O_RDONLY, 0666, NULL);
struct mq_attr attr;
if (mq_getattr(mqd, &attr) == -1) perror_exit("mq_getattr");
// 按队列的最大消息长度分配缓冲区
char *msg_buf = malloc(attr.mq_msgsize);
if (msg_buf == NULL) {
perror_exit("malloc");
}
int prio = 0;
/**
* 不管消息的实际大小是什么,msg_len(即 msg_ptr 指向的缓冲区的大小)必须要大于或等于队列的 mq_msgsize 特性,
* 否则 mq_receive()就会失败并返回 EMSGSIZE 错误
*/
if (mq_receive(mqd, msg_buf, attr.mq_msgsize, &prio) == -1 ) perror_exit("mq_receive");
printf("i=%d, pid=%d, msgsize=%d, recv msg:%s\n", i, getpid(), attr.mq_msgsize, msg_buf);
}
if (mq_close(mqd) == -1) perror_exit("mq_close");
exit(EXIT_SUCCESS);
}
while (wait(NULL) != -1) {}
// 关闭该进程打开的消息队列
if (mq_close(mqd) == -1) perror_exit("mq_close");
// 删除一个消息队列名并当所有进程关闭该队列时对队列进行标记以便删除
if (mq_unlink(posix_msq_name) == -1) perror_exit("mq_unlink");
PRINT_SUCCESSFUL;
}
int main()
{
msq_posix_case1();
}
结果输出:
i=0, pid=10345, mqd=4, send msg:hello,world
i=1, pid=10346, msgsize=1024, recv msg:hello,world
[msq_posix_case1] successful...
--------------------------------
异步通知机制¶
POSIX 消息队列区别于 System V 消息队列的一个特性是 POSIX 消息队列能够接收之前为空的队列上有可用消息的异步通知(即队列从空变成了非空)。这个特性意味着已经无需执行一个阻塞的调用或将消息队列描述符标记为非阻塞并在队列上定期执行 mq_receive()调用(“拉”)了,因为一个进程能够请求消息到达通知,然后继续执行其他任务直到收到通知为止。进程可以选择通过信号的形式或通过在一个单独的线程中调用一个函数的形式来接收通知。
/* Register notification issued upon message arrival to an empty message queue MQDES. */
extern int mq_notify (mqd_t __mqdes, const struct sigevent *__notification) __THROW;
mq_notify 是 POSIX 消息队列的**异步通知机制**,注册调用进程在一条消息进入描述符 mqdes 引用的空队列时接收通知。作用是让内核在队列有新消息时,主动通知进程,避免进程阻塞在 mq_receive() 上轮询。linux支持信号和线程两种机制实现异步通知。
场景1:通过指定信号(如 SIGUSR1)让内核在有消息时向进程发送信号。
struct sigevent sev = {0};
int stop_flag = 0;
// 信号处理函数
void handle_signal(int sig) {
mqd_t mqd = mq_open(posix_msq_name, O_RDONLY);
char buf[256];
unsigned int prio;
ssize_t len = mq_receive(mqd, buf, 256, &prio);
if (len != -1) {
buf[len] = '\0';
printf("pid=%d, mqd=%d, recv msg:%s\n", getpid(), mqd, buf);
stop_flag = 1;
} else {
perror_exit("mq_receive");
}
// 通知只触发一次:收到通知后,需要重新调用 mq_notify 再次注册。
mq_notify(mqd, &sev);
if (mq_close(mqd) == -1) perror_exit("mq_close");
}
/**
* 通过信号接收异步消息通知,无需主线程阻塞在mq_receive
*/
void msq_posix_case2()
{
struct mq_attr attr;
attr.mq_maxmsg = 10;
attr.mq_msgsize = 256;
mqd_t mqd = mq_open(posix_msq_name, O_WRONLY | O_RDONLY | O_CREAT | O_EXCL, 0666, &attr);
// 设置信号通知
sev.sigev_notify = SIGEV_SIGNAL;
sev.sigev_signo = SIGUSR1;
// 注册信号处理函数
signal(SIGUSR1, handle_signal);
// 为该进程注册消息通知
mq_notify(mqd, &sev);
if (fork() == 0) {
sleep(3);
static char content[] = "hello,world";
printf("pid=%d, mqd=%d, send msg:%s\n", getpid(), mqd, content);
if (mq_send(mqd, content, strlen(content) + 1, 0) == -1 ) perror_exit("mq_send");
exit(EXIT_SUCCESS);
}
while (!stop_flag) { sleep(1); /* do something */ }
if (mq_close(mqd) == -1) perror_exit("mq_close");
if (mq_unlink(posix_msq_name) == -1) perror_exit("mq_unlink");
PRINT_SUCCESSFUL;
}
输出结果:
pid=13563, mqd=3, send msg:hello,world
pid=13559, mqd=4, recv msg:hello,world
[msq_posix_case2] successful...
--------------------------------
场景2:内核自动创建一个线程,在新消息到达时执行指定的回调函数。
// 线程回调函数
void thread_func(union sigval val) {
mqd_t mqd = mq_open(posix_msq_name, O_RDONLY);
char buf[256];
unsigned int prio;
ssize_t len = mq_receive(mqd, buf, 256, &prio);
if (len != -1) {
buf[len] = '\0';
// 内核会创建新线程处理该消息,因此pid和tid会不一样
printf("pid=%d, tid=%d, mqd=%d, recv msg:%s\n", getpid(), syscall(SYS_gettid), mqd, buf);
stop_flag = 1;
} else {
perror_exit("mq_receive");
}
if (mq_close(mqd) == -1) perror_exit("mq_close");
}
/**
* case3: 通过线程接收异步消息通知,无需主线程阻塞在mq_receive
*/
void msq_posix_case3()
{
struct mq_attr attr;
attr.mq_maxmsg = 10;
attr.mq_msgsize = 256;
mqd_t mqd = mq_open(posix_msq_name, O_WRONLY | O_RDONLY | O_CREAT | O_EXCL, 0666, &attr);
printf("pid=%d, tid=%d, create mqd=%d\n", getpid(), syscall(SYS_gettid), mqd);
// 设置消息通知线程回调
sev.sigev_notify = SIGEV_THREAD;
sev.sigev_notify_function = thread_func;
// 为该进程注册消息通知
mq_notify(mqd, &sev);
if (fork() == 0) {
sleep(3);
static char content[] = "hello,world";
printf("pid=%d, tid=%d, mqd=%d, send msg:%s\n", getpid(), syscall(SYS_gettid), mqd, content);
if (mq_send(mqd, content, strlen(content) + 1, 0) == -1 ) perror_exit("mq_send");
exit(EXIT_SUCCESS);
}
while (!stop_flag) { sleep(1); /* do something */ }
if (mq_close(mqd) == -1) perror_exit("mq_close");
if (mq_unlink(posix_msq_name) == -1) perror_exit("mq_unlink");
PRINT_SUCCESSFUL;
}
结果输出:
pid=14415, tid=14415, create mqd=3
pid=14420, tid=14420, mqd=3, send msg:hello,world
pid=14415, tid=14445, mqd=5, recv msg:hello,world
[msq_posix_case3] successful...
--------------------------------
总结¶
POSIX IPC 接口与 System V IPC 接口相比存在的各种优势:
- POSIX IPC 接口更加简单并且与传统的 UNIX 文件模型更加一致,同时 POSIX IPC 对象是引用计数的,这样就简化了确定何时删除一个对象的任务。POSIX 消息队列也同样具备这些常规优势。
- 消息通知特性允许一个(单个)进程能够在一条消息进入之前为空的队列时异步地通过信号或线程的实例化来接收通知。
- 在 Linux(不包括其他 UNIX 实现)上可以使用 poll()、select()以及 epoll 来监控 POSIX消息队列。System V 消息队列并没有这个特性。