单片机上实现基于RAM+FLASH的消息队列

背景简介:单片机上的RAM资源有限,而我们的物联终端时时刻刻都在采集数据。如果RAM中的数据未来得及发送到服务端(比如进入了信号盲区),我们就希望使用FLASH进行缓存。而FLASH缓存之后,为了保证数据的有序性并且不丢失,就需要在单片机中实现一个理论上无限长的消息队列。

#include <stdio.h>
#include <string.h>

/* Size of one queue node (header fields plus payload area), in KiB. */
#define NODE_SIZE_KB 32
/* Number of node-sized slots in the page store; a slot count, not a byte count. */
#define PAGE_SIZE 256

/*
 * One fixed-size segment of the message queue.
 *
 * Four int bookkeeping fields are followed by a payload area. Messages are
 * stored back to back in the payload area as records of the form
 * [int length][length bytes]. Offsets are measured in bytes from the start
 * of the node itself, not from the start of content.
 */
typedef struct STU_QUEUE_NODE
{
    int head_offset;  /* byte offset of the next record to be read */
    int tail_offset;  /* byte offset at which the next record will be written */
    int ele_count;    /* number of records written but not yet read */
    int remain_size;  /* bytes of content not yet used by writes; reads do not give space back */
    /* Payload area: NODE_SIZE_KB KiB minus the four int fields above. */
    char content[NODE_SIZE_KB * 1024 - sizeof(int) * 4];
} QUEUE_NODE;

/* The two RAM-resident nodes that the read and write pointers alternate between. */
static QUEUE_NODE buf1;
static QUEUE_NODE buf2;
/* Node that read_message() currently consumes from. */
static QUEUE_NODE *read_buffer_ptr;
/* Node that write_message() currently appends to; may be the same node as read_buffer_ptr. */
static QUEUE_NODE *write_buffer_ptr;
/* Total number of nodes stored by page_in() so far. */
static int page_in_counter = 0;
/* Total number of nodes loaded by page_out() or discarded by page_in() on overflow. */
static int page_out_counter = 0;
/* Page store, used as a ring of PAGE_SIZE slots. Currently a RAM array; the
   TODO notes in page_in()/page_out() mark it for replacement by flash access. */
static QUEUE_NODE page[PAGE_SIZE];
/* Non-zero once init_queue() has completed. */
static int queue_inited = 0;

/*
 * Reset a node to the empty state.
 *
 * Both offsets are set to the byte offset of the payload area within the
 * node, the record count is cleared and the whole payload area is marked
 * as free. The payload bytes themselves are not touched.
 */
void init(QUEUE_NODE *node)
{
    /* Offset of content measured from the start of the node. */
    const int payload_start = (int)(node->content - (char *)node);

    node->ele_count = 0;
    node->remain_size = (int)sizeof node->content;
    node->head_offset = payload_start;
    node->tail_offset = payload_start;
}

/*
 * Print a node's address and bookkeeping fields to stdout (debugging aid).
 *
 * node - node to describe; must not be NULL.
 */
void get_info(QUEUE_NODE *node)
{
    /* Use %p with a void * argument: casting the pointer to int and printing
       it with %x truncates the address wherever pointers are wider than int,
       and passes a signed int to an unsigned conversion. */
    printf("addr: %p; head: %d; tail: %d; count: %d; remain_size: %d\n",
           (void *)node, node->head_offset, node->tail_offset, node->ele_count, node->remain_size);
}

/*
 * Store a copy of *node_ptr in the next slot of the page ring.
 *
 * If all PAGE_SIZE slots already hold unread nodes, the oldest one is given
 * up first (page_out_counter is advanced past it), so the slot written here
 * is the one that node occupied.
 */
void page_in(QUEUE_NODE *node_ptr)
{
    const int pending = page_in_counter - page_out_counter;
    if(pending >= PAGE_SIZE)
    {
        /* Ring is full: discard the oldest stored node. */
        ++page_out_counter;
    }

    QUEUE_NODE *slot = &page[page_in_counter % PAGE_SIZE];

    // TODO: replace with a flash write
    memcpy(slot, node_ptr, sizeof *slot);

    page_in_counter += 1;
}

/*
 * Load the oldest stored node from the page ring into *node_ptr and advance
 * page_out_counter. The function does not check that a stored node exists;
 * the caller is expected to compare the two page counters first.
 */
void page_out(QUEUE_NODE *node_ptr)
{
    const QUEUE_NODE *slot = &page[page_out_counter % PAGE_SIZE];

    // TODO: replace with a flash read
    memcpy(node_ptr, slot, sizeof *slot);

    page_out_counter += 1;
}

/*
 * Append one message to the queue.
 *
 * length      - number of payload bytes to copy from message_src
 * message_src - payload bytes; must be readable for length bytes
 *
 * The message is stored as a record: the native int length followed by the
 * payload bytes. When the current write node has no room for the record:
 *   - if it is also the read node, writing moves to the other RAM buffer;
 *   - otherwise the full write node is handed to page_in() and then reused.
 *
 * A message with a negative length, or one that could not fit even in an
 * empty node, is dropped: the function returns void and cannot report the
 * failure, and storing it would overrun the node's payload area.
 */
static void write_message(int length, char *message_src)
{
    /* A negative length would wrap to a huge size_t in the arithmetic below. */
    if(length < 0)
    {
        return;
    }

    const size_t record_size = sizeof(length) + (size_t)length;

    /* Even a freshly initialised node cannot hold this record. */
    if(record_size > sizeof(write_buffer_ptr->content))
    {
        return;
    }

    // TODO: acquire lock
    if((size_t)write_buffer_ptr->remain_size < record_size)
    {
        if(write_buffer_ptr == read_buffer_ptr)
        {
            /* The reader still owns this node; continue in the other buffer. */
            write_buffer_ptr = (read_buffer_ptr == &buf1) ? &buf2 : &buf1;
        }
        else
        {
            /* Spill the full write node to the page store before reusing it. */
            page_in(write_buffer_ptr);
        }

        init(write_buffer_ptr);
    }

    char *tail_ptr = (char *)write_buffer_ptr + write_buffer_ptr->tail_offset;

    /* Records are packed back to back, so tail_ptr is generally not aligned
       for int; copy the length bytewise instead of storing through an int *. */
    memcpy(tail_ptr, &length, sizeof(length));
    memcpy(tail_ptr + sizeof(length), message_src, (size_t)length);

    /* record_size is bounded by sizeof content, so it fits in int. */
    write_buffer_ptr->tail_offset += (int)record_size;
    write_buffer_ptr->ele_count += 1;
    write_buffer_ptr->remain_size -= (int)record_size;
    // TODO: release lock
}

/*
 * Pop the oldest record from a node.
 *
 * node         - node to read from
 * message_dest - receives the payload bytes; the caller must provide room
 *                for the stored message length (no terminator is appended)
 *
 * Returns the payload length of the record that was copied, or 0 when the
 * node holds no unread records (in which case nothing is modified).
 */
static int read_message_p(QUEUE_NODE *node, char *message_dest)
{
    if(node->ele_count <= 0)
    {
        return 0;
    }

    char *head_ptr = (char *)node + node->head_offset;

    /* Records are packed back to back, so head_ptr is generally not aligned
       for int; copy the length bytewise instead of loading through an int *. */
    int length;
    memcpy(&length, head_ptr, sizeof(length));

    memcpy(message_dest, head_ptr + sizeof(length), (size_t)length);

    node->head_offset += (int)sizeof(length) + length;
    node->ele_count -= 1;
    return length;
}

/*
 * Pop the oldest message in the queue into dest.
 *
 * If the current read node still has records, the next one is taken from it.
 * If it is drained and is also the write node, the queue is empty. Otherwise
 * the read side is refilled, either by loading the oldest stored page into
 * the read node or, when no pages are stored, by switching to the write
 * node, and a read is attempted on the result.
 *
 * Returns the length of the message copied into dest, or 0 if none was read.
 */
int read_message(char *dest)
{
    // TODO: acquire lock

    int should_read = (read_buffer_ptr->ele_count > 0);

    if(!should_read && read_buffer_ptr != write_buffer_ptr)
    {
        if(page_in_counter > page_out_counter)
        {
            /* Oldest unread data lives in the page store. */
            page_out(read_buffer_ptr);
        }
        else
        {
            /* Nothing stored: the remaining data is in the write node. */
            read_buffer_ptr = write_buffer_ptr;
        }
        should_read = 1;
    }

    const int read_length = should_read ? read_message_p(read_buffer_ptr, dest) : 0;

    // TODO: release lock
    return read_length;
}

void init_queue()
{
    if(!queue_inited)
    {
        init(&buf1);
        init(&buf2);

        read_buffer_ptr = &buf1;
        write_buffer_ptr = &buf1;

        queue_inited = 1;
    }
}

/*
 * Demo driver: writes 101 numbered text messages into the queue, then reads
 * 101 messages back and prints each one.
 */
int main()
{
    enum { MESSAGE_COUNT = 101 };
    char text[128];

    init_queue();

    /* Producer phase. */
    int seq = 0;
    while(seq < MESSAGE_COUNT)
    {
        sprintf(text, "========================================= Message: %03d =========================================",seq);
        printf("Write: %s\n",text);
        write_message(strlen(text), text);
        seq++;
    }

    /* Consumer phase. */
    for(int remaining = MESSAGE_COUNT; remaining > 0; remaining--)
    {
        const int received = read_message(text);
        /* The queue stores raw bytes, so terminate before printing. */
        text[received] = 0;
        printf("Read: %s\n", text);
    }

    return 0;
}

其中TODO部分需要调用单片机OS的资源保护接口(加锁/释放锁);由于代码是在PC平台上调试的,所以未集成平台相关代码。