Skip to main content

消息队列(MSG)

1. 基本概念

Linux 消息队列是 Linux 系统中的一种进程间通信机制。它允许多个进程向一个消息队列中写入消息,或者从消息队列中读取消息。

消息队列是一种先进先出(FIFO)的数据结构,可以存储任意类型的消息。消息队列有两个指针:一个指向队列头,一个指向队列尾。写入消息的进程会向队列尾写入消息,而读取消息的进程则会从队列头读取消息。

消息队列可以用于进程间的同步和通信。例如,可以通过消息队列来实现进程间的异步通信,也可以用来实现进程间的同步。消息队列通常用于实现多线程应用程序,能够有效地提高应用程序的性能和可维护性。

Linux 消息队列的基本操作包括创建、打开、关闭、发送和接收消息等,使用方法一般是:

  • 发送者:
    • 获取消息队列的ID。
    • 将数据放入一个附带有标识的特殊的结构体,发送给消息队列。
  • 接收者:
    • 获取消息队列的ID。
    • 将指定标识的消息读出。

当发送者和接收者都不再使用消息队列时,需要及时删除它以释放系统资源。

2. 消息队列API

2.1 获取消息队列ID

msgget() 用于获取一个消息队列的描述符。该函数的原型如下:

int msgget(key_t key, int msgflg);

该函数接受两个参数:

  • key:消息队列的键值。
  • xxxxxxxxxx1 1int shmget(key_t key, size_t size, int shmflg);jsx showLineNumbers

如果函数执行成功,则返回一个消息队列的描述符。否则,返回 -1

msgget() 函数用于获取一个消息队列的描述符。它能够通过消息队列的键值来查找消息队列,并返回一个用于操作消息队列的描述符。例如,可以使用 msgget() 函数来获取一个消息队列的描述符,然后通过该描述符来向消息队列发送消息。

2.2 发送消息

msgsnd()用于向消息队列中写入消息。该函数的原型如下:

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

该函数接受四个参数:

  • msqid:消息队列的描述符。
  • msgp:指向要写入消息队列的消息缓冲区的指针。
  • msgsz:要写入消息队列的消息的大小。
  • msgflg:控制写入消息队列的方式的标志。

如果函数执行成功,则返回 0。否则,返回一个非 0 值。

msgsnd() 函数用于向消息队列中写入消息。它能够将一个消息写入指定的消息队列,并在写入成功时返回。例如,可以使用 msgget() 函数获取消息队列的描述符,然后使用 msgsnd() 函数向该消息队列中写入消息。

注意
  • 选项msgflg是一个位屏蔽字,因此IPC_CREAT、IPC_EXCL和权限mode可以用位或的方式叠加起来,比如: msgget(key, IPC_CREAT\0666); 表示如果key对应的消息队列不存在就创建,且权限指定为 0666,若已存在则直接获取 ID。
  • 权限只有读和写,执行权限是无效的,例如0777跟0666是等价的。
  • 当key被指定为IPC_PRVATE 时,系统会自动产生一个未用的 key来对应一个新的消息队列对象。一般用于线程间通信。

2.3 接收消息

msgrcv() 是 Linux 系统中的一个函数,用于从消息队列中读取消息。该函数的原型如下:

int msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

该函数接受五个参数:

  • msqid:消息队列的描述符。
  • msgp:指向要用于存储读取到的消息的缓冲区的指针。
  • msgsz:要读取的消息的大小。
  • msgtyp:要读取的消息的类型。
  • msgflg:控制读取消息队列的方式的标志。

如果函数执行成功,则返回实际读取到的消息的大小。否则,返回一个负值。

msgrcv() 函数用于从消息队列中读取消息。它能够从指定的消息队列中读取一条消息,并在读取成功时返回。例如,可以使用 msgget() 函数获取消息队列的描述符,然后使用 msgrcv() 函数从该消息队列中读取消息。

注意

使用这两个收、发消息函数需要注意以下几点:

  • 发送消息时,消息必须被组织成以下形式:
struct msgbuf {
long mtype;//消息的标识
char mtext[1];//消息的正文
}

也就是说:发送出去的消息必须以一个long型数据打头,作为该消息的标识,后面的数据则没有要求。

  • 消息的标识可以是任意长整型数值,但不能是OL。
  • 参数 msgsz 是消息中正文的大小,不包含消息的标识。

2.4 设置或获取消息队列属性

msgctl() 是 Linux 系统中的一个函数,用于控制消息队列。该函数的原型如下:

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

该函数接受三个参数:

  • msqid:消息队列的描述符。
  • cmd:指定要进行的操作的类型。
    • IPC_STAT:获取该MSG的信息,储存在结构体 msqid_ds 中。
    • IPC_SET:设置该MSG的信息,储存在结构体 msqid_ds。
    • IPC_RMID: 立即删除该MSG,并且唤醒所有阻塞在该MSG上的进程,同时忽略第三个参数。
    • IPC_INFO:获得关于当前系统中 MSG的限制值信息。
    • MSG_INFO:获得关于当前系统中MSG的相关资源消耗信息。
    • MSG_STAT:同IPC_STAT,但 msgid为该消息队列在内核中记录所有消息队列信息的数组的下标,因此通过迭代所有的下标可以获得系统中所有消息队列的相关信息
  • buf:指向用于存储消息队列信息的结构体的指针。

如果函数执行成功,则返回 0。否则,返回一个非 0 值。

msgctl() 函数用于控制消息队列。它能够对指定的消息队列执行各种操作,例如,查询消息队列信息、更改消息队列的标志、删除消息队列等。该函数的行为取决于 cmd 参数的值。例如,如果将 cmd 设为 IPC_STAT,则 msgctl() 函数会将指定消息队列的信息存储到 buf 所指向的结构体中。

3. 相关结构体定义

3.1 属性信息结构体

IPC_STAT获得的属性信息被存放在以下结构体中:

struct msqid_ds {
struct ipc_perm msg_perm; /*权限相关信息*/
time_t msg_stime; /*最后一次发送消息的时间*/
time_t msg_rtime; /*最后一次接收消息的时间*/
time_t msg_ctime; /*最后一次状态变更的时间*/
unsigned long _msg_cbytes; /*当前消息队列中的数据尺寸*/
msgqnum_t msg_qnum; /*当前消息队列中的消息个数*/
msglen_t msg_qbytes; /*消息队列的最大数据尺寸*/
pid_t msg_lspid; /*最后一个发送消息的进程PID*/
pid_t msg_lrpid; /*最后一个接收消息的进程PID*/
};

3.2 权限信息结构体

struct ipc perm {
key _t _key; /* 当前消息队列的键值key */
uid_t uid; /* 当前消息队列所有者的有效UID*/
gid_t gid; /*当前消息队列所有者的有效GID */
uid_t cuid; /* 当前消息队列创建者的有效UID */
gid_t cgid; /*当前消息队列创建者的有效GID*/
unsigned short mode; /*消息队列的读写权限*/
unsigned shortseq; /*序列号*/
};

3.3 定义结构体

当使用IPC_INFO时,需要定义一个如下结构体来获取系统关于消息队列的限制值信息,并且将这个结构体指针强制类型转化为第三个参数的类型。

struct msginfo {
int msgpool; /*系统消息总尺寸(千字节为单位)最大值*/
int msgmap; /*系统消息个数最大值*/
int msgmax; /*系统单个消息尺寸最大值*/
int msgmnb; /*写入消息队列字节数最大值*/
int msgmni; /*系统消息队列个数最大值*/
int msgssz; /*消息段尺寸*/
int msgtql; /*系统中所有消息队列中的消息总数最大值*/
unsigned short int msgseg; /*分配给消息队列的数据段的最大值*/
};

3.3 注意点

当使用选项MSG_INFO时,跟 IPC_INFO一样也是获得一个msginfo结构体的信息,但是有如下几点不同:

  • 成员 msgpool记录的是系统当前存在的MSG的个数总和。
  • 成员msgmap记录的是系统当前所有MSG中的消息个数总和。
  • 成员msgtql记录的是系统当前所有MSG中的所有消息的所有字节数总和。

4. 示例代码

下面的示例展示了一个进程 jack 如何使用消息队列给另一个进程 rose 发送消息的过程,以及如何使用msgctl( )函数,删除不再使用的消息队列:

  • head.h
#ifndef _HEAD_H_
#define _HEAD_H_

#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdbool.h>
#include <errno.h>
#include <time.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include "head.h"
#include <signal.h>
#include <sys/wait.h>

#define PATH "./"
#define PROJ_ID 1

#define MSGSIZE 100 //消息正文的大小

#define JTOR 1 //jack发给rose的消息
#define RTOJ 2 //rose发给jack的消息

struct msgbuf
{
long mtype; //消息类型
char mtext[MSGSIZE]; //消息正文
};

#endif
  • rose.c
#include "head.h"
#include <singal.h>

void child(int sig)
{
if(sig == SIGCHLD) {
printf("通道已关闭\n");
exit(0);
}
}

int main(int argc, char *argv[])
{
//获取一个IPC对象的key
key_t key = ftok(PATH, PROJ_ID);
if(key == -1) {
perror("ftok() fail");
exit(0);
}

//获取一个消息队列的ID
int msgid = msgget(key, IPC_CREAT|0666);
if(msgid == -1) {
perror("msgget() fail");
exit(0);
}

struct msgbuf msg;

pid_t pid = fork();

//创建一个子进程,专门接收jack的消息
if(pid == -1) {
perror("fork()() fail");
exit(0);
}

//子进程
if(pid == 0) {
while(1) {
bzero(&msg, sizeof(msg)); //先清空消息结构体变量
if(msgrcv(msgid, &msg, MSGSIZE, JTOR, 0) == -1) {
perror("msgrcv() fail");
exit(0);
}
if( !strncmp(msg.mtext, "bye", 3) )
{
//先把通道关闭(把消息队列删掉)
msgctl(msgid, IPC_RMID, NULL);
exit(0);
}
printf("from jack: %s\n", msg.mtext);
}
}

//父进程
if(pid > 0) {
signal(SIGCHLD, child);
while(1) {
bzero(&msg, sizeof(msg)); //先清空消息结构体变量
msg.mtype = RTOJ;
printf("请输入要发送给jack的消息\n");
fgets(msg.mtext, MSGSIZE, stdin); //从键盘获取最多不超过100个字节的信息
msgsnd(msgid, &msg, MSGSIZE, 0);
if( !strncmp(msg.mtext, "bye", 3) )
break;
}
}

//等待子进程
wait(NULL);

return 0;
}
  • jack.c
#include "head.h"
#include <singal.h>

void child(int sig)
{
if(sig == SIGCHLD) {
printf("通道已关闭\n");
exit(0);
}
}

int main(int argc, char *argv[])
{
//获取一个IPC对象的key
key_t key = ftok(PATH, PROJ_ID);
if(key == -1) {
perror("ftok() fail");
exit(0);
}

//获取一个消息队列的ID
int msgid = msgget(key, IPC_CREAT|0666);
if(msgid == -1) {
perror("msgget() fail");
exit(0);
}

struct msgbuf msg;

pid_t pid = fork();

//创建一个子进程,专门接收rose的消息
if(pid == -1) {
perror("fork()() fail");
exit(0);
}

//子进程
if(pid == 0) {
while(1) {
bzero(&msg, sizeof(msg)); //先清空消息结构体变量
if(msgrcv(msgid, &msg, MSGSIZE, RTOJ, 0) == -1) {
perror("msgrcv() fail");
exit(0);
}
if( !strncmp(msg.mtext, "bye", 3) )
{
//先把通道关闭(把消息队列删掉)
msgctl(msgid, IPC_RMID, NULL);
exit(0);
}
printf("from rose: %s\n", msg.mtext);
}
}

//父进程
if(pid > 0) {
signal(SIGCHLD, child);
while(1) {
bzero(&msg, sizeof(msg)); //先清空消息结构体变量
msg.mtype = JTOR;
printf("请输入要发送给rose的消息\n");
fgets(msg.mtext, MSGSIZE, stdin); //从键盘获取最多不超过100个字节的信息
msgsnd(msgid, &msg, MSGSIZE, 0);
if( !strncmp(msg.mtext, "bye", 3) )
break;
}
}

//等待子进程
wait(NULL);

return 0;
}