System V 消息队列进程间通信

Aki 发布于 2023-02-27 280 次阅读


若是一个多线程的进程,由于各个线程共享一个地址空间,可以直接通过变量的形式进行通信。而进程,由于各个进程独占一个地址空间,我们需要一种通信机制来完成进程间的数据交互。

Linux提供了System V和POSIX两种消息队列实现方式

System  V  IPC

System  V IPC引入了三种高级进程间的通信机制。一个IPC对象包含消息队列、共享内寸和信号量。

  • 共享内存
  • 消息队列
  • 信号灯集

System V IPC对象

每个IPC对象有唯一的ID;IPC对象创建后一直存在,直到被显式地删除;每个IPC对象有一个关联的KEY(其中进程的私有对象KTY值为0)。

命令查看IPC对象

IPC对象是全局对象,可用ipcs,ipcrm等命令查看或删除,生命周期与内核相同。

ipcs -q: 只显示消息队列
ipcs -s: 只显示信号量
ipcs -m: 只显示共享内存

System V 消息队列通信重要函数、

消息队列( message queue ) : 消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。

消息队列的特点:

  1. 消息队列是面向记录的,其中消息具有特定的格式以及特定的优先级。
  2. 消息队列独立于发送和接受进程。进程终止时,消息队列的内容并不会被删除。
  3. 消息队列可以实现消息的随机查询,消息不一定要以先进先出的顺序读取,可以按照消息的类型读取。

进程间通过消息队列通信步骤:

  1. 创建或打开消息队列
  2. 添加消息
  3. 读取消息
  4. 控制消息队列。

(1)获取键值

#include <sys/types.h>
#include <sys/ipc.h>

key_t ftok(const char *path, int proj_id);

成功时返回合法的key值,失败时返回EOF,即-1
path  存在且可访问的文件的路径,并不会实际打开读取修改文件
proj_id  用于生成key的数字,不能为0,推荐114514

(2)打开或创建消息队列

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg);

成功时返回消息队列的id,失败时返回EOF
key: 和消息队列关联的key  IPC_PRIVATE 或 ftok
msgflg: 标志位  IPC_CREAT|0666

(3)添加消息

//消息结构体,需要保证第一个为long型
struct message                                                                               
{                                                                                            
        long mtype;     //标识为何种消息!!!!剩下的自由发挥                                                                     
        char buffer[512];                                                                    
};      
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

msgid:消息队列ID。
msgp:消息结构体指针。
msgsz:结构体中字符数组的大小。(发送可用sizeof或者strlen计算)
msgflg:可以为0,也可以为IPC_NOWAIT。
返回值:成功返回0,失败返回-1。

在 System V 的消息队列中,每个消息都有一个消息类型,用来标识不同的消息。发送和接收消息时,必须指定消息类型,这样消息队列才能正确地选择合适的消息。

消息类型是一个整数,通常是正整数。发送和接收消息时,可以指定消息类型为0,这表示接收队列中的第一条消息。如果指定的消息类型不存在,则接收操作会被阻塞,直到消息队列中有指定类型的消息为止。

使用消息类型可以实现多个进程之间的不同类型的通信。例如,一个进程可以发送类型为1的消息,另一个进程可以接收类型为1的消息,并根据消息内容执行不同的操作。因此,消息类型是 System V 消息队列实现进程间通信的一个重要机制。一般来说,发送和接收消息时,要遵循一致的消息类型规则。例如,发送方指定的消息类型为1,接收方应该指定相同的消息类型1,而不是使用默认的0。这样可以保证消息传递的正确性和稳定性。

在使用 System V 的消息队列进行进程间通信时,并不是一定要使用 mtypemtext 字段组成的消息结构体,你也可以自定义消息结构体,只需要保证消息结构体的第一个字段是 long 类型的 mtype 字段即可。

需要注意的是,如果你使用自定义的消息结构体,接收方需要知道消息结构体的定义,才能正确地解析消息数据。因此,在进行进程间通信时,建议将消息结构体的定义放在共享的头文件中,以便发送方和接收方都可以使用相同的消息结构体。

(4)读取消息

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,int msgflg);

msgid:消息队列ID。
msgp:消息结构体指针。
msgsz:结构体中字符数组的大小。(使用sizeof计算)
msgtyp:消息类型。
可以为0,也可以为IPC_NOWAIT。
返回值:成功返回消息的数据长度,失败返回-1。

(5)控制消息队列

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

msgid:消息队列ID。
cmd:控制选项(或控制命令),下面详解
buf:存放属性信息(删除设置为NULL即可)
返回值:成功返回0,失败返回-1

IPC_STAT:将msqid消息队列的属性信息,读到第三个参数所指定的缓存。
IPC_SET:使用第三个参数中的新设置去修改消息队列的属性。
IPC_RMID:删除消息队列,用不到第三个参数,用不到时设置为NULL。

在使用 System V 的消息队列进行进程间通信时,msgsndmsgrcv 函数的第四个参数可以设置为 IPC_NOWAIT,用于指定函数的行为。IPC_NOWAIT 表示非阻塞操作,即如果消息队列已满(对于 msgsnd 函数)或消息队列为空(对于 msgrcv 函数),函数将立即返回,而不是等待消息队列可用。

具体来说,IPC_NOWAIT 参数对于 msgsnd 函数表示,如果消息队列已满,则不等待消息队列可用,直接返回错误并设置 errnoEAGAIN。这样可以避免函数一直阻塞,等待消息队列可用。而对于 msgrcv 函数表示,如果消息队列为空,则不等待消息队列有可用的消息,直接返回错误并设置 errnoENOMSG。这样可以避免函数一直阻塞,等待消息队列有可用的消息。

在某些场景下,我们可能需要使用非阻塞操作来避免函数一直阻塞,例如在程序中需要同时处理多个进程间通信的消息队列,如果一个消息队列已满或为空,我们可能需要立即转到处理其他消息队列,而不是一直等待。在这种情况下,可以使用 IPC_NOWAIT 参数来实现非阻塞操作。

如果在 msgsndmsgrcv 函数的第四个参数中设置为 0,则表示这是一个阻塞操作,函数将一直阻塞等待消息队列可用。对于 msgsnd 函数,如果消息队列已满,则阻塞等待直到消息队列可用;对于 msgrcv 函数,如果消息队列为空,则阻塞等待直到消息队列有可用的消息。

发送方进程

#include<iostream>
using namespace std;
#include<sys/types.h>
#include<sys/ipc.h>
#include<cstring>
#include<sys/msg.h>


void unix_error(const char* msg) noexcept
{
	cerr << msg;
	exit(0);
}


//消息队列结构体,双方必须一致!!!
struct message
{
	long mtype;
	char buffer[512];
};



int main()
{
	key_t key = ftok("./makefile",114514);
	if(key < 0)
	{
		unix_error("create key error\n");
	}

	int msgid = msgget(key,IPC_CREAT | 0666 );
	if(msgid < 0)
	{
		unix_error("create msgid error\n");
	}

	message msg;
	msg.mtype = 1;

	while(1)
	{
		memset(msg.buffer,0,512);
		cout << "请输入需要发送的消息:";
		cin >> msg.buffer;
		if(msgsnd(msgid,&msg,strlen(msg.buffer) + 1,IPC_NOWAIT) == 0)
		{
			cout << "发送成功" << endl;
		}
		if(strcmp(msg.buffer,"quit") == 0)
		{
			break;
		}
	}

	return 0;
}

接收方进程

#include<iostream>
using namespace std;
#include<sys/types.h>
#include<sys/ipc.h>
#include<cstring>
#include<sys/msg.h>


void unix_error(const char* buffer) noexcept
{
	cerr << buffer;
	exit(0);
}

//消息队列结构体,双方必须一致!!!
struct message
{
	long mtype;
	char buffer[512];
};


int main()
{

	key_t key = ftok("./makefile",114514);
	int msgid = msgget(key,IPC_CREAT|0666);
	if(msgid < 0)
	{
		unix_error("create msgid error\n");
	}

	message msg;
	while(msgrcv(msgid,&msg,sizeof(msg.buffer),1,0) > 0)
	{
		if(strlen(msg.buffer) > 0)
		{

		        cout << "收到消息:" << msg.buffer << endl;
		}
		if(strcmp(msg.buffer,"quit") == 0)
		{
			break;
		}
		memset(msg.buffer,0,512);
	}

	msgctl(msgid,IPC_RMID,nullptr);

	return 0;
}