消息队列(Message Queue)是一种异步通信机制,其核心功能是:
- 解耦生产者和消费者:发送方和接收方无需同时在线
- 缓冲与流量整形:应对瞬时流量过载
- 优先级处理:支持紧急消息插队
典型案例:
在RT-Thread中,中断服务程序(ISR)通过rt_mq_send非阻塞发送传感器数据,消费者线程按需从队列读取数据,避免ISR因等待导致实时性下降。
一、消息队列API函数
1、消息队列的创建
- rt_mq_t rt_mq_create(const char *name,
- rt_size_t msg_size,
- rt_size_t max_msgs,
- rt_uint8_t flag)
复制代码- rt_err_t rt_mq_init(rt_mq_t mq,
- const char *name,
- void *msgpool,
- rt_size_t msg_size,
- rt_size_t pool_size,
- rt_uint8_t flag)
复制代码 2、消息的发送
发送消息时,从空闲消息链表取一个空闲消息块,把消息复制到该消息块,然后把消息块挂到消息队列尾部。消息的发送有以下三种:
队列中有空闲消息块时,才能成功发送消息,否则返回错误。- rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size)
复制代码 如果队列中没有可用的空闲消息块,会根据timeout参数等待,超时后才返回错误。- rt_err_t rt_mq_send_wait(rt_mq_t mq,
- const void *buffer,
- rt_size_t size,
- rt_int32_t timeout)
复制代码 它会把消息块放在消息队列的头部,以便这个消息能被第1时间读取。- rt_err_t rt_mq_urgent(rt_mq_t mq, const void *buffer, rt_size_t size)
复制代码 3、消息的接收
当队列有消息时,使用收消息函数,可以从队列接收消息;如果没有消息,根据指定的 timeout 参数等待,直到超时结束。- rt_err_t rt_mq_recv(rt_mq_t mq,
- void *buffer,
- rt_size_t size,
- rt_int32_t timeout)
复制代码 4、消息队列的脱离/删除
- rt_err_t rt_mq_delete(rt_mq_t mq)
复制代码- rt_err_t rt_mq_detach(rt_mq_t mq)
复制代码 二、创建消息队列示例
1、创建消息队列
- /* 使用静态创建(确定内存占用) */
- struct rt_messagequeue mq_static;
- uint8_t msgpool[96]; /* 消息数量计算:96/(RT_ALIGN(2,4)+4)=12条消息 */
- rt_mq_init(
- &mq_static, /* 消息队列对象的句柄 */
- "smq", /* 消息队列的名字 */
- msgpool, /* 内存池指向的地址 */
- 2, /* 每一个消息占2字节 */
- 96, /* 内存池的大小 */
- RT_IPC_FLAG_FIFO); /* 选择线程唤醒的顺序方式 */
- /* 使用动态创建(灵活调整) */
- rt_mq_t mq_dynamic = rt_mq_create(
- "dmq", /* 消息队列名称 */
- 2, /* 每一个消息大小 */
- 12, /* 消息数量 */
- RT_IPC_FLAG_FIFO); /* 选择线程唤醒的顺序方式 */
复制代码 2、完整示例
- #include "thread_task.h"
- #include "main.h"
- #include <stdio.h>
- #include "rtthread.h"
- #include <rthw.h>
- /******************************************** 线程 1 ******************************************************/
- #define THREAD_1_PRIORITY 4 /* 进程优先级 */
- #define THREAD_1_STACK_SIZE 512 /* 进程栈空间大小 */
- #define THREAD_1_TIMESLICE 10 /* 进程执行时间片个数 */
- static struct rt_thread *thread_1_handle; /* 进程句柄 */
- /******************************************** 线程 2 ******************************************************/
- #define THREAD_2_PRIORITY 5 /* 进程优先级 */
- #define THREAD_2_STACK_SIZE 512 /* 进程栈空间大小 */
- #define THREAD_2_TIMESLICE 10 /* 进程执行时间片个数 */
- static struct rt_thread *thread_2_handle; /* 进程句柄 */
- /* 消息队列 */
- rt_mq_t mq_test;
- /**
- * @brief 线程1入口函数
- * @param 无
- * @retval 无
- */
- void thread_1_entry(void* param)
- {
- uint8_t buffer[2];
- while(1)
- {
- rt_mq_recv(mq_test, buffer, 2, 0xffffffff);
- HAL_GPIO_TogglePin(GPIOC, LED1_Pin);
- rt_thread_delay(4000); /* 精准延时1000时间片 */
- }
- }
- /**
- * @brief 线程2入口函数
- * @param 无
- * @retval 无
- */
- void thread_2_entry(void* param)
- {
- uint8_t text[2] = {0,1};
- while(1)
- {
- text[0] ++;
- rt_mq_send_wait(mq_test, text, 2, 0xffff);
- HAL_GPIO_TogglePin(GPIOC, LED2_Pin);
-
- rt_thread_delay(200);
- }
- }
- /**
- * @brief 动态创建线程任务并启动
- * @param 无
- * @retval 无
- */
- void ThreadStart(void)
- {
- rt_base_t level = rt_hw_interrupt_disable();
- /* 动态创建线程 */
- thread_1_handle = rt_thread_create(
- "thread_1", /* 线程句柄名称*/
- thread_1_entry, /* 函数入口 */
- RT_NULL, /* 入口函数参数 */
- THREAD_1_STACK_SIZE, /* 线程栈大小 */
- THREAD_1_PRIORITY, /* 线程优先级 */
- THREAD_1_TIMESLICE /* 线程时间片大小 */
- );
-
- rt_thread_startup(thread_1_handle); /* 启动线程 */
- thread_2_handle = rt_thread_create(
- "thread_2", /* 线程句柄名称*/
- thread_2_entry, /* 函数入口 */
- RT_NULL, /* 入口函数参数 */
- THREAD_2_STACK_SIZE, /* 线程栈大小 */
- THREAD_2_PRIORITY, /* 线程优先级 */
- THREAD_2_TIMESLICE /* 线程时间片大小 */
- );
-
- rt_thread_startup(thread_2_handle); /* 启动线程 */
- /* 动态创建一个消息队列 */
- mq_test = rt_mq_create(
- "mq_test", /* 消息队列名称 */
- 2, /* 消息大小 */
- 12, /* 消息数量 */
- RT_IPC_FLAG_FIFO /* 选择线程唤醒的顺序方式 */
- );
- rt_hw_interrupt_enable(level);
- }
复制代码 执行流程如下:
- 线程1立即阻塞等待消息(队列空)
- 线程2开始高频发送(每200 ticks发送1次)
- 队列填满阶段
- 线程2发送12条消息后队列满 → rt_mq_send_wait()开始阻塞
- 系统进入双阻塞状态(线程1和2均挂起)
- 恢复同步阶段
- 线程1的rt_thread_delay(4000)到期后:
- 消费1条消息 → 释放1个队列空间
- 唤醒线程2继续发送
- 稳定状态:每4000 ticks消费1条,线程2每200 ticks补充1条
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |