nuttx/msgq: add kernel message queue support #16226

Draft · wants to merge 1 commit into base: master

Conversation


@anchao anchao commented Apr 16, 2025

Summary

nuttx/msgq: add kernel message queue support

Currently NuttX has two message queue implementations:

  1. POSIX message queues (mq_close/mq_getattr/mq_notify/mq_open/mq_receive/mq_send/mq_setattr/mq_timedreceive/mq_timedsend/mq_unlink)
  2. System V message queues (msgctl/msgget/msgrcv/msgsnd)

The POSIX and System V message queues follow the standard semantics, but they have several limitations for kernel developers:

  1. They depend on the file system, and sending and receiving messages requires file descriptors as handles, which adds resource overhead and degrades performance.
  2. They do not support statically configured memory pools and instead use global shared memory pools, which introduces more uncertainty in some use cases.
  3. They cannot support additional capabilities (such as "message peek").

So in this PR we plan to introduce the "nxmsgq" implementation to simplify development in kernel space. The table below compares the proposed API with the Zephyr and FreeRTOS interfaces:

------------------------------------------------------------------------------
|      NuttX          |         Zephyr          |       FreeRTOS             |
|---------------------|-------------------------|----------------------------|
|   nxmsgq_init       |       k_msgq_init       |      xQueueCreateStatic    |
|---------------------|-------------------------|----------------------------|
|   nxmsgq_create     |    k_msgq_alloc_init    |      xQueueCreate          |
|---------------------|-------------------------|----------------------------|
|   nxmsgq_destroy    |    k_msgq_cleanup       |      vQueueDelete          |
|---------------------|-------------------------|----------------------------|
|   nxmsgq_used       |    k_msgq_num_used_get  |   uxQueueMessagesWaiting   |
|---------------------|-------------------------|----------------------------|
|   nxmsgq_space      |    k_msgq_num_free_get  |   uxQueueSpacesAvailable   |
|---------------------|-------------------------|----------------------------|
|   nxmsgq_purge      |     k_msgq_purge        |                            |
|---------------------|-------------------------|----------------------------|
|   nxmsgq_ticksend   |     k_msgq_put          |       xQueueSend           |
|   nxmsgq_trysend    |                         |      xQueueSendFromISR     |
|   nxmsgq_send       |                         |       xQueueSend           |
|---------------------|-------------------------|----------------------------|
|   nxmsgq_tickrecv   |     k_msgq_get          |      xQueueReceive         |
|   nxmsgq_tryrecv    |                         |    xQueueReceiveFromISR    |
|   nxmsgq_recv       |                         |       xQueueReceive        |
|---------------------|-------------------------|----------------------------|
|   nxmsgq_tickpeek   |     k_msgq_peek         |       xQueuePeek           |
|   nxmsgq_trypeek    |                         |       xQueuePeekFromISR    |
|   nxmsgq_peek       |                         |       xQueuePeek           |
|---------------------|-------------------------|----------------------------|
|   nxmsgq_is_empty   |                         |                            |
|   nxmsgq_is_full    |                         |                            |
------------------------------------------------------------------------------
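
For orientation, here is a minimal usage sketch assembled only from the calls exercised by the test code further below (nxmsgq_create(), nxmsgq_send(), nxmsgq_recv(), nxmsgq_destroy()); the exact prototypes, the argument order of nxmsgq_create(), and the blocking behaviour noted in the comments are defined by this PR's nuttx/msgq.h, not by this sketch:

#include <nuttx/config.h>
#include <string.h>

#include <nuttx/msgq.h>

static void nxmsgq_roundtrip_sketch(void)
{
  /* Same geometry as the benchmark below: 64-byte messages, 8 slots,
   * mirroring the nxmsgq_create(64, 8) call in the test code.
   */

  char tx[64];
  char rx[64];
  nxmsgq_t *msgq;

  msgq = nxmsgq_create(64, 8);
  if (msgq == NULL)
    {
      return;
    }

  memset(tx, 0x55, sizeof(tx));

  if (nxmsgq_send(msgq, tx) >= 0 &&   /* Assumed to block while the queue is full */
      nxmsgq_recv(msgq, rx) >= 0)     /* Assumed to block while the queue is empty */
    {
      /* rx now holds a copy of tx */
    }

  nxmsgq_destroy(msgq);
}

The benchmark output reported for the test code below: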
Posix  Open Test(mq)     : loop: 1001:spending 0.13305000s
Kernel Open Test(nxmsgq) : loop: 1001:spending 0.6345000s (-52%)

Posix  Recv Test(mq)     : loop: 1001:spending 0.7884000s
Kernel Recv Test(nxmsgq) : loop: 1001:spending 0.6837000s (-13%)

Signed-off-by: chao an [email protected]

Impact

N/A

Testing

sim/nsh and Cortex-M55; test code below.


#include <nuttx/config.h>
#include <stdio.h>

#include <fcntl.h>
#include <sys/stat.h>
#include <time.h>
#include <pthread.h>
#include <sched.h>
#include <mqueue.h>
#include <nuttx/msgq.h>
#include <nuttx/irq.h>
#include <errno.h>

/****************************************************************************
 * Public Functions
 ****************************************************************************/

/****************************************************************************
 * timespec_sub
 ****************************************************************************/

static void timespec_sub(struct timespec *dest,
                         struct timespec *ts1,
                         struct timespec *ts2)
{
  dest->tv_sec = ts1->tv_sec - ts2->tv_sec;
  dest->tv_nsec = ts1->tv_nsec - ts2->tv_nsec;

  if (dest->tv_nsec < 0)
    {
      dest->tv_nsec += 1000000000;
      dest->tv_sec -= 1;
    }
}

void mq_open_test(void)
{
  struct timespec result;
  struct timespec start;
  struct timespec end;
  nxmsgq_t *msg;
  mqd_t mq;
  irqstate_t flags;
  int loop = 0;

  /* Posix Message Queue Test */

  flags = enter_critical_section();
  clock_gettime(CLOCK_MONOTONIC, &start);
  while (loop++ < 1000)
    {
      mq = mq_open("test", O_RDWR | O_CREAT, 0644, NULL);
      if (mq == (mqd_t)-1)
        {
          printf("mq_open fail: %d\n", errno);
          break;
        }

      mq_close(mq);
    }
  clock_gettime(CLOCK_MONOTONIC, &end);
  leave_critical_section(flags);

  timespec_sub(&result, &end, &start);
  printf("Posix  Open Test : loop: %d:spending %lld.%lds\n", loop, result.tv_sec, result.tv_nsec);

  /* Kernel Message Queue Test */

  loop = 0;

  flags = enter_critical_section();
  clock_gettime(CLOCK_MONOTONIC, &start);
  while (loop++ < 1000)
    {
      msg = nxmsgq_create(64, 8);
      if (msg == NULL)
        {
          printf("nxmsgq_create fail: %p\n", msg);
          break;
        }

      nxmsgq_destroy(msg);
    }
  clock_gettime(CLOCK_MONOTONIC, &end);
  leave_critical_section(flags);

  timespec_sub(&result, &end, &start);

  printf("Kernel Open Test : loop: %d:spending %lld.%lds\n", loop, result.tv_sec, result.tv_nsec);
}

mqd_t     g_mq;
nxmsgq_t *g_msg;

void *message_thread(void *arg)
{
  struct timespec result;
  struct timespec start;
  struct timespec end;
  irqstate_t flags;
  char tmp[64];
  int loop = 0;
  int ret;
  unsigned int prio;

  /* Posix Message Queue Test */

  flags = enter_critical_section();
  clock_gettime(CLOCK_MONOTONIC, &start);
  while (loop++ < 1000)
    {
      ret = mq_receive(g_mq, tmp, 64, &prio);
      if (ret < 0)
        {
          printf("mq_receive fail: %d, loop: %d, errno: %d\n", ret, loop, errno);
          return NULL;
        }
    }
  clock_gettime(CLOCK_MONOTONIC, &end);
  leave_critical_section(flags);

  timespec_sub(&result, &end, &start);
  printf("Posix  Recv Test : loop: %d:spending %lld.%lds\n", loop, result.tv_sec, result.tv_nsec);

  /* Kernel Message Queue Test */

  loop = 0;

  flags = enter_critical_section();
  clock_gettime(CLOCK_MONOTONIC, &start);
  while (loop++ < 1000)
    {
      ret = nxmsgq_recv(g_msg, tmp);
      if (ret < 0)
        {
          printf("nxmsgq_recv fail: %d, loop: %d\n", ret, loop);
          return NULL;
        }
    }
  clock_gettime(CLOCK_MONOTONIC, &end);
  leave_critical_section(flags);

  timespec_sub(&result, &end, &start);
  printf("Kernel Recv Test : loop: %d:spending %lld.%lds\n", loop, result.tv_sec, result.tv_nsec);

  return NULL;
}


void mq_sendrecv_test(void)
{
  struct sched_param sparam;
  pthread_attr_t tattr;
  pthread_t pid;
  char tmp[64];
  int loop = 0;
  int ret;

  g_mq = mq_open("test", O_RDWR | O_CREAT, 0644, NULL);
  if (g_mq == (mqd_t)-1)
    {
      printf("mq_open fail: %d\n", errno);
      return;
    }

  g_msg = nxmsgq_create(64, 8);
  if (g_msg == NULL)
    {
      printf("nxmsgq_create fail: %p\n", g_msg);
      return;
    }

  pthread_attr_init(&tattr);
  sparam.sched_priority = 200;
  pthread_attr_setschedparam(&tattr, &sparam);

  pthread_create(&pid, &tattr, message_thread, NULL);

  while (loop++ < 1000)
    {
      ret = mq_send(g_mq, tmp, 64, 0);
      if (ret < 0)
        {
          printf("mq_send fail: %d\n", ret);
          return;
        }
    }

  loop = 0;
  while (loop++ < 1000)
    {
      ret = nxmsgq_send(g_msg, tmp);
      if (ret < 0)
        {
          printf("nxmsgq_send fail: %d\n", ret);
          return;
        }
    }

  pthread_join(pid, NULL);
}

int main(int argc, FAR char *argv[])
{
  mq_open_test();
  mq_sendrecv_test();
  return 0;
}

github-actions bot added the "Area: OS Components" and "Size: L" labels on Apr 16, 2025

nuttxpr commented Apr 16, 2025

[Experimental Bot, please feedback here]

This PR does not fully meet the NuttX requirements. Here's why:

  • Impact is understated: The addition of a new message queue implementation has a significant impact. The PR author marks impact as N/A, which is incorrect. At minimum, the documentation impact should be YES, as new APIs require documentation. Compatibility may also be affected if existing code uses other message queue implementations and wants to switch. The performance impact also needs to be documented and justified. While a benchmark is provided, a more detailed analysis of the trade-offs would be beneficial.

  • Testing is incomplete: While tests are provided, they don't appear to cover all code paths. Edge cases such as queue full/empty conditions, timeout handling, and interrupt context usage (if supported) should be tested. More details about the test environment are also needed. "sim/nsh, Cortex-M55" isn't sufficient. Specific simulator details and the Cortex-M55 board/configuration used should be listed. The "before change" logs are missing entirely. These logs are crucial for demonstrating that the changes fix or improve an existing issue without introducing regressions.

  • Missing information: The summary mentions Zephyr and FreeRTOS. While useful for context, it doesn't explain why the NuttX-specific implementation is needed beyond some general statements. A more concrete comparison (e.g., a table showing feature support and limitations of each) would be helpful in justifying the design choices. The summary also lacks a clear description of how the message queue is implemented (e.g., data structures used, synchronization mechanisms).

To fully meet the requirements, the PR author needs to:

  1. Expand the Impact section: Detail the documentation updates, potential compatibility issues, and the reasoning behind the performance trade-offs.

  2. Improve the Testing section: Provide "before change" logs, test edge cases and error conditions, and specify the exact testing environment. Consider adding tests for each of the new API functions.

  3. Clarify the Summary: Explain the implementation details of the message queue and provide a more structured comparison with existing NuttX message queue implementations as well as Zephyr and FreeRTOS. Justify the design choices and the need for a new implementation more convincingly. It's helpful to clarify whether this new implementation is meant to replace the existing POSIX and System V queues eventually, or if it's intended to coexist with them and serve different use cases.

@acassis acassis left a comment

Please include Documentation/; the commit Summary could be used as a starting point.


typedef struct nxmsgq
{
  struct circbuf_s cbuf;

Would using a linked list (sq or dq) be an option here? The messages themselves would obviously need a bit more space, but it would remove the "max_msgs" limit from this implementation?

@anchao anchao Apr 17, 2025

Yes, a list is a good suggestion. I will improve it if we want to implement a priority queue in the future. However, at this stage, if we only compare the API with FreeRTOS/Zephyr, the current data type already meets the requirements.

max_msgs is the behavior expected by users. Most kernel developers expect the queue to stay within a controllable range, which helps them find problems in advance.
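
For readers outside this thread, here is a rough, hypothetical illustration of how a fixed-capacity queue maps onto NuttX's struct circbuf_s. The demo_msgq type and its logic are assumptions for illustration only, not the PR's implementation; the sketch relies only on the circbuf_space()/circbuf_write() helpers from include/nuttx/circbuf.h:

#include <nuttx/config.h>

#include <errno.h>
#include <stddef.h>

#include <nuttx/circbuf.h>

/* Hypothetical wrapper: the backing store is one contiguous ring of
 * max_msgs * msg_size bytes, which is why both values are fixed at
 * creation time and every slot has the same size.
 */

struct demo_msgq
{
  struct circbuf_s cbuf;     /* Backing ring buffer */
  size_t msg_size;           /* Fixed size of every message */
};

static int demo_msgq_trysend(FAR struct demo_msgq *q, FAR const void *msg)
{
  /* The queue is full when fewer than msg_size free bytes remain */

  if (circbuf_space(&q->cbuf) < q->msg_size)
    {
      return -EAGAIN;
    }

  circbuf_write(&q->cbuf, msg, q->msg_size);
  return 0;
}

A linked-list-backed variant, as suggested above, would trade this fixed max_msgs bound for per-message allocation overhead.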

* nxmsgq_destroy()
*
* Input Parameters:
* msg_size - Message size (in bytes).

Does this mean that an nxmsgq can only handle messages of a certain size? Isn't this limitation a bit too harsh?

int nxmsgq_used(FAR nxmsgq_t *msgq);

/****************************************************************************
* Name: nxmsgq_used

nxmsgq_space (the "Name:" line above should presumably read nxmsgq_space)

pussuw commented Apr 17, 2025

I like the idea a lot; I was always wondering why NuttX did not already have this kernel-internal message system (one that does not depend on a file system). As stated in the Summary, many RTOSes provide this, and I think it is an extremely useful and powerful synchronization mechanism.

@anchao anchao marked this pull request as draft April 17, 2025 17:00

anchao commented Apr 17, 2025

Let me mark this PR as a draft first. I want to support priority queues and zero-copy in this pull request.
