单片机上实现基于RAM+FLASH的消息队列
背景简介:由于单片机上的RAM资源有限,而我们的物联终端时时刻刻都在采集数据,如果RAM中的数据未来得及发动到服务端(比如进入了信号盲区),我们就希望使用FLASH进行缓存,而FLASH缓存之后如何保证数据的有序性,以及不丢失,就需要在单片机中实现一个理论上无限长的消息队列
#include <stdio.h>
#include <string.h>
#define NODE_SIZE_KB 32
#define PAGE_SIZE 256
typedef struct STU_QUEUE_NODE
{
int head_offset;
int tail_offset;
int ele_count;
int remain_size;
char content[NODE_SIZE_KB * 1024 - sizeof(int) * 4];
} QUEUE_NODE;
static QUEUE_NODE buf1;
static QUEUE_NODE buf2;
static QUEUE_NODE *read_buffer_ptr;
static QUEUE_NODE *write_buffer_ptr;
static int page_in_counter = 0;
static int page_out_counter = 0;
static QUEUE_NODE page[PAGE_SIZE];
static int queue_inited = 0;
void init(QUEUE_NODE *node)
{
node->head_offset = node->content - (char *)node;
node->tail_offset = node->head_offset;
node->ele_count = 0;
node->remain_size = sizeof(node->content);
}
void get_info(QUEUE_NODE *node)
{
printf("addr: 0x%x; head: %d; tail: %d; count: %d; remain_size: %d\n",
(int)node, node->head_offset, node->tail_offset, node->ele_count, node->remain_size);
}
void page_in(QUEUE_NODE *node_ptr)
{
if(page_in_counter - page_out_counter >= PAGE_SIZE)
{
page_out_counter += 1;
}
int page_index = page_in_counter % PAGE_SIZE;
// TODO 修改为写flash
memcpy(&page[page_index], node_ptr, sizeof(QUEUE_NODE));
page_in_counter ++;
}
void page_out(QUEUE_NODE *node_ptr)
{
int page_index = page_out_counter % PAGE_SIZE;
// TODO 修改为读flash
memcpy(node_ptr, &page[page_index], sizeof(QUEUE_NODE));
page_out_counter ++;
}
static void write_message(int length, char *message_src)
{
// TODO 获取锁
if(write_buffer_ptr->remain_size < sizeof(length) + length)
{
if(write_buffer_ptr == read_buffer_ptr)
{
write_buffer_ptr = (read_buffer_ptr == &buf1) ? &buf2 : &buf1;
}
else
{
page_in(write_buffer_ptr);
}
init(write_buffer_ptr);
}
char *tail_ptr = (char *)write_buffer_ptr + write_buffer_ptr->tail_offset;
char *message_ptr = tail_ptr + sizeof(length);
*(int *)tail_ptr = length;
memcpy(message_ptr, message_src, length);
write_buffer_ptr->tail_offset += sizeof(length) + length;
write_buffer_ptr->ele_count += 1;
write_buffer_ptr->remain_size -= sizeof(length) + length;
// TODO 释放锁
}
static int read_message_p(QUEUE_NODE *node, char *message_dest)
{
if(node->ele_count > 0)
{
char *head_ptr = (char *)node + node->head_offset;
char *message_ptr = head_ptr + sizeof(int);
int length = *(int *)head_ptr;
memcpy(message_dest, message_ptr, length);
node->head_offset += sizeof(length) + length;
node->ele_count -= 1;
return length;
}
return 0;
}
int read_message(char *dest)
{
// TODO 获取锁
int read_length;
if(read_buffer_ptr->ele_count > 0)
{
read_length = read_message_p(read_buffer_ptr, dest);
}
else
{
if(read_buffer_ptr == write_buffer_ptr)
{
read_length = 0;
}
else
{
if(page_in_counter > page_out_counter)
{
page_out(read_buffer_ptr);
}
else
{
read_buffer_ptr = write_buffer_ptr;
}
read_length = read_message_p(read_buffer_ptr, dest);
}
}
// TODO 释放锁
return read_length;
}
void init_queue()
{
if(!queue_inited)
{
init(&buf1);
init(&buf2);
read_buffer_ptr = &buf1;
write_buffer_ptr = &buf1;
queue_inited = 1;
}
}
int main()
{
init_queue();
char buf[128];
for(int i = 0; i<101; i++)
{
sprintf(buf, "========================================= Message: %03d =========================================",i);
printf("Write: %s\n",buf);
write_message(strlen(buf), buf);
}
for(int i = 0; i < 101; i ++)
{
int read_length = read_message(buf);
buf[read_length] = 0;
printf("Read: %s\n", buf);
}
return 0;
}
其中todo部分需要调用单片机OS的资源保护,由于是在PC平台调试的代码,就未集成平台相关代码