F´ Flight Software - C/C++ Documentation  devel
A framework for building embedded system applications to NASA flight quality standards.
LocklessQueue.cpp
Go to the documentation of this file.
1 #include <Os/LocklessQueue.hpp>
2 #include <Fw/Types/Assert.hpp>
3 
4 #include <fcntl.h>
5 #include <cstring>
6 #include <new>
7 
8 #define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
9 
10 namespace Os {
11 
13  const NATIVE_INT_TYPE msgsize) {
14 #ifndef BUILD_DARWIN
15  m_attr.mq_flags = O_NONBLOCK;
16  m_attr.mq_maxmsg = maxmsg;
17  m_attr.mq_msgsize = msgsize;
18  m_attr.mq_curmsgs = 0;
19 #endif
20  // Must have at least 2 messages in the queue
21  FW_ASSERT(maxmsg >= 2, maxmsg);
22 
23  m_index = new(std::nothrow) QueueNode[maxmsg]; // Allocate an index entry for each msg
24  m_data = new(std::nothrow) U8[maxmsg * msgsize]; // Allocate data for each msg
25  FW_ASSERT(m_index != nullptr);
26  FW_ASSERT(m_data != nullptr);
27 
28  for (int i = 0, j = 1; i < maxmsg; i++, j++) {
29  m_index[i].data = &m_data[i * msgsize]; // Assign the data pointer of index to top that msg's buffer
30 
31  if (j < maxmsg) { // If we aren't processing the last item
32  m_index[i].next = &m_index[j]; // Chain this item to the next one
33  } else {
34  m_index[i].next = nullptr; // Initialize to NULL otherwise
35  }
36  }
37 
38  // Assign each of the pointers to the first index element
39  // This one is held in a "dummy" position for now
40  // It will be cleaned up by the producer
41  m_first = &m_index[0];
42  m_last = &m_index[0];
43  m_last->next = nullptr;
44 
45  // Assign the head of the free list to the second element
46  m_free_head = &m_index[1];
47  }
48 
50  delete[] m_index;
51  delete[] m_data;
52  }
53 
54 #ifndef BUILD_DARWIN
55  void LocklessQueue::GetAttr(mq_attr & attr) {
56  memcpy(&attr, &m_attr, sizeof(mq_attr));
57  }
58 #endif
59 
60  void LocklessQueue::PushFree(QueueNode * my_node) {
61  QueueNode * old_free_head;
62 
63  FW_ASSERT(my_node != nullptr);
64 
65  // CAS the node into the free list
66  do {
67  old_free_head = m_free_head;
68  my_node->next = old_free_head;
69  } while (!CAS(&m_free_head, old_free_head, my_node));
70  }
71 
72  bool LocklessQueue::PopFree(QueueNode ** free_node) {
73  QueueNode * my_node;
74 
75  // CAS a buffer from the free list
76  do {
77  my_node = m_free_head;
78 
79  if (nullptr == my_node) {
80  return false;
81  }
82 
83  } while (!CAS(&m_free_head, my_node, my_node->next));
84 
85  (*free_node) = my_node;
86 
87  return true;
88  }
89 
91  NATIVE_INT_TYPE size) {
92 
93  QueueNode * my_node;
94  QueueNode * old_last;
95 
96 #ifndef BUILD_DARWIN
97  // Check that the new message will fit in our buffers
98  if (size > m_attr.mq_msgsize) {
100  }
101 #endif
102 
103  if (!PopFree(&my_node)) {
104  return Queue::QUEUE_FULL;
105  }
106 
107  // Copy the data into the buffer
108  memcpy(my_node->data, buffer, static_cast<size_t>(size));
109  my_node->size = size;
110  my_node->next = nullptr;
111 
112  // Publish the node
113  // The m_last pointer is moved before the item is published
114  // All concurrent writers will argue over their order using CAS
115  do {
116  old_last = m_last;
117  } while (!CAS(&m_last, old_last, my_node));
118 
119  old_last->next = my_node;
120 
121  return Queue::QUEUE_OK;
122  }
123 
124  // ONLY ONE RECEIVER AT A TIME!!!!
126  NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE & size) {
127 
128  QueueNode * my_node;
129  QueueNode * old_node;
130 
131 #ifndef BUILD_DARWIN
132  if (capacity < m_attr.mq_msgsize) {
134  }
135 #endif
136 
137  if (m_first == m_last) {
139  }
140 
141  old_node = m_first;
142  my_node = m_first->next;
143 
144  if (my_node == nullptr) {
145  // We may not be fully linked yet, even though "last" has moved
147  }
148 
149  m_first = m_first->next;
150  PushFree(old_node);
151 
152  // Copy the data from the buffer
153  memcpy(buffer, my_node->data, static_cast<size_t>(my_node->size));
154  size = my_node->size;
155 
156  return Queue::QUEUE_OK;
157  }
158 
159 }
#define FW_ASSERT(...)
Definition: Assert.hpp:14
PlatformIntType NATIVE_INT_TYPE
Definition: BasicTypes.h:51
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.h:26
#define CAS(a_ptr, a_oldVal, a_newVal)
Os::Queue::QueueStatus Receive(U8 *buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &size)
LocklessQueue(NATIVE_INT_TYPE maxmsg, NATIVE_INT_TYPE msgsize)
void GetAttr(mq_attr &attr)
Os::Queue::QueueStatus Send(const U8 *buffer, NATIVE_INT_TYPE size)
QueueStatus
Definition: Queue.hpp:27
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition: Queue.hpp:31
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition: Queue.hpp:29
@ QUEUE_OK
message sent/received okay
Definition: Queue.hpp:28
@ QUEUE_FULL
queue was full when attempting to send a message
Definition: Queue.hpp:36