F´ Flight Software - C/C++ Documentation  devel
A framework for building embedded system applications to NASA flight quality standards.
Queue.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title Queue.cpp
3 // \author dinkel
4 // \brief Queue implementation using the pthread library. This is NOT
5 // an IPC queue. It is meant to be used between threads within
6 // the same address space.
7 //
8 // \copyright
9 // Copyright 2009-2015, by the California Institute of Technology.
10 // ALL RIGHTS RESERVED. United States Government Sponsorship
11 // acknowledged.
12 //
13 // ======================================================================
14 
16 #include <Fw/Types/Assert.hpp>
17 #include <Os/Queue.hpp>
18 
19 #include <cerrno>
20 #include <pthread.h>
21 #include <cstdio>
22 #include <new>
23 
24 namespace Os {
25 
26  // A helper class which stores variables for the queue handle.
27  // The queue itself, a pthread condition variable, and pthread
28  // mutex are contained within this container class.
29  class QueueHandle {
30  public:
32  int ret;
33  ret = pthread_cond_init(&this->queueNotEmpty, nullptr);
34  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
35  ret = pthread_cond_init(&this->queueNotFull, nullptr);
36  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
37  ret = pthread_mutex_init(&this->queueLock, nullptr);
38  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
39  }
41  (void) pthread_cond_destroy(&this->queueNotEmpty);
42  (void) pthread_cond_destroy(&this->queueNotFull);
43  (void) pthread_mutex_destroy(&this->queueLock);
44  }
45  bool create(NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize) {
46  return queue.create(static_cast<NATIVE_UINT_TYPE>(depth), static_cast<NATIVE_UINT_TYPE>(msgSize));
47  }
49  pthread_cond_t queueNotEmpty;
50  pthread_cond_t queueNotFull;
51  pthread_mutex_t queueLock;
52  };
53 
54  Queue::Queue() :
55  m_handle(reinterpret_cast<POINTER_CAST>(nullptr)) {
56  }
57 
59  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
60 
61  // Queue has already been created... remove it and try again:
62  if (nullptr != queueHandle) {
63  delete queueHandle;
64  queueHandle = nullptr;
65  }
66 
67  // Create queue handle:
68  queueHandle = new(std::nothrow) QueueHandle;
69  if (nullptr == queueHandle) {
70  return QUEUE_UNINITIALIZED;
71  }
72  if( !queueHandle->create(depth, msgSize) ) {
73  return QUEUE_UNINITIALIZED;
74  }
75  this->m_handle = reinterpret_cast<POINTER_CAST>(queueHandle);
76 
77 #if FW_QUEUE_REGISTRATION
78  if (this->s_queueRegistry) {
79  this->s_queueRegistry->regQueue(this);
80  }
81 #endif
82 
83  return QUEUE_OK;
84  }
85 
86  Queue::~Queue() {
87  // Clean up the queue handle:
88  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
89  if (nullptr != queueHandle) {
90  delete queueHandle;
91  }
92  this->m_handle = reinterpret_cast<POINTER_CAST>(nullptr);
93  }
94 
95  Queue::QueueStatus sendNonBlock(QueueHandle* queueHandle, const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority) {
96 
97  BufferQueue* queue = &queueHandle->queue;
98  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
99  pthread_mutex_t* queueLock = &queueHandle->queueLock;
100  NATIVE_INT_TYPE ret;
102 
104  // Locked Section
106  ret = pthread_mutex_lock(queueLock);
107  FW_ASSERT(ret == 0, errno);
109 
110  // Push item onto queue:
111  bool pushSucceeded = queue->push(buffer, static_cast<NATIVE_UINT_TYPE>(size), priority);
112 
113  if(pushSucceeded) {
114  // Push worked - wake up a thread that might be waiting on
115  // the other end of the queue:
116  ret = pthread_cond_signal(queueNotEmpty);
117  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
118  }
119  else {
120  // Push failed - the queue is full:
121  status = Queue::QUEUE_FULL;
122  }
123 
125  ret = pthread_mutex_unlock(queueLock);
126  FW_ASSERT(ret == 0, errno);
129 
130  return status;
131  }
132 
133  Queue::QueueStatus sendBlock(QueueHandle* queueHandle, const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority) {
134 
135  BufferQueue* queue = &queueHandle->queue;
136  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
137  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
138  pthread_mutex_t* queueLock = &queueHandle->queueLock;
139  NATIVE_INT_TYPE ret;
140 
142  // Locked Section
144  ret = pthread_mutex_lock(queueLock);
145  FW_ASSERT(ret == 0, errno);
147 
148  // If the queue is full, wait until a message is taken off the queue:
149  while( queue->isFull() ) {
150  ret = pthread_cond_wait(queueNotFull, queueLock);
151  FW_ASSERT(ret == 0, errno);
152  }
153 
154  // Push item onto queue:
155  bool pushSucceeded = queue->push(buffer, static_cast<NATIVE_UINT_TYPE>(size), priority);
156 
157  // The only reason push would not succeed is if the queue
158  // was full. Since we waited for the queue to NOT be full
159  // before sending on the queue, the push must have succeeded
160  // unless there was a programming error or a bit flip.
161  FW_ASSERT(pushSucceeded, pushSucceeded);
162 
163  // Push worked - wake up a thread that might be waiting on
164  // the other end of the queue:
165  ret = pthread_cond_signal(queueNotEmpty);
166  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
167 
169  ret = pthread_mutex_unlock(queueLock);
170  FW_ASSERT(ret == 0, errno);
173 
174  return Queue::QUEUE_OK;
175  }
176 
177 
178  Queue::QueueStatus Queue::send(const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority, QueueBlocking block) {
179  (void) block; // Always non-blocking for now
180  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
181  BufferQueue* queue = &queueHandle->queue;
182 
183  if (nullptr == queueHandle) {
184  return QUEUE_UNINITIALIZED;
185  }
186 
187  if (nullptr == buffer) {
188  return QUEUE_EMPTY_BUFFER;
189  }
190 
191  if (size < 0 || static_cast<NATIVE_UINT_TYPE>(size) > queue->getMsgSize()) {
192  return QUEUE_SIZE_MISMATCH;
193  }
194 
195  if( QUEUE_NONBLOCKING == block ) {
196  return sendNonBlock(queueHandle, buffer, size, priority);
197  }
198 
199  return sendBlock(queueHandle, buffer, size, priority);
200  }
201 
202  Queue::QueueStatus receiveNonBlock(QueueHandle* queueHandle, U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority) {
203 
204  BufferQueue* queue = &queueHandle->queue;
205  pthread_mutex_t* queueLock = &queueHandle->queueLock;
206  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
207  NATIVE_INT_TYPE ret;
208 
209  NATIVE_UINT_TYPE size = static_cast<NATIVE_UINT_TYPE>(capacity);
210  NATIVE_INT_TYPE pri = 0;
212 
214  // Locked Section
216  ret = pthread_mutex_lock(queueLock);
217  FW_ASSERT(ret == 0, errno);
219 
220  // Get an item off of the queue:
221  bool popSucceeded = queue->pop(buffer, size, pri);
222 
223  if(popSucceeded) {
224  // Pop worked - set the return size and priority:
225  actualSize = static_cast<NATIVE_INT_TYPE>(size);
226  priority = pri;
227 
228  // Pop worked - wake up a thread that might be waiting on
229  // the send end of the queue:
230  ret = pthread_cond_signal(queueNotFull);
231  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
232  }
233  else {
234  actualSize = 0;
235  if( size > static_cast<NATIVE_UINT_TYPE>(capacity) ) {
236  // The buffer capacity was too small!
238  }
239  else if( size == 0 ) {
240  // The queue is empty:
241  status = Queue::QUEUE_NO_MORE_MSGS;
242  }
243  else {
244  // If this happens, a programming error or bit flip occurred:
245  FW_ASSERT(0);
246  }
247  }
248 
250  ret = pthread_mutex_unlock(queueLock);
251  FW_ASSERT(ret == 0, errno);
254 
255  return status;
256  }
257 
258  Queue::QueueStatus receiveBlock(QueueHandle* queueHandle, U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority) {
259 
260  BufferQueue* queue = &queueHandle->queue;
261  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
262  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
263  pthread_mutex_t* queueLock = &queueHandle->queueLock;
264  NATIVE_INT_TYPE ret;
265 
266  NATIVE_UINT_TYPE size = static_cast<NATIVE_UINT_TYPE>(capacity);
267  NATIVE_INT_TYPE pri = 0;
269 
271  // Locked Section
273  ret = pthread_mutex_lock(queueLock);
274  FW_ASSERT(ret == 0, errno);
276 
277  // If the queue is empty, wait until a message is put on the queue:
278  while( queue->isEmpty() ) {
279  ret = pthread_cond_wait(queueNotEmpty, queueLock);
280  FW_ASSERT(ret == 0, errno);
281  }
282 
283  // Get an item off of the queue:
284  bool popSucceeded = queue->pop(buffer, size, pri);
285 
286  if(popSucceeded) {
287  // Pop worked - set the return size and priority:
288  actualSize = static_cast<NATIVE_INT_TYPE>(size);
289  priority = pri;
290 
291  // Pop worked - wake up a thread that might be waiting on
292  // the send end of the queue:
293  ret = pthread_cond_signal(queueNotFull);
294  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
295  }
296  else {
297  actualSize = 0;
298  if( size > static_cast<NATIVE_UINT_TYPE>(capacity) ) {
299  // The buffer capacity was too small!
301  }
302  else {
303  // If this happens, a programming error or bit flip occurred:
304  // The only reason a pop should fail is if the user's buffer
305  // was too small.
306  FW_ASSERT(0);
307  }
308  }
309 
311  ret = pthread_mutex_unlock(queueLock);
312  FW_ASSERT(ret == 0, errno);
315 
316  return status;
317  }
318 
319  Queue::QueueStatus Queue::receive(U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority, QueueBlocking block) {
320 
321  if( reinterpret_cast<POINTER_CAST>(nullptr) == this->m_handle ) {
322  return QUEUE_UNINITIALIZED;
323  }
324 
325  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
326 
327  if (nullptr == queueHandle) {
328  return QUEUE_UNINITIALIZED;
329  }
330 
331  // Do not need to check the upper bound of capacity, We don't care
332  // how big the user's buffer is.. as long as it's big enough.
333  if (capacity < 0) {
334  return QUEUE_SIZE_MISMATCH;
335  }
336 
337  if( QUEUE_NONBLOCKING == block ) {
338  return receiveNonBlock(queueHandle, buffer, capacity, actualSize, priority);
339  }
340 
341  return receiveBlock(queueHandle, buffer, capacity, actualSize, priority);
342  }
343 
345  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
346  if (nullptr == queueHandle) {
347  return 0;
348  }
349  BufferQueue* queue = &queueHandle->queue;
350  return static_cast<NATIVE_INT_TYPE>(queue->getCount());
351  }
352 
354  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
355  if (nullptr == queueHandle) {
356  return 0;
357  }
358  BufferQueue* queue = &queueHandle->queue;
359  return static_cast<NATIVE_INT_TYPE>(queue->getMaxCount());
360  }
361 
363  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
364  if (nullptr == queueHandle) {
365  return 0;
366  }
367  BufferQueue* queue = &queueHandle->queue;
368  return static_cast<NATIVE_INT_TYPE>(queue->getDepth());
369  }
370 
372  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
373  if (nullptr == queueHandle) {
374  return 0;
375  }
376  BufferQueue* queue = &queueHandle->queue;
377  return static_cast<NATIVE_INT_TYPE>(queue->getMsgSize());
378  }
379 
380 }
381 
#define FW_ASSERT(...)
Definition: Assert.hpp:14
PlatformPointerCastType POINTER_CAST
Definition: BasicTypes.h:53
PlatformIntType NATIVE_INT_TYPE
Definition: BasicTypes.h:51
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.h:26
PlatformUIntType NATIVE_UINT_TYPE
Definition: BasicTypes.h:52
A generic buffer queue data structure.
Definition: BufferQueue.hpp:26
bool create(NATIVE_UINT_TYPE depth, NATIVE_UINT_TYPE msgSize)
BufferQueue creation.
bool pop(U8 *buffer, NATIVE_UINT_TYPE &size, NATIVE_INT_TYPE &priority)
pop an item off the queue
bool isEmpty()
check if the queue is empty
bool push(const U8 *buffer, NATIVE_UINT_TYPE size, NATIVE_INT_TYPE priority)
push an item onto the queue
bool isFull()
check if the queue is full
pthread_cond_t queueNotEmpty
bool create(NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
Definition: Queue.cpp:45
pthread_mutex_t queueLock
BufferQueue queue
pthread_cond_t queueNotFull
NATIVE_INT_TYPE getMsgSize() const
get the message size (maximum message size queue can hold)
Definition: Queue.cpp:244
QueueStatus
Definition: Queue.hpp:27
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition: Queue.hpp:31
@ QUEUE_UNINITIALIZED
Queue wasn't initialized successfully.
Definition: Queue.hpp:30
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition: Queue.hpp:29
@ QUEUE_EMPTY_BUFFER
supplied buffer is empty
Definition: Queue.hpp:35
@ QUEUE_OK
message sent/received okay
Definition: Queue.hpp:28
@ QUEUE_FULL
queue was full when attempting to send a message
Definition: Queue.hpp:36
QueueStatus send(const Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE priority, QueueBlocking block)
send a message
Definition: QueueCommon.cpp:13
Queue()
Definition: Queue.cpp:38
QueueStatus createInternal(const Fw::StringBase &name, NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
create a message queue
Definition: Queue.cpp:42
QueueStatus receive(Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE &priority, QueueBlocking block)
receive a message
Definition: QueueCommon.cpp:22
NATIVE_INT_TYPE getQueueSize() const
get the queue depth (maximum number of messages queue can hold)
Definition: Queue.cpp:233
NATIVE_INT_TYPE getMaxMsgs() const
get the maximum number of messages (high watermark)
Definition: Queue.cpp:222
@ QUEUE_NONBLOCKING
Queue receive always returns even if there is no message.
Definition: Queue.hpp:42
NATIVE_INT_TYPE getNumMsgs() const
get the number of messages in the queue
Definition: Queue.cpp:211
virtual ~Queue()
Definition: Queue.cpp:65
POINTER_CAST m_handle
handle for implementation specific queue
Definition: Queue.hpp:75
Queue::QueueStatus receiveBlock(QueueHandle *queueHandle, U8 *buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority)
Definition: Queue.cpp:258
Queue::QueueStatus sendBlock(QueueHandle *queueHandle, const U8 *buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority)
Definition: Queue.cpp:133
Queue::QueueStatus receiveNonBlock(QueueHandle *queueHandle, U8 *buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority)
Definition: Queue.cpp:202
Queue::QueueStatus sendNonBlock(QueueHandle *queueHandle, const U8 *buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority)
Definition: Queue.cpp:95