发新帖

RT-Thread学习笔记——消息队列

[复制链接]
2373 0

本文包含源代码、原理图、PCB、封装库、中英文PDF等资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
前言
本文学习RT-Thread的消息队列,支持不定长度消息的收发,涉及消息队列的工作机制、消息队列相关函数以及基于STM32的RT-Thread消息队列应用示例,采用RTT&正点原子联合出品潘多拉开发板进行实验,基于STM32L475VET6。

一、消息队列的工作机制
消息队列能够接收来自线程或中断服务例程中不固定长度的消息,并把消息缓存在自己的内存空间中。其他线程也能够从消息队列中读取相应的消息,而当消息队列是空的时候,可以挂起读取线程。当有新的消息到达时,挂起的线程将被唤醒以接收并处理消息。消息队列是一种异步的通信方式。

如下图所示,线程或中断服务例程可以将一条或多条消息放入消息队列中。同样,一个或多个线程也可以从消息队列中获得消息。当有多个消息发送到消息队列时,通常将先进入消息队列的消息先传给线程,也就是说,线程先得到的是最先进入消息队列的消息,即先进先出原则 (FIFO)。

20190213143127143.png
消息队列工作示意图(来源RT-Thread编程指南)

二、消息队列的相关函数
1、创建动态消息队列函数:创建动态消息队列时先从对象管理器中分配一个消息队列对象,然后给消息队列对象分配一块内存空间,组织成空闲消息链表,这块内存的大小 =[消息大小 + 消息头(用于链表连接)的大小]X 消息队列最大个数,接着再初始化消息队列,此时消息队列为空。创建动态消息队列的函数接口如下所示:
  1. rt_mq_t rt_mq_create(const char *name,
  2.                      rt_size_t   msg_size,
  3.                      rt_size_t   max_msgs,
  4.                      rt_uint8_t  flag);
复制代码
(1)入口参数:

name:消息队列的名称。
msg_size:消息队列中一条消息的最大长度,单位字节。
max_msgs:消息队列的最大个数。
flag:消息队列采用的等待方式,它可以取如下数值:RT_IPC_FLAG_FIFO 或RT_IPC_FLAG_PRIO。

(2)返回值:

RT_EOK:发送成功 消息队列对象的句柄成功。
RT_NULL:失败。



2、删除动态消息队列函数:当动态消息队列不再被使用时,应该删除它以释放系统资源,一旦操作完成,动态消息队列将被永久性地删除。删除动态消息队列时,如果有线程被挂起在该消息队列等待队列上,则内核先唤醒挂起在该消息等待队列上的所有线程(线程返回值是 - RT_ERROR),然后再释放消息队列使用的内存,最后删除消息队列对象。函数接口如下:
  1. rt_err_t rt_mq_delete(rt_mq_t mq);
复制代码
(1)入口参数:

mq:消息队列对象的句柄。

(2)返回值:
RT_EOK:成功。



3、创建静态消息队列函数:创建静态消息队列和《RT-Thread编程指南》所讲的初始化静态消息队列是一样的,静态消息队列对象的内存是在系统编译时由编译器分配的,一般放于读数据段或未初始化数据段中。消息队列初始化后所有消息都挂在空闲消息链表上,消息队列为空,函数接口如下:
  1. rt_err_t rt_mq_init(rt_mq_t     mq,
  2.                     const char *name,
  3.                     void       *msgpool,
  4.                     rt_size_t   msg_size,
  5.                     rt_size_t   pool_size,
  6.                     rt_uint8_t  flag);
复制代码
(1)入口参数:

mq:消息队列对象的句柄。
name:消息队列的名称。
msgpool:指向存放消息的缓冲区的指针。
msg_size:消息队列中一条消息的最大长度,单位字节。
pool_size:存放消息的缓冲区大小。
flag:消息队列采用的等待方式,它可以取如下数值:RT_IPC_FLAG_FIFO 或RT_IPC_FLAG_PRIO。

(2)返回值:

RT_EOK:成功。



4、删除静态消息队列函数:删除静态消息队列和《RT-Thread编程指南》所讲的脱离消息队列是一样的,脱离消息队列将使消息队列对象被从内核对象管理器中脱离。内核先唤醒所有挂在该消息等待队列对象上的线程(线程返回值是 RT_ERROR),然后将该消息队列对象从内核对象管理器中脱离。函数接口如下:
  1. rt_err_t rt_mq_detach(rt_mq_t mq);
复制代码
(1)入口参数:

mq:消息队列对象的句柄。

(2)返回值:

RT_EOK:成功。



5、发送消息函数:线程或者中断服务程序都可以给消息队列发送消息。当发送消息时,消息队列对象先从空闲消息链表上取下一个空闲消息块,把线程或者中断服务程序发送的消息内容复制到消息块上,然后把该消息块挂到消息队列的尾部。当且仅当空闲消息链表上有可用的空闲消息块时,发送者才能成功发送消息;当空闲消息链表上无可用消息块,说明消息队列已满,此时,发送消息的的线程或者中断程序会收到一个错误码(-RT_EFULL)。在发送一个普通消息之后,空闲消息链表上的队首消息被转移到了消息队列尾。函数接口如下:
  1. rt_err_t rt_mq_send (rt_mq_t mq, void* buffer, rt_size_t size);
复制代码
(1)入口参数:

mq:消息队列对象的句柄。
buffer:消息内容。
size:消息大小。

(2)返回值:

RT_EOK:成功。
RT_EFULL:消息队列已满。
RT_ERROR:失败,表示发送的消息长度大于消息队列中消息的最大长度。



6、发送紧急消息函数:发送紧急消息的过程与发送消息几乎一样,唯一的不同是,当发送紧急消息时,从空闲消息链表上取
下来的消息块不是挂到消息队列的队尾,而是挂到队首,这样,接收者就能够优先接收到紧急消息,从而及时进行消息处理。发送紧急消息的函数接口如下:
  1. rt_err_t rt_mq_urgent(rt_mq_t mq, void* buffer, rt_size_t size);
复制代码
(1)入口参数:

mq:消息队列对象的句柄。
buffer:消息内容。
size:消息大小。

(2)返回值:

RT_EOK:成功。
RT_EFULL:消息队列已满。
RT_ERROR:失败。



7、接收消息函数:当消息队列中有消息时,接收者才能接收消息,否则接收者会根据超时时间设置,或挂起在消息队列的等待线程队列上,或直接返回。接收消息时,接收者需指定存储消息的消息队列对象句柄,并且指定一个内存缓冲区,接收到的消息
内容将被复制到该缓冲区里。此外,还需指定未能及时取到消息时的超时时间。接收一个消息后消息队列上的队首消息被转移到了空闲消息链表的尾部。函数接口如下:
  1. rt_err_t rt_mq_recv(rt_mq_t    mq,
  2.                     void      *buffer,
  3.                     rt_size_t  size,
  4.                     rt_int32_t timeout);
复制代码
(1)入口参数:

mq:消息队列对象的句柄。
buffer:消息内容。
size:消息大小。
timeout:指定的超时时间。

(2)返回值:

RT_EOK:成功收到。
RT_ETIMEOUT:超时。
RT_ERROR:失败,返回错误。

三、基于STM32的消息队列应用示例
前将了很多消息队列的理论知识,光说不练都是假把式,那么接下来我们就进行实际操作,基于潘多拉开发板进行实验。创建一个消息队列和两个线程,其中一个线程用于发送消息和发送紧急消息,另外一个线程用于接收消息。通过按下按键KEY0发送消息msg1,按下按键KEY1发送紧急消息msg2,按下按键KEY2接收消息,接收到消息后打印出来。

1、实现代码:
  1. /* 线程句柄 */
  2. static rt_thread_t thread1 = RT_NULL;
  3. static rt_thread_t thread2 = RT_NULL;
  4. /* 消息队列句柄 */
  5. static rt_mq_t msgqueue1 = RT_NULL;
  6. /**************************************************************
  7. 函数名称 : thread1_recv_msgqueue
  8. 函数功能 : 线程1入口函数,用于接收消息
  9. 输入参数 : parameter:入口参数
  10. 返回值           : 无
  11. 备注                 : 无
  12. **************************************************************/
  13. void thread1_recv_msgqueue(void *parameter)
  14. {
  15.         u8 key;
  16.         char buf[64];
  17.        
  18.         while(1)
  19.         {
  20.                 key = key_scan(0);
  21.                
  22.                 if(key== KEY2_PRES)
  23.                 {       
  24.                         if(rt_mq_recv(msgqueue1, buf, sizeof(buf), RT_WAITING_FOREVER) == RT_EOK)
  25.                         {
  26.                                 rt_kprintf("recv msg:%s\r\n", buf);
  27.                         }
  28.                         else
  29.                         {
  30.                                 rt_kprintf("recv msg failed\r\n");
  31.                         }
  32.                 }
  33.                
  34.                 rt_thread_mdelay(1);
  35.         }
  36. }
  37. /**************************************************************
  38. 函数名称 : thread2_send_mailbox_msg
  39. 函数功能 : 线程2入口函数,用于发送消息
  40. 输入参数 : parameter:入口参数
  41. 返回值           : 无
  42. 备注                 : 无
  43. **************************************************************/
  44. void thread2_send_msgqueue(void *parameter)
  45. {
  46.         u8 key;
  47.         rt_err_t res;
  48.        
  49.         while(1)
  50.         {
  51.                 key = key_scan(0);
  52.                
  53.                 if(key== KEY0_PRES)
  54.                 {
  55.                          res = rt_mq_send(msgqueue1, "msg1", sizeof("msg1"));
  56.                          if(res == RT_EOK)
  57.                          {
  58.                                 rt_kprintf("msgqueue send msg1 successful\r\n");
  59.                          }
  60.                          else
  61.                          {
  62.                                 rt_kprintf("msgqueue send msg1 failed\r\n");
  63.                          }
  64.                 }
  65.                 else if(key== KEY1_PRES)
  66.                 {
  67.                          rt_mq_urgent(msgqueue1, "msg2", sizeof("msg2"));
  68.                        
  69.                          if(res == RT_EOK)
  70.                          {
  71.                                 rt_kprintf("msgqueue send msg2 successful\r\n");
  72.                          }
  73.                          else
  74.                          {
  75.                                 rt_kprintf("msgqueue send msg2 failed\r\n");
  76.                          }
  77.                 }
  78.                
  79.                 rt_thread_mdelay(1);
  80.         }
  81. }
  82. void rtthread_msgqueue_test(void)
  83. {
  84.         msgqueue1 = rt_mq_create("msgqueue1",
  85.                                 64, /* 消息最大长度 */
  86.                                 10, /* 消息队列最大容量 */
  87.                                 RT_IPC_FLAG_FIFO);/* FIFO模式 */
  88.        
  89.         if(msgqueue1 != RT_NULL)
  90.         {
  91.                 rt_kprintf("RT-Thread create msgqueue successful\r\n");
  92.         }
  93.         else
  94.         {
  95.                 rt_kprintf("RT-Thread create msgqueue failed\r\n");
  96.                 return;
  97.         }
  98.         thread1 = rt_thread_create("thread1",
  99.                                 thread1_recv_msgqueue,
  100.                                 NULL,
  101.                                 512,
  102.                                 3,
  103.                                 20);
  104.         if(thread1 != RT_NULL)
  105.         {
  106.                 rt_thread_startup(thread1);;
  107.         }
  108.         else
  109.         {
  110.                 rt_kprintf("create thread1 failed\r\n");
  111.                 return;
  112.         }
  113.         thread2 = rt_thread_create("thread2",
  114.                                 thread2_send_msgqueue,
  115.                                 NULL,
  116.                                 512,
  117.                                 2,
  118.                                 20);
  119.         if(thread2 != RT_NULL)
  120.         {
  121.                 rt_thread_startup(thread2);;
  122.         }
  123.         else
  124.         {
  125.                 rt_kprintf("create thread2 failed\r\n");
  126.                 return;
  127.         }
  128.        
  129. }
复制代码
2、观察FinSH:

(1)开机,按下KEY2接收消息,没有任何反应,为什么呢?因为此时消息队列为空,还没有任何消息,输入list_msgqueue可查看当前的消息队列有msgqueue1,消息队列里面有0条消息,以及挂起等待消息的线程为thread1:

20190213152417596.png

(2)按下KEY0发送消息msg1,提示发送成功以及打印出接收到msg1的内容(因为前面步骤(1)我们已经先按下KEY2然后thread1挂起等待接收消息了,所有为直接打印出msg1):

20190213152758313.png

(3)接着连续按下10次KEY0发送消息msg1,当按下第11次时,发送消息msg1失败,因为初始化消息队列时最大只有支持10条消息:

20190213153034490.png

(4)连续按下10次KEY2接收消息:

20190213153145623.png

(5)先按下两次KEY0发送两次消息msg1,再按下一次KEY1发送一次紧急消息msg2:

20190213153312518.png

(6)连续按下3次KEY2接收消息,先接收到的是一条msg2,再是两条msg1(因为前面步骤(2)按下KEY1是发送紧急消息,所以先接收到的是msg2):

20190213153532331.png

四、消息队列的使用场合
消息队列可以应用于发送不定长消息的场合,包括线程与线程间的消息交换,以及中断服务例程中给线程发送消息(中断服务例程不能接收消息)。

1、发送消息

(1)消息队列和邮箱的明显不同是消息的长度并不限定在 4 个字节以内;另外,消息队列也包括了一个发送紧急消息的函数接口。但是当创建的是一个所有消息的最大长度是 4 字节的消息队列时,消息队列对象将蜕化成邮箱。这个不限定长度的消息,也及时的反应到了代码编写的场合上,同样是类似邮箱的代码:
  1. struct msg
  2. {
  3.     rt_uint8_t *data_ptr; /* 数 据 块 首 地 址 */
  4.     rt_uint32_t data_size; /* 数 据 块 大 小 */
  5. };
复制代码
(2)和邮箱例子相同的消息结构定义,假设依然需要发送这样一个消息给接收线程。在邮箱例子中,这个结构只能够发送指向这个结构的指针(在函数指针被发送过去后,接收线程能够正确的访问指向这个地址的内容,通常这块数据需要留给接收线程来释放)。而使用消息队列的方式则大不相同:
  1. void send_op(void *data, rt_size_t length)
  2. {
  3.     struct msg msg_ptr;
  4.     msg_ptr.data_ptr = data; /* 指 向 相 应 的 数 据 块 地址 */
  5.     msg_ptr.data_size = length; /* 数 据 块 的 长 度 */
  6.     /* 发 送 这 个 消 息 指 针 给 mq 消 息 队 列 */
  7.     rt_mq_send(mq, (void*)&msg_ptr, sizeof(struct msg));
  8. }
复制代码
(3)注意,上面的代码中,是把一个局部变量的数据内容发送到了消息队列中。在接收线程中,同样也采用局部变量进行消息接收的结构体:
  1. void message_handler()
  2. {
  3.     struct msg msg_ptr; /* 用 于 放 置 消 息 的 局 部 变 量 */
  4.     /* 从 消 息 队 列 中 接 收 消 息 到 msg_ptr 中 */
  5.     if (rt_mq_recv(mq, (void*)&msg_ptr, sizeof(struct msg)) == RT_EOK)
  6.     {
  7.         /* 成 功 接 收 到 消 息, 进 行 相 应 的 数 据 处 理 */
  8.     }
  9. }
复制代码
因为消息队列是直接的数据内容复制,所以在上面的例子中,都采用了局部变量的方式保存消息结构体,这样也就免去动态内存分配的烦恼了(也就不用担心,接收线程在接收到消息时,消息内存空间已经被释放)。



2、同步消息

在一般的系统设计中会经常遇到要发送同步消息的问题,这个时候就可以根据当时状态的不同选择相应的实现:两个线程间可以采用 [消息队列 + 信号量或邮箱] 的形式实现。发送线程通过消息发送的形式发送相应的消息给消息队列,发送完毕后希望获得接收线程的收到确认,工作示意图如下图所示:



20190213154222915.png
同步消息示意图(来源RT-Thread编程指南)

根据消息确认的不同,可以把消息结构体定义成:

(1)消息采用了信号量来作为确认标志。信号量作为确认标志只能够单一的通知发送线程,消息已经确认接收
  1. struct msg
  2. {
  3.     /* 消 息 结 构 其 他 成 员 */
  4.     struct rt_semaphore ack;
  5. };
复制代码
(2)消息使用了邮箱来作为确认标志,邮箱作为确认标志,代表着接收线程能够通知一些状态值给发送线程:
  1. struct msg
  2. {
  3.     /* 消 息 结 构 其 他 成 员 */
  4.     struct rt_mailbox ack;
  5. };
复制代码
参考文献:
1、[野火®]《RT-Thread 内核实现与应用开发实战—基于STM32》

2、《RT-THREAD 编程指南》

*滑块验证:
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

更多

客服中心

微信扫描二维码 服务时间:周一至周日 8:30-22:00
快速回复 返回顶部 返回列表