F´ Flight Software - C/C++ Documentation NASA-v1.6.0
A framework for building embedded system applications to NASA flight quality standards.
Loading...
Searching...
No Matches
LocklessQueue.cpp
Go to the documentation of this file.
2#include <Fw/Types/Assert.hpp>
3
4#include <fcntl.h>
5#include <cstring>
6#include <new>
7
8#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
9
10namespace Os {
11
13 const NATIVE_INT_TYPE msgsize) {
14#ifndef BUILD_DARWIN
15 m_attr.mq_flags = O_NONBLOCK;
16 m_attr.mq_maxmsg = maxmsg;
17 m_attr.mq_msgsize = msgsize;
18 m_attr.mq_curmsgs = 0;
19#endif
20 // Must have at least 2 messages in the queue
21 FW_ASSERT(maxmsg >= 2, maxmsg);
22
23 m_index = new(std::nothrow) QueueNode[maxmsg]; // Allocate an index entry for each msg
24 m_data = new(std::nothrow) U8[maxmsg * msgsize]; // Allocate data for each msg
25 FW_ASSERT(m_index != nullptr);
26 FW_ASSERT(m_data != nullptr);
27
28 for (int i = 0, j = 1; i < maxmsg; i++, j++) {
29 m_index[i].data = &m_data[i * msgsize]; // Assign the data pointer of index to top that msg's buffer
30
31 if (j < maxmsg) { // If we aren't processing the last item
32 m_index[i].next = &m_index[j]; // Chain this item to the next one
33 } else {
34 m_index[i].next = nullptr; // Initialize to NULL otherwise
35 }
36 }
37
38 // Assign each of the pointers to the first index element
39 // This one is held in a "dummy" position for now
40 // It will be cleaned up by the producer
41 m_first = &m_index[0];
42 m_last = &m_index[0];
43 m_last->next = nullptr;
44
45 // Assign the head of the free list to the second element
46 m_free_head = &m_index[1];
47 }
48
50 delete[] m_index;
51 delete[] m_data;
52 }
53
54#ifndef BUILD_DARWIN
55 void LocklessQueue::GetAttr(mq_attr & attr) {
56 memcpy(&attr, &m_attr, sizeof(mq_attr));
57 }
58#endif
59
60 void LocklessQueue::PushFree(QueueNode * my_node) {
61 QueueNode * old_free_head;
62
63 FW_ASSERT(my_node != nullptr);
64
65 // CAS the node into the free list
66 do {
67 old_free_head = m_free_head;
68 my_node->next = old_free_head;
69 } while (!CAS(&m_free_head, old_free_head, my_node));
70 }
71
72 bool LocklessQueue::PopFree(QueueNode ** free_node) {
73 QueueNode * my_node;
74
75 // CAS a buffer from the free list
76 do {
77 my_node = m_free_head;
78
79 if (nullptr == my_node) {
80 return false;
81 }
82
83 } while (!CAS(&m_free_head, my_node, my_node->next));
84
85 (*free_node) = my_node;
86
87 return true;
88 }
89
91 NATIVE_INT_TYPE size) {
92
93 QueueNode * my_node;
94 QueueNode * old_last;
95
96#ifndef BUILD_DARWIN
97 // Check that the new message will fit in our buffers
98 if (size > m_attr.mq_msgsize) {
100 }
101#endif
102
103 if (!PopFree(&my_node)) {
104 return Queue::QUEUE_FULL;
105 }
106
107 // Copy the data into the buffer
108 memcpy(my_node->data, buffer, size);
109 my_node->size = size;
110 my_node->next = nullptr;
111
112 // Publish the node
113 // The m_last pointer is moved before the item is published
114 // All concurrent writers will argue over their order using CAS
115 do {
116 old_last = m_last;
117 } while (!CAS(&m_last, old_last, my_node));
118
119 old_last->next = my_node;
120
121 return Queue::QUEUE_OK;
122 }
123
124 // ONLY ONE RECEIVER AT A TIME!!!!
126 NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE & size) {
127
128 QueueNode * my_node;
129 QueueNode * old_node;
130
131#ifndef BUILD_DARWIN
132 if (capacity < m_attr.mq_msgsize) {
134 }
135#endif
136
137 if (m_first == m_last) {
139 }
140
141 old_node = m_first;
142 my_node = m_first->next;
143
144 if (my_node == nullptr) {
145 // We may not be fully linked yet, even though "last" has moved
147 }
148
149 m_first = m_first->next;
150 PushFree(old_node);
151
152 // Copy the data from the buffer
153 memcpy(buffer, my_node->data, my_node->size);
154 size = my_node->size;
155
156 return Queue::QUEUE_OK;
157 }
158
159}
#define FW_ASSERT(...)
Definition Assert.hpp:7
PlatformIntType NATIVE_INT_TYPE
Definition BasicTypes.h:51
uint8_t U8
8-bit unsigned integer
Definition BasicTypes.h:26
#define CAS(a_ptr, a_oldVal, a_newVal)
Os::Queue::QueueStatus Receive(U8 *buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &size)
LocklessQueue(NATIVE_INT_TYPE maxmsg, NATIVE_INT_TYPE msgsize)
void GetAttr(mq_attr &attr)
Os::Queue::QueueStatus Send(const U8 *buffer, NATIVE_INT_TYPE size)
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition Queue.hpp:31
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition Queue.hpp:29
@ QUEUE_OK
message sent/received okay
Definition Queue.hpp:28
@ QUEUE_FULL
queue was full when attempting to send a message
Definition Queue.hpp:36
Definition File.cpp:6