F´ Flight Software - C/C++ Documentation  NASA-v1.5.0
A framework for building embedded system applications to NASA flight quality standards.
LocklessQueue.cpp
Go to the documentation of this file.
1 #include <Os/LocklessQueue.hpp>
2 #include <Fw/Types/Assert.hpp>
3 
4 #include <fcntl.h>
5 #include <string.h>
6 
7 #define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
8 
9 namespace Os {
10 
12  const NATIVE_INT_TYPE msgsize) {
13 #ifndef BUILD_DARWIN
14  m_attr.mq_flags = O_NONBLOCK;
15  m_attr.mq_maxmsg = maxmsg;
16  m_attr.mq_msgsize = msgsize;
17  m_attr.mq_curmsgs = 0;
18 #endif
19  // Must have at least 2 messages in the queue
20  FW_ASSERT(maxmsg >= 2, maxmsg);
21 
22  m_index = new QueueNode[maxmsg]; // Allocate an index entry for each msg
23  m_data = new U8[maxmsg * msgsize]; // Allocate data for each msg
24  FW_ASSERT(m_data != NULL);
25 
26  for (int i = 0, j = 1; i < maxmsg; i++, j++) {
27  m_index[i].data = &m_data[i * msgsize]; // Assign the data pointer of index to top that msg's buffer
28 
29  if (j < maxmsg) { // If we aren't processing the last item
30  m_index[i].next = &m_index[j]; // Chain this item to the next one
31  } else {
32  m_index[i].next = NULL; // Initialize to NULL otherwise
33  }
34  }
35 
36  // Assign each of the pointers to the first index element
37  // This one is held in a "dummy" position for now
38  // It will be cleaned up by the producer
39  m_first = &m_index[0];
40  m_last = &m_index[0];
41  m_last->next = NULL;
42 
43  // Assign the head of the free list to the second element
44  m_free_head = &m_index[1];
45  }
46 
48  delete[] m_index;
49  delete[] m_data;
50  }
51 
52 #ifndef BUILD_DARWIN
53  void LocklessQueue::GetAttr(mq_attr & attr) {
54  memcpy(&attr, &m_attr, sizeof(mq_attr));
55  }
56 #endif
57 
58  void LocklessQueue::PushFree(QueueNode * my_node) {
59  QueueNode * old_free_head;
60 
61  FW_ASSERT(my_node != NULL);
62 
63  // CAS the node into the free list
64  do {
65  old_free_head = m_free_head;
66  my_node->next = old_free_head;
67  } while (!CAS(&m_free_head, old_free_head, my_node));
68  }
69 
70  bool LocklessQueue::PopFree(QueueNode ** free_node) {
71  QueueNode * my_node;
72 
73  // CAS a buffer from the free list
74  do {
75  my_node = m_free_head;
76 
77  if (NULL == my_node) {
78  return false;
79  }
80 
81  } while (!CAS(&m_free_head, my_node, my_node->next));
82 
83  (*free_node) = my_node;
84 
85  return true;
86  }
87 
89  NATIVE_INT_TYPE size) {
90 
91  QueueNode * my_node;
92  QueueNode * old_last;
93 
94 #ifndef BUILD_DARWIN
95  // Check that the new message will fit in our buffers
96  if (size > m_attr.mq_msgsize) {
98  }
99 #endif
100 
101  if (!PopFree(&my_node)) {
102  return Queue::QUEUE_FULL;
103  }
104 
105  // Copy the data into the buffer
106  memcpy(my_node->data, buffer, size);
107  my_node->size = size;
108  my_node->next = NULL;
109 
110  // Publish the node
111  // The m_last pointer is moved before the item is published
112  // All concurrent writers will argue over their order using CAS
113  do {
114  old_last = m_last;
115  } while (!CAS(&m_last, old_last, my_node));
116 
117  old_last->next = my_node;
118 
119  return Queue::QUEUE_OK;
120  }
121 
122  // ONLY ONE RECEIVER AT A TIME!!!!
124  NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE & size) {
125 
126  QueueNode * my_node;
127  QueueNode * old_node;
128 
129 #ifndef BUILD_DARWIN
130  if (capacity < m_attr.mq_msgsize) {
132  }
133 #endif
134 
135  if (m_first == m_last) {
137  }
138 
139  old_node = m_first;
140  my_node = m_first->next;
141 
142  if (my_node == NULL) {
143  // We may not be fully linked yet, even though "last" has moved
145  }
146 
147  m_first = m_first->next;
148  PushFree(old_node);
149 
150  // Copy the data from the buffer
151  memcpy(buffer, my_node->data, my_node->size);
152  size = my_node->size;
153 
154  return Queue::QUEUE_OK;
155  }
156 
157 }
Os
Definition: File.cpp:7
Os::LocklessQueue::~LocklessQueue
~LocklessQueue()
Definition: LocklessQueue.cpp:47
Os::Queue::QUEUE_FULL
@ QUEUE_FULL
queue was full when attempting to send a message
Definition: Queue.hpp:36
U8
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.hpp:76
Os::Queue::QueueStatus
QueueStatus
Definition: Queue.hpp:27
CAS
#define CAS(a_ptr, a_oldVal, a_newVal)
Definition: LocklessQueue.cpp:7
LocklessQueue.hpp
Assert.hpp
Os::Queue::QUEUE_SIZE_MISMATCH
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with a buffer that is too large or too small
Definition: Queue.hpp:31
Os::LocklessQueue::Receive
Os::Queue::QueueStatus Receive(U8 *buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &size)
Definition: LocklessQueue.cpp:123
Os::LocklessQueue::Send
Os::Queue::QueueStatus Send(const U8 *buffer, NATIVE_INT_TYPE size)
Definition: LocklessQueue.cpp:88
Os::LocklessQueue::GetAttr
void GetAttr(mq_attr &attr)
Definition: LocklessQueue.cpp:53
Os::Queue::QUEUE_NO_MORE_MSGS
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition: Queue.hpp:29
FW_ASSERT
#define FW_ASSERT(...)
Definition: Assert.hpp:9
Os::Queue::QUEUE_OK
@ QUEUE_OK
message sent/received okay
Definition: Queue.hpp:28
NATIVE_INT_TYPE
int NATIVE_INT_TYPE
native integer type declaration
Definition: BasicTypes.hpp:29
NULL
#define NULL
NULL.
Definition: BasicTypes.hpp:100
Os::LocklessQueue::LocklessQueue
LocklessQueue(NATIVE_INT_TYPE maxmsg, NATIVE_INT_TYPE msgsize)
Definition: LocklessQueue.cpp:11