F´ Flight Software - C/C++ Documentation  NASA-v2.0.1
A framework for building embedded system applications to NASA flight quality standards.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
LocklessQueue.cpp
Go to the documentation of this file.
1 #include <Os/LocklessQueue.hpp>
2 #include <Fw/Types/Assert.hpp>
3 
4 #include <fcntl.h>
5 #include <string.h>
6 
7 #define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
8 
9 namespace Os {
10 
12  const NATIVE_INT_TYPE msgsize) {
13 #ifndef BUILD_DARWIN
14  m_attr.mq_flags = O_NONBLOCK;
15  m_attr.mq_maxmsg = maxmsg;
16  m_attr.mq_msgsize = msgsize;
17  m_attr.mq_curmsgs = 0;
18 #endif
19  // Must have at least 2 messages in the queue
20  FW_ASSERT(maxmsg >= 2, maxmsg);
21 
22  m_index = new QueueNode[maxmsg]; // Allocate an index entry for each msg
23  m_data = new U8[maxmsg * msgsize]; // Allocate data for each msg
24  FW_ASSERT(m_data != NULL);
25 
26  for (int i = 0, j = 1; i < maxmsg; i++, j++) {
27  m_index[i].data = &m_data[i * msgsize]; // Assign the data pointer of index to top that msg's buffer
28 
29  if (j < maxmsg) { // If we aren't processing the last item
30  m_index[i].next = &m_index[j]; // Chain this item to the next one
31  } else {
32  m_index[i].next = NULL; // Initialize to NULL otherwise
33  }
34  }
35 
36  // Assign each of the pointers to the first index element
37  // This one is held in a "dummy" position for now
38  // It will be cleaned up by the producer
39  m_first = &m_index[0];
40  m_last = &m_index[0];
41  m_last->next = NULL;
42 
43  // Assign the head of the free list to the second element
44  m_free_head = &m_index[1];
45  }
46 
48  delete[] m_index;
49  delete[] m_data;
50  }
51 
52 #ifndef BUILD_DARWIN
53  void LocklessQueue::GetAttr(mq_attr & attr) {
54  memcpy(&attr, &m_attr, sizeof(mq_attr));
55  }
56 #endif
57 
58  void LocklessQueue::PushFree(QueueNode * my_node) {
59  QueueNode * old_free_head;
60 
61  FW_ASSERT(my_node != NULL);
62 
63  // CAS the node into the free list
64  do {
65  old_free_head = m_free_head;
66  my_node->next = old_free_head;
67  } while (!CAS(&m_free_head, old_free_head, my_node));
68  }
69 
70  bool LocklessQueue::PopFree(QueueNode ** free_node) {
71  QueueNode * my_node;
72 
73  // CAS a buffer from the free list
74  do {
75  my_node = m_free_head;
76 
77  if (NULL == my_node) {
78  return false;
79  }
80 
81  } while (!CAS(&m_free_head, my_node, my_node->next));
82 
83  (*free_node) = my_node;
84 
85  return true;
86  }
87 
89  NATIVE_INT_TYPE size) {
90 
91  QueueNode * my_node;
92  QueueNode * old_last;
93 
94 #ifndef BUILD_DARWIN
95  // Check that the new message will fit in our buffers
96  if (size > m_attr.mq_msgsize) {
98  }
99 #endif
100 
101  if (!PopFree(&my_node)) {
102  return Queue::QUEUE_FULL;
103  }
104 
105  // Copy the data into the buffer
106  memcpy(my_node->data, buffer, size);
107  my_node->size = size;
108  my_node->next = NULL;
109 
110  // Publish the node
111  // The m_last pointer is moved before the item is published
112  // All concurrent writers will argue over their order using CAS
113  do {
114  old_last = m_last;
115  } while (!CAS(&m_last, old_last, my_node));
116 
117  old_last->next = my_node;
118 
119  return Queue::QUEUE_OK;
120  }
121 
122  // ONLY ONE RECEIVER AT A TIME!!!!
124  NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE & size) {
125 
126  QueueNode * my_node;
127  QueueNode * old_node;
128 
129 #ifndef BUILD_DARWIN
130  if (capacity < m_attr.mq_msgsize) {
132  }
133 #endif
134 
135  if (m_first == m_last) {
137  }
138 
139  old_node = m_first;
140  my_node = m_first->next;
141 
142  if (my_node == NULL) {
143  // We may not be fully linked yet, even though "last" has moved
145  }
146 
147  m_first = m_first->next;
148  PushFree(old_node);
149 
150  // Copy the data from the buffer
151  memcpy(buffer, my_node->data, my_node->size);
152  size = my_node->size;
153 
154  return Queue::QUEUE_OK;
155  }
156 
157 }
Os
Definition: File.cpp:7
Os::LocklessQueue::~LocklessQueue
~LocklessQueue()
Definition: LocklessQueue.cpp:47
Os::Queue::QUEUE_FULL
@ QUEUE_FULL
queue was full when attempting to send a message
Definition: Queue.hpp:36
U8
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.hpp:76
Os::Queue::QueueStatus
QueueStatus
Definition: Queue.hpp:27
CAS
#define CAS(a_ptr, a_oldVal, a_newVal)
Definition: LocklessQueue.cpp:7
LocklessQueue.hpp
Assert.hpp
Os::Queue::QUEUE_SIZE_MISMATCH
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition: Queue.hpp:31
Os::LocklessQueue::Receive
Os::Queue::QueueStatus Receive(U8 *buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &size)
Definition: LocklessQueue.cpp:123
Os::LocklessQueue::Send
Os::Queue::QueueStatus Send(const U8 *buffer, NATIVE_INT_TYPE size)
Definition: LocklessQueue.cpp:88
Os::LocklessQueue::GetAttr
void GetAttr(mq_attr &attr)
Definition: LocklessQueue.cpp:53
Os::Queue::QUEUE_NO_MORE_MSGS
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition: Queue.hpp:29
FW_ASSERT
#define FW_ASSERT(...)
Definition: Assert.hpp:9
Os::Queue::QUEUE_OK
@ QUEUE_OK
message sent/received okay
Definition: Queue.hpp:28
NATIVE_INT_TYPE
int NATIVE_INT_TYPE
native integer type declaration
Definition: BasicTypes.hpp:29
NULL
#define NULL
NULL.
Definition: BasicTypes.hpp:100
Os::LocklessQueue::LocklessQueue
LocklessQueue(NATIVE_INT_TYPE maxmsg, NATIVE_INT_TYPE msgsize)
Definition: LocklessQueue.cpp:11