F´ Flight Software - C/C++ Documentation  NASA-v1.6.0
A framework for building embedded system applications to NASA flight quality standards.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
LocklessQueue.cpp
Go to the documentation of this file.
1 #include <Os/LocklessQueue.hpp>
2 #include <Fw/Types/Assert.hpp>
3 
4 #include <fcntl.h>
5 #include <cstring>
6 #include <new>
7 
8 #define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
9 
10 namespace Os {
11 
13  const NATIVE_INT_TYPE msgsize) {
14 #ifndef BUILD_DARWIN
15  m_attr.mq_flags = O_NONBLOCK;
16  m_attr.mq_maxmsg = maxmsg;
17  m_attr.mq_msgsize = msgsize;
18  m_attr.mq_curmsgs = 0;
19 #endif
20  // Must have at least 2 messages in the queue
21  FW_ASSERT(maxmsg >= 2, maxmsg);
22 
23  m_index = new(std::nothrow) QueueNode[maxmsg]; // Allocate an index entry for each msg
24  m_data = new(std::nothrow) U8[maxmsg * msgsize]; // Allocate data for each msg
25  FW_ASSERT(m_index != nullptr);
26  FW_ASSERT(m_data != nullptr);
27 
28  for (int i = 0, j = 1; i < maxmsg; i++, j++) {
29  m_index[i].data = &m_data[i * msgsize]; // Assign the data pointer of index to top that msg's buffer
30 
31  if (j < maxmsg) { // If we aren't processing the last item
32  m_index[i].next = &m_index[j]; // Chain this item to the next one
33  } else {
34  m_index[i].next = nullptr; // Initialize to NULL otherwise
35  }
36  }
37 
38  // Assign each of the pointers to the first index element
39  // This one is held in a "dummy" position for now
40  // It will be cleaned up by the producer
41  m_first = &m_index[0];
42  m_last = &m_index[0];
43  m_last->next = nullptr;
44 
45  // Assign the head of the free list to the second element
46  m_free_head = &m_index[1];
47  }
48 
50  delete[] m_index;
51  delete[] m_data;
52  }
53 
54 #ifndef BUILD_DARWIN
55  void LocklessQueue::GetAttr(mq_attr & attr) {
56  memcpy(&attr, &m_attr, sizeof(mq_attr));
57  }
58 #endif
59 
60  void LocklessQueue::PushFree(QueueNode * my_node) {
61  QueueNode * old_free_head;
62 
63  FW_ASSERT(my_node != nullptr);
64 
65  // CAS the node into the free list
66  do {
67  old_free_head = m_free_head;
68  my_node->next = old_free_head;
69  } while (!CAS(&m_free_head, old_free_head, my_node));
70  }
71 
72  bool LocklessQueue::PopFree(QueueNode ** free_node) {
73  QueueNode * my_node;
74 
75  // CAS a buffer from the free list
76  do {
77  my_node = m_free_head;
78 
79  if (nullptr == my_node) {
80  return false;
81  }
82 
83  } while (!CAS(&m_free_head, my_node, my_node->next));
84 
85  (*free_node) = my_node;
86 
87  return true;
88  }
89 
91  NATIVE_INT_TYPE size) {
92 
93  QueueNode * my_node;
94  QueueNode * old_last;
95 
96 #ifndef BUILD_DARWIN
97  // Check that the new message will fit in our buffers
98  if (size > m_attr.mq_msgsize) {
100  }
101 #endif
102 
103  if (!PopFree(&my_node)) {
104  return Queue::QUEUE_FULL;
105  }
106 
107  // Copy the data into the buffer
108  memcpy(my_node->data, buffer, size);
109  my_node->size = size;
110  my_node->next = nullptr;
111 
112  // Publish the node
113  // The m_last pointer is moved before the item is published
114  // All concurrent writers will argue over their order using CAS
115  do {
116  old_last = m_last;
117  } while (!CAS(&m_last, old_last, my_node));
118 
119  old_last->next = my_node;
120 
121  return Queue::QUEUE_OK;
122  }
123 
124  // ONLY ONE RECEIVER AT A TIME!!!!
126  NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE & size) {
127 
128  QueueNode * my_node;
129  QueueNode * old_node;
130 
131 #ifndef BUILD_DARWIN
132  if (capacity < m_attr.mq_msgsize) {
134  }
135 #endif
136 
137  if (m_first == m_last) {
139  }
140 
141  old_node = m_first;
142  my_node = m_first->next;
143 
144  if (my_node == nullptr) {
145  // We may not be fully linked yet, even though "last" has moved
147  }
148 
149  m_first = m_first->next;
150  PushFree(old_node);
151 
152  // Copy the data from the buffer
153  memcpy(buffer, my_node->data, my_node->size);
154  size = my_node->size;
155 
156  return Queue::QUEUE_OK;
157  }
158 
159 }
Os
Definition: File.cpp:7
Os::LocklessQueue::~LocklessQueue
~LocklessQueue()
Definition: LocklessQueue.cpp:49
Os::Queue::QUEUE_FULL
@ QUEUE_FULL
queue was full when attempting to send a message
Definition: Queue.hpp:36
U8
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.hpp:73
Os::Queue::QueueStatus
QueueStatus
Definition: Queue.hpp:27
CAS
#define CAS(a_ptr, a_oldVal, a_newVal)
Definition: LocklessQueue.cpp:8
LocklessQueue.hpp
Os::Queue::QUEUE_SIZE_MISMATCH
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition: Queue.hpp:31
NATIVE_INT_TYPE
int NATIVE_INT_TYPE
native integer type declaration
Definition: BasicTypes.hpp:27
Os::LocklessQueue::Receive
Os::Queue::QueueStatus Receive(U8 *buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &size)
Definition: LocklessQueue.cpp:125
Os::LocklessQueue::Send
Os::Queue::QueueStatus Send(const U8 *buffer, NATIVE_INT_TYPE size)
Definition: LocklessQueue.cpp:90
Os::LocklessQueue::GetAttr
void GetAttr(mq_attr &attr)
Definition: LocklessQueue.cpp:55
Os::Queue::QUEUE_NO_MORE_MSGS
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition: Queue.hpp:29
Os::Queue::QUEUE_OK
@ QUEUE_OK
message sent/received okay
Definition: Queue.hpp:28
FW_ASSERT
#define FW_ASSERT(...)
Definition: Assert.hpp:9
Os::LocklessQueue::LocklessQueue
LocklessQueue(NATIVE_INT_TYPE maxmsg, NATIVE_INT_TYPE msgsize)
Definition: LocklessQueue.cpp:12