F´ Flight Software - C/C++ Documentation devel
A framework for building embedded system applications to NASA flight quality standards.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
LocklessQueue.cpp
Go to the documentation of this file.
2#include <Fw/Types/Assert.hpp>
3
4#include <fcntl.h>
5#include <cstring>
6#include <new>
7
8#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
9
10namespace Os {
11
13 const NATIVE_INT_TYPE msgsize) {
14#ifndef BUILD_DARWIN
15 m_attr.mq_flags = O_NONBLOCK;
16 m_attr.mq_maxmsg = maxmsg;
17 m_attr.mq_msgsize = msgsize;
18 m_attr.mq_curmsgs = 0;
19#endif
20 // Must have at least 2 messages in the queue
21 FW_ASSERT(maxmsg >= 2, maxmsg);
22
23 m_index = new(std::nothrow) QueueNode[maxmsg]; // Allocate an index entry for each msg
24 m_data = new(std::nothrow) U8[maxmsg * msgsize]; // Allocate data for each msg
25 FW_ASSERT(m_index != nullptr);
26 FW_ASSERT(m_data != nullptr);
27
28 for (int i = 0, j = 1; i < maxmsg; i++, j++) {
29 m_index[i].data = &m_data[i * msgsize]; // Assign the data pointer of index to top that msg's buffer
30
31 if (j < maxmsg) { // If we aren't processing the last item
32 m_index[i].next = &m_index[j]; // Chain this item to the next one
33 } else {
34 m_index[i].next = nullptr; // Initialize to NULL otherwise
35 }
36 }
37
38 // Assign each of the pointers to the first index element
39 // This one is held in a "dummy" position for now
40 // It will be cleaned up by the producer
41 m_first = &m_index[0];
42 m_last = &m_index[0];
43 m_last->next = nullptr;
44
45 // Assign the head of the free list to the second element
46 m_free_head = &m_index[1];
47 }
48
50 delete[] m_index;
51 delete[] m_data;
52 }
53
54#ifndef BUILD_DARWIN
 // Copies this queue's stored attribute structure (m_attr) into the caller's
 // mq_attr. This method is only compiled when BUILD_DARWIN is not defined.
 //
 // \param attr: destination structure; overwritten with a byte-wise copy of m_attr
 //
 // NOTE(review): the constructor sets mq_curmsgs to 0 and nothing visible in this
 // listing updates it afterwards, so the value copied out here does not appear to
 // track the live message count - confirm before relying on it.
55 void LocklessQueue::GetAttr(mq_attr & attr) {
56 memcpy(&attr, &m_attr, sizeof(mq_attr));
57 }
58#endif
59
 // Returns a node to the free list by pushing it onto the head of the singly
 // linked list rooted at m_free_head.
 //
 // \param my_node: node to release; must not be nullptr (asserted). Its next
 //                 pointer is overwritten to point at the previous free-list head.
60 void LocklessQueue::PushFree(QueueNode * my_node) {
61 QueueNode * old_free_head;
62
63 FW_ASSERT(my_node != nullptr);
64
65 // CAS the node into the free list
 // Each attempt snapshots the current head, links the node in front of that
 // snapshot, and then tries to swing m_free_head to the node. The swap only
 // succeeds if m_free_head still equals the snapshot; otherwise the loop retries
 // with a fresh snapshot.
66 do {
67 old_free_head = m_free_head;
68 my_node->next = old_free_head;
69 } while (!CAS(&m_free_head, old_free_head, my_node));
70 }
71
 // Removes the node at the head of the free list, if there is one.
 //
 // \param free_node: out-parameter; set to the removed node on success and left
 //                   untouched when the free list is empty
 // \return true if a node was obtained, false if m_free_head was nullptr
72 bool LocklessQueue::PopFree(QueueNode ** free_node) {
73 QueueNode * my_node;
74
75 // CAS a buffer from the free list
 // Each attempt snapshots the head and tries to advance m_free_head to the
 // snapshot's successor; the swap only succeeds if the head is still the snapshot.
 // NOTE(review): my_node->next is read after the snapshot with no version tag. If
 // another thread pops my_node and pushes it back in between, the CAS can still
 // succeed using a stale next pointer (ABA) - confirm the callers' concurrency
 // assumptions rule this out.
76 do {
77 my_node = m_free_head;
78
79 if (nullptr == my_node) {
 // Free list is empty: report failure without writing to *free_node
80 return false;
81 }
82
83 } while (!CAS(&m_free_head, my_node, my_node->next));
84
85 (*free_node) = my_node;
86
87 return true;
88 }
89
91 NATIVE_INT_TYPE size) {
92
93 QueueNode * my_node;
94 QueueNode * old_last;
95
96#ifndef BUILD_DARWIN
97 // Check that the new message will fit in our buffers
98 if (size > m_attr.mq_msgsize) {
100 }
101#endif
102
103 if (!PopFree(&my_node)) {
104 return Queue::QUEUE_FULL;
105 }
106
107 // Copy the data into the buffer
108 memcpy(my_node->data, buffer, size);
109 my_node->size = size;
110 my_node->next = nullptr;
111
112 // Publish the node
113 // The m_last pointer is moved before the item is published
114 // All concurrent writers will argue over their order using CAS
115 do {
116 old_last = m_last;
117 } while (!CAS(&m_last, old_last, my_node));
118
119 old_last->next = my_node;
120
121 return Queue::QUEUE_OK;
122 }
123
124 // ONLY ONE RECEIVER AT A TIME!!!!
126 NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE & size) {
127
128 QueueNode * my_node;
129 QueueNode * old_node;
130
131#ifndef BUILD_DARWIN
132 if (capacity < m_attr.mq_msgsize) {
134 }
135#endif
136
137 if (m_first == m_last) {
139 }
140
141 old_node = m_first;
142 my_node = m_first->next;
143
144 if (my_node == nullptr) {
145 // We may not be fully linked yet, even though "last" has moved
147 }
148
149 m_first = m_first->next;
150 PushFree(old_node);
151
152 // Copy the data from the buffer
153 memcpy(buffer, my_node->data, my_node->size);
154 size = my_node->size;
155
156 return Queue::QUEUE_OK;
157 }
158
159}
#define FW_ASSERT(...)
Definition Assert.hpp:14
PlatformIntType NATIVE_INT_TYPE
Definition BasicTypes.h:51
uint8_t U8
8-bit unsigned integer
Definition BasicTypes.h:26
#define CAS(a_ptr, a_oldVal, a_newVal)
Os::Queue::QueueStatus Receive(U8 *buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &size)
LocklessQueue(NATIVE_INT_TYPE maxmsg, NATIVE_INT_TYPE msgsize)
void GetAttr(mq_attr &attr)
Os::Queue::QueueStatus Send(const U8 *buffer, NATIVE_INT_TYPE size)
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition Queue.hpp:31
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition Queue.hpp:29
@ QUEUE_OK
message sent/received okay
Definition Queue.hpp:28
@ QUEUE_FULL
queue was full when attempting to send a message
Definition Queue.hpp:36
Definition File.cpp:6