F´ Flight Software - C/C++ Documentation  NASA-v1.6.0
A framework for building embedded system applications to NASA flight quality standards.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Queue.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title Queue.cpp
3 // \author dinkel
4 // \brief Queue implementation using the pthread library. This is NOT
5 // an IPC queue. It is meant to be used between threads within
6 // the same address space.
7 //
8 // \copyright
9 // Copyright 2009-2015, by the California Institute of Technology.
10 // ALL RIGHTS RESERVED. United States Government Sponsorship
11 // acknowledged.
12 //
13 // ======================================================================
14 
16 #include <Fw/Types/Assert.hpp>
17 #include <Os/Queue.hpp>
18 
19 #include <cerrno>
20 #include <pthread.h>
21 #include <cstdio>
22 #include <new>
23 
24 namespace Os {
25 
26  // A helper class which stores variables for the queue handle.
27  // The queue itself, a pthread condition variable, and pthread
28  // mutex are contained within this container class.
29  class QueueHandle {
30  public:
32  int ret;
33  ret = pthread_cond_init(&this->queueNotEmpty, nullptr);
34  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
35  ret = pthread_cond_init(&this->queueNotFull, nullptr);
36  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
37  ret = pthread_mutex_init(&this->queueLock, nullptr);
38  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
39  }
41  (void) pthread_cond_destroy(&this->queueNotEmpty);
42  (void) pthread_cond_destroy(&this->queueNotFull);
43  (void) pthread_mutex_destroy(&this->queueLock);
44  }
45  bool create(NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize) {
46  return queue.create(depth, msgSize);
47  }
49  pthread_cond_t queueNotEmpty;
50  pthread_cond_t queueNotFull;
51  pthread_mutex_t queueLock;
52  };
53 
54  Queue::Queue() :
55  m_handle(reinterpret_cast<POINTER_CAST>(nullptr)) {
56  }
57 
59  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
60 
61  // Queue has already been created... remove it and try again:
62  if (nullptr != queueHandle) {
63  delete queueHandle;
64  queueHandle = nullptr;
65  }
66 
67  // Create queue handle:
68  queueHandle = new(std::nothrow) QueueHandle;
69  if (nullptr == queueHandle) {
70  return QUEUE_UNINITIALIZED;
71  }
72  if( !queueHandle->create(depth, msgSize) ) {
73  return QUEUE_UNINITIALIZED;
74  }
75  this->m_handle = reinterpret_cast<POINTER_CAST>(queueHandle);
76 
77 #if FW_QUEUE_REGISTRATION
78  if (this->s_queueRegistry) {
79  this->s_queueRegistry->regQueue(this);
80  }
81 #endif
82 
83  return QUEUE_OK;
84  }
85 
86  Queue::~Queue() {
87  // Clean up the queue handle:
88  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
89  if (nullptr != queueHandle) {
90  delete queueHandle;
91  }
92  this->m_handle = reinterpret_cast<POINTER_CAST>(nullptr);
93  }
94 
95  Queue::QueueStatus sendNonBlock(QueueHandle* queueHandle, const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority) {
96 
97  BufferQueue* queue = &queueHandle->queue;
98  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
99  pthread_mutex_t* queueLock = &queueHandle->queueLock;
100  NATIVE_INT_TYPE ret;
102 
104  // Locked Section
106  ret = pthread_mutex_lock(queueLock);
107  FW_ASSERT(ret == 0, errno);
109 
110  // Push item onto queue:
111  bool pushSucceeded = queue->push(buffer, size, priority);
112 
113  if(pushSucceeded) {
114  // Push worked - wake up a thread that might be waiting on
115  // the other end of the queue:
116  NATIVE_INT_TYPE ret = pthread_cond_signal(queueNotEmpty);
117  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
118  }
119  else {
120  // Push failed - the queue is full:
121  status = Queue::QUEUE_FULL;
122  }
123 
125  ret = pthread_mutex_unlock(queueLock);
126  FW_ASSERT(ret == 0, errno);
129 
130  return status;
131  }
132 
133  Queue::QueueStatus sendBlock(QueueHandle* queueHandle, const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority) {
134 
135  BufferQueue* queue = &queueHandle->queue;
136  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
137  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
138  pthread_mutex_t* queueLock = &queueHandle->queueLock;
139  NATIVE_INT_TYPE ret;
140 
142  // Locked Section
144  ret = pthread_mutex_lock(queueLock);
145  FW_ASSERT(ret == 0, errno);
147 
148  // If the queue is full, wait until a message is taken off the queue:
149  while( queue->isFull() ) {
150  NATIVE_INT_TYPE ret = pthread_cond_wait(queueNotFull, queueLock);
151  FW_ASSERT(ret == 0, errno);
152  }
153 
154  // Push item onto queue:
155  bool pushSucceeded = queue->push(buffer, size, priority);
156 
157  // The only reason push would not succeed is if the queue
158  // was full. Since we waited for the queue to NOT be full
159  // before sending on the queue, the push must have succeeded
160  // unless there was a programming error or a bit flip.
161  FW_ASSERT(pushSucceeded, pushSucceeded);
162 
163  // Push worked - wake up a thread that might be waiting on
164  // the other end of the queue:
165  ret = pthread_cond_signal(queueNotEmpty);
166  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
167 
169  ret = pthread_mutex_unlock(queueLock);
170  FW_ASSERT(ret == 0, errno);
173 
174  return Queue::QUEUE_OK;
175  }
176 
177 
178  Queue::QueueStatus Queue::send(const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority, QueueBlocking block) {
179  (void) block; // Always non-blocking for now
180  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
181  BufferQueue* queue = &queueHandle->queue;
182 
183  if (nullptr == queueHandle) {
184  return QUEUE_UNINITIALIZED;
185  }
186 
187  if (nullptr == buffer) {
188  return QUEUE_EMPTY_BUFFER;
189  }
190 
191  if (size < 0 || static_cast<NATIVE_UINT_TYPE>(size) > queue->getMsgSize()) {
192  return QUEUE_SIZE_MISMATCH;
193  }
194 
195  if( QUEUE_NONBLOCKING == block ) {
196  return sendNonBlock(queueHandle, buffer, size, priority);
197  }
198 
199  return sendBlock(queueHandle, buffer, size, priority);
200  }
201 
202  Queue::QueueStatus receiveNonBlock(QueueHandle* queueHandle, U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority) {
203 
204  BufferQueue* queue = &queueHandle->queue;
205  pthread_mutex_t* queueLock = &queueHandle->queueLock;
206  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
207  NATIVE_INT_TYPE ret;
208 
209  NATIVE_UINT_TYPE size = static_cast<NATIVE_UINT_TYPE>(capacity);
210  NATIVE_INT_TYPE pri = 0;
212 
214  // Locked Section
216  ret = pthread_mutex_lock(queueLock);
217  FW_ASSERT(ret == 0, errno);
219 
220  // Get an item off of the queue:
221  bool popSucceeded = queue->pop(buffer, size, pri);
222 
223  if(popSucceeded) {
224  // Pop worked - set the return size and priority:
225  actualSize = static_cast<NATIVE_INT_TYPE>(size);
226  priority = pri;
227 
228  // Pop worked - wake up a thread that might be waiting on
229  // the send end of the queue:
230  NATIVE_INT_TYPE ret = pthread_cond_signal(queueNotFull);
231  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
232  }
233  else {
234  actualSize = 0;
235  if( size > static_cast<NATIVE_UINT_TYPE>(capacity) ) {
236  // The buffer capacity was too small!
238  }
239  else if( size == 0 ) {
240  // The queue is empty:
241  status = Queue::QUEUE_NO_MORE_MSGS;
242  }
243  else {
244  // If this happens, a programming error or bit flip occurred:
245  FW_ASSERT(0);
246  }
247  }
248 
250  ret = pthread_mutex_unlock(queueLock);
251  FW_ASSERT(ret == 0, errno);
254 
255  return status;
256  }
257 
258  Queue::QueueStatus receiveBlock(QueueHandle* queueHandle, U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority) {
259 
260  BufferQueue* queue = &queueHandle->queue;
261  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
262  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
263  pthread_mutex_t* queueLock = &queueHandle->queueLock;
264  NATIVE_INT_TYPE ret;
265 
266  NATIVE_UINT_TYPE size = static_cast<NATIVE_UINT_TYPE>(capacity);
267  NATIVE_INT_TYPE pri = 0;
269 
271  // Locked Section
273  ret = pthread_mutex_lock(queueLock);
274  FW_ASSERT(ret == 0, errno);
276 
277  // If the queue is empty, wait until a message is put on the queue:
278  while( queue->isEmpty() ) {
279  NATIVE_INT_TYPE ret = pthread_cond_wait(queueNotEmpty, queueLock);
280  FW_ASSERT(ret == 0, errno);
281  }
282 
283  // Get an item off of the queue:
284  bool popSucceeded = queue->pop(buffer, size, pri);
285 
286  if(popSucceeded) {
287  // Pop worked - set the return size and priority:
288  actualSize = static_cast<NATIVE_INT_TYPE>(size);
289  priority = pri;
290 
291  // Pop worked - wake up a thread that might be waiting on
292  // the send end of the queue:
293  NATIVE_INT_TYPE ret = pthread_cond_signal(queueNotFull);
294  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
295  }
296  else {
297  actualSize = 0;
298  if( size > static_cast<NATIVE_UINT_TYPE>(capacity) ) {
299  // The buffer capacity was too small!
301  }
302  else {
303  // If this happens, a programming error or bit flip occurred:
304  // The only reason a pop should fail is if the user's buffer
305  // was too small.
306  FW_ASSERT(0);
307  }
308  }
309 
311  ret = pthread_mutex_unlock(queueLock);
312  FW_ASSERT(ret == 0, errno);
315 
316  return status;
317  }
318 
319  Queue::QueueStatus Queue::receive(U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority, QueueBlocking block) {
320 
321  if( reinterpret_cast<POINTER_CAST>(nullptr) == this->m_handle ) {
322  return QUEUE_UNINITIALIZED;
323  }
324 
325  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
326 
327  if (nullptr == queueHandle) {
328  return QUEUE_UNINITIALIZED;
329  }
330 
331  // Do not need to check the upper bound of capacity, We don't care
332  // how big the user's buffer is.. as long as it's big enough.
333  if (capacity < 0) {
334  return QUEUE_SIZE_MISMATCH;
335  }
336 
337  if( QUEUE_NONBLOCKING == block ) {
338  return receiveNonBlock(queueHandle, buffer, capacity, actualSize, priority);
339  }
340 
341  return receiveBlock(queueHandle, buffer, capacity, actualSize, priority);
342  }
343 
345  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
346  if (nullptr == queueHandle) {
347  return 0;
348  }
349  BufferQueue* queue = &queueHandle->queue;
350  return queue->getCount();
351  }
352 
354  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
355  if (nullptr == queueHandle) {
356  return 0;
357  }
358  BufferQueue* queue = &queueHandle->queue;
359  return queue->getMaxCount();
360  }
361 
363  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
364  if (nullptr == queueHandle) {
365  return 0;
366  }
367  BufferQueue* queue = &queueHandle->queue;
368  return queue->getDepth();
369  }
370 
372  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
373  if (nullptr == queueHandle) {
374  return 0;
375  }
376  BufferQueue* queue = &queueHandle->queue;
377  return queue->getMsgSize();
378  }
379 
380 }
381 
Os::QueueHandle
Definition: IPCQueueStub.cpp:29
Os
Definition: File.cpp:7
Os::sendBlock
Queue::QueueStatus sendBlock(QueueHandle *queueHandle, const U8 *buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority)
Definition: Queue.cpp:133
Os::Queue::QUEUE_FULL
@ QUEUE_FULL
queue was full when attempting to send a message
Definition: Queue.hpp:36
Os::Queue::getMsgSize
NATIVE_INT_TYPE getMsgSize() const
get the message size (maximum message size queue can hold)
Definition: Queue.cpp:244
Os::receiveBlock
Queue::QueueStatus receiveBlock(QueueHandle *queueHandle, U8 *buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority)
Definition: Queue.cpp:258
Os::QueueHandle::~QueueHandle
~QueueHandle()
Definition: Queue.cpp:40
Fw::StringBase
Definition: StringType.hpp:23
U8
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.hpp:76
Os::QueueHandle::queueNotEmpty
pthread_cond_t queueNotEmpty
Definition: IPCQueueStub.cpp:49
Os::Queue::QueueStatus
QueueStatus
Definition: Queue.hpp:27
Os::QueueHandle::queueNotFull
pthread_cond_t queueNotFull
Definition: IPCQueueStub.cpp:50
Os::Queue::getMaxMsgs
NATIVE_INT_TYPE getMaxMsgs() const
get the maximum number of messages (high watermark)
Definition: Queue.cpp:222
Os::Queue::m_handle
POINTER_CAST m_handle
handle for implementation specific queue
Definition: Queue.hpp:75
Os::BufferQueue::push
bool push(const U8 *buffer, NATIVE_UINT_TYPE size, NATIVE_INT_TYPE priority)
push an item onto the queue
Definition: BufferQueueCommon.cpp:51
Os::BufferQueue
A generic buffer queue data structure.
Definition: BufferQueue.hpp:26
Os::Queue::QUEUE_EMPTY_BUFFER
@ QUEUE_EMPTY_BUFFER
supplied buffer is empty
Definition: Queue.hpp:35
Os::Queue::getNumMsgs
NATIVE_INT_TYPE getNumMsgs() const
get the number of messages in the queue
Definition: Queue.cpp:211
Assert.hpp
Os::Queue::send
QueueStatus send(const Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE priority, QueueBlocking block)
send a message
Definition: QueueCommon.cpp:13
Os::Queue::Queue
Queue()
Definition: Queue.cpp:38
Os::Queue::QUEUE_SIZE_MISMATCH
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition: Queue.hpp:31
Os::QueueHandle::create
bool create(NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
Definition: Queue.cpp:45
Os::Queue::receive
QueueStatus receive(Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE &priority, QueueBlocking block)
receive a message
Definition: QueueCommon.cpp:22
Os::QueueHandle::QueueHandle
QueueHandle()
Definition: Queue.cpp:31
Os::QueueHandle::queue
BufferQueue queue
Definition: IPCQueueStub.cpp:48
Os::Queue::QUEUE_UNINITIALIZED
@ QUEUE_UNINITIALIZED
Queue wasn't initialized successfully.
Definition: Queue.hpp:30
Os::sendNonBlock
Queue::QueueStatus sendNonBlock(QueueHandle *queueHandle, const U8 *buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority)
Definition: Queue.cpp:95
BufferQueue.hpp
NATIVE_UINT_TYPE
unsigned int NATIVE_UINT_TYPE
native unsigned integer type declaration
Definition: BasicTypes.hpp:30
Os::receiveNonBlock
Queue::QueueStatus receiveNonBlock(QueueHandle *queueHandle, U8 *buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority)
Definition: Queue.cpp:202
Os::BufferQueue::isEmpty
bool isEmpty()
check if the queue is empty
Definition: BufferQueueCommon.cpp:95
Os::Queue::QUEUE_NO_MORE_MSGS
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition: Queue.hpp:29
FW_ASSERT
#define FW_ASSERT(...)
Definition: Assert.hpp:8
Os::BufferQueue::isFull
bool isFull()
check if the queue is full
Definition: BufferQueueCommon.cpp:91
Os::Queue::QUEUE_OK
@ QUEUE_OK
message sent/received okay
Definition: Queue.hpp:28
Os::QueueHandle::queueLock
pthread_mutex_t queueLock
Definition: IPCQueueStub.cpp:51
Os::Queue::getQueueSize
NATIVE_INT_TYPE getQueueSize() const
get the queue depth (maximum number of messages queue can hold)
Definition: Queue.cpp:233
Os::BufferQueue::create
bool create(NATIVE_UINT_TYPE depth, NATIVE_UINT_TYPE msgSize)
BufferQueue creation.
Definition: BufferQueueCommon.cpp:38
Os::Queue::QUEUE_NONBLOCKING
@ QUEUE_NONBLOCKING
Queue receive always returns even if there is no message.
Definition: Queue.hpp:42
Os::BufferQueue::pop
bool pop(U8 *buffer, NATIVE_UINT_TYPE &size, NATIVE_INT_TYPE &priority)
pop an item off the queue
Definition: BufferQueueCommon.cpp:72
Os::Queue::createInternal
QueueStatus createInternal(const Fw::StringBase &name, NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
create a message queue
Definition: Queue.cpp:42
Os::Queue::~Queue
virtual ~Queue()
Definition: Queue.cpp:65
NATIVE_INT_TYPE
int NATIVE_INT_TYPE
native integer type declaration
Definition: BasicTypes.hpp:29
Queue.hpp