F´ Flight Software - C/C++ Documentation  NASA-v1.5.0
A framework for building embedded system applications to NASA flight quality standards.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Queue.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title Queue.cpp
3 // \author dinkel
4 // \brief Queue implementation using the pthread library. This is NOT
5 // an IPC queue. It is meant to be used between threads within
6 // the same address space.
7 //
8 // \copyright
9 // Copyright 2009-2015, by the California Institute of Technology.
10 // ALL RIGHTS RESERVED. United States Government Sponsorship
11 // acknowledged.
12 //
13 // ======================================================================
14 
16 #include <Fw/Types/Assert.hpp>
17 #include <Os/Queue.hpp>
18 
19 #include <errno.h>
20 #include <pthread.h>
21 #include <stdio.h>
22 
23 namespace Os {
24 
25  // A helper class which stores variables for the queue handle.
26  // The queue itself, a pthread condition variable, and pthread
27  // mutex are contained within this container class.
28  class QueueHandle {
29  public:
31  int ret;
32  ret = pthread_cond_init(&this->queueNotEmpty, NULL);
33  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
34  ret = pthread_cond_init(&this->queueNotFull, NULL);
35  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
36  ret = pthread_mutex_init(&this->queueLock, NULL);
37  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
38  }
40  (void) pthread_cond_destroy(&this->queueNotEmpty);
41  (void) pthread_cond_destroy(&this->queueNotFull);
42  (void) pthread_mutex_destroy(&this->queueLock);
43  }
44  bool create(NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize) {
45  return queue.create(depth, msgSize);
46  }
48  pthread_cond_t queueNotEmpty;
49  pthread_cond_t queueNotFull;
50  pthread_mutex_t queueLock;
51  };
52 
53  Queue::Queue() :
54  m_handle((POINTER_CAST) NULL) {
55  }
56 
58  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
59 
60  // Queue has already been created... remove it and try again:
61  if (NULL != queueHandle) {
62  delete queueHandle;
63  queueHandle = NULL;
64  }
65 
66  // Create queue handle:
67  queueHandle = new QueueHandle;
68  if (NULL == queueHandle) {
69  return QUEUE_UNINITIALIZED;
70  }
71  if( !queueHandle->create(depth, msgSize) ) {
72  return QUEUE_UNINITIALIZED;
73  }
74  this->m_handle = (POINTER_CAST) queueHandle;
75 
76 #if FW_QUEUE_REGISTRATION
77  if (this->s_queueRegistry) {
78  this->s_queueRegistry->regQueue(this);
79  }
80 #endif
81 
82  return QUEUE_OK;
83  }
84 
85  Queue::~Queue() {
86  // Clean up the queue handle:
87  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
88  if (NULL != queueHandle) {
89  delete queueHandle;
90  }
91  this->m_handle = (POINTER_CAST) NULL;
92  }
93 
94  Queue::QueueStatus sendNonBlock(QueueHandle* queueHandle, const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority) {
95 
96  BufferQueue* queue = &queueHandle->queue;
97  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
98  pthread_mutex_t* queueLock = &queueHandle->queueLock;
99  NATIVE_INT_TYPE ret;
101 
103  // Locked Section
105  ret = pthread_mutex_lock(queueLock);
106  FW_ASSERT(ret == 0, errno);
108 
109  // Push item onto queue:
110  bool pushSucceeded = queue->push(buffer, size, priority);
111 
112  if(pushSucceeded) {
113  // Push worked - wake up a thread that might be waiting on
114  // the other end of the queue:
115  NATIVE_INT_TYPE ret = pthread_cond_signal(queueNotEmpty);
116  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
117  }
118  else {
119  // Push failed - the queue is full:
120  status = Queue::QUEUE_FULL;
121  }
122 
124  ret = pthread_mutex_unlock(queueLock);
125  FW_ASSERT(ret == 0, errno);
128 
129  return status;
130  }
131 
132  Queue::QueueStatus sendBlock(QueueHandle* queueHandle, const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority) {
133 
134  BufferQueue* queue = &queueHandle->queue;
135  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
136  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
137  pthread_mutex_t* queueLock = &queueHandle->queueLock;
138  NATIVE_INT_TYPE ret;
139 
141  // Locked Section
143  ret = pthread_mutex_lock(queueLock);
144  FW_ASSERT(ret == 0, errno);
146 
147  // If the queue is full, wait until a message is taken off the queue:
148  while( queue->isFull() ) {
149  NATIVE_INT_TYPE ret = pthread_cond_wait(queueNotFull, queueLock);
150  FW_ASSERT(ret == 0, errno);
151  }
152 
153  // Push item onto queue:
154  bool pushSucceeded = queue->push(buffer, size, priority);
155 
156  // The only reason push would not succeed is if the queue
157  // was full. Since we waited for the queue to NOT be full
158  // before sending on the queue, the push must have succeeded
159  // unless there was a programming error or a bit flip.
160  FW_ASSERT(pushSucceeded, pushSucceeded);
161 
162  // Push worked - wake up a thread that might be waiting on
163  // the other end of the queue:
164  ret = pthread_cond_signal(queueNotEmpty);
165  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
166 
168  ret = pthread_mutex_unlock(queueLock);
169  FW_ASSERT(ret == 0, errno);
172 
173  return Queue::QUEUE_OK;
174  }
175 
176 
177  Queue::QueueStatus Queue::send(const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority, QueueBlocking block) {
178  (void) block; // Always non-blocking for now
179  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
180  BufferQueue* queue = &queueHandle->queue;
181 
182  if (NULL == queueHandle) {
183  return QUEUE_UNINITIALIZED;
184  }
185 
186  if (NULL == buffer) {
187  return QUEUE_EMPTY_BUFFER;
188  }
189 
190  if (size < 0 || (NATIVE_UINT_TYPE) size > queue->getMsgSize()) {
191  return QUEUE_SIZE_MISMATCH;
192  }
193 
194  if( QUEUE_NONBLOCKING == block ) {
195  return sendNonBlock(queueHandle, buffer, size, priority);
196  }
197 
198  return sendBlock(queueHandle, buffer, size, priority);
199  }
200 
201  Queue::QueueStatus receiveNonBlock(QueueHandle* queueHandle, U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority) {
202 
203  BufferQueue* queue = &queueHandle->queue;
204  pthread_mutex_t* queueLock = &queueHandle->queueLock;
205  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
206  NATIVE_INT_TYPE ret;
207 
208  NATIVE_UINT_TYPE size = capacity;
209  NATIVE_INT_TYPE pri = 0;
211 
213  // Locked Section
215  ret = pthread_mutex_lock(queueLock);
216  FW_ASSERT(ret == 0, errno);
218 
219  // Get an item off of the queue:
220  bool popSucceeded = queue->pop(buffer, size, pri);
221 
222  if(popSucceeded) {
223  // Pop worked - set the return size and priority:
224  actualSize = (NATIVE_INT_TYPE) size;
225  priority = pri;
226 
227  // Pop worked - wake up a thread that might be waiting on
228  // the send end of the queue:
229  NATIVE_INT_TYPE ret = pthread_cond_signal(queueNotFull);
230  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
231  }
232  else {
233  actualSize = 0;
234  if( size > (NATIVE_UINT_TYPE) capacity ) {
235  // The buffer capacity was too small!
237  }
238  else if( size == 0 ) {
239  // The queue is empty:
240  status = Queue::QUEUE_NO_MORE_MSGS;
241  }
242  else {
243  // If this happens, a programming error or bit flip occured:
244  FW_ASSERT(0);
245  }
246  }
247 
249  ret = pthread_mutex_unlock(queueLock);
250  FW_ASSERT(ret == 0, errno);
253 
254  return status;
255  }
256 
257  Queue::QueueStatus receiveBlock(QueueHandle* queueHandle, U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority) {
258 
259  BufferQueue* queue = &queueHandle->queue;
260  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
261  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
262  pthread_mutex_t* queueLock = &queueHandle->queueLock;
263  NATIVE_INT_TYPE ret;
264 
265  NATIVE_UINT_TYPE size = capacity;
266  NATIVE_INT_TYPE pri = 0;
268 
270  // Locked Section
272  ret = pthread_mutex_lock(queueLock);
273  FW_ASSERT(ret == 0, errno);
275 
276  // If the queue is empty, wait until a message is put on the queue:
277  while( queue->isEmpty() ) {
278  NATIVE_INT_TYPE ret = pthread_cond_wait(queueNotEmpty, queueLock);
279  FW_ASSERT(ret == 0, errno);
280  }
281 
282  // Get an item off of the queue:
283  bool popSucceeded = queue->pop(buffer, size, pri);
284 
285  if(popSucceeded) {
286  // Pop worked - set the return size and priority:
287  actualSize = (NATIVE_INT_TYPE) size;
288  priority = pri;
289 
290  // Pop worked - wake up a thread that might be waiting on
291  // the send end of the queue:
292  NATIVE_INT_TYPE ret = pthread_cond_signal(queueNotFull);
293  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
294  }
295  else {
296  actualSize = 0;
297  if( size > (NATIVE_UINT_TYPE) capacity ) {
298  // The buffer capacity was too small!
300  }
301  else {
302  // If this happens, a programming error or bit flip occured:
303  // The only reason a pop should fail is if the user's buffer
304  // was too small.
305  FW_ASSERT(0);
306  }
307  }
308 
310  ret = pthread_mutex_unlock(queueLock);
311  FW_ASSERT(ret == 0, errno);
314 
315  return status;
316  }
317 
318  Queue::QueueStatus Queue::receive(U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority, QueueBlocking block) {
319 
320  if( (POINTER_CAST) NULL == this->m_handle ) {
321  return QUEUE_UNINITIALIZED;
322  }
323 
324  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
325 
326  if (NULL == queueHandle) {
327  return QUEUE_UNINITIALIZED;
328  }
329 
330  // Do not need to check the upper bound of capacity, We don't care
331  // how big the user's buffer is.. as long as it's big enough.
332  if (capacity < 0) {
333  return QUEUE_SIZE_MISMATCH;
334  }
335 
336  if( QUEUE_NONBLOCKING == block ) {
337  return receiveNonBlock(queueHandle, buffer, capacity, actualSize, priority);
338  }
339 
340  return receiveBlock(queueHandle, buffer, capacity, actualSize, priority);
341  }
342 
343  NATIVE_INT_TYPE Queue::getNumMsgs(void) const {
344  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
345  if (NULL == queueHandle) {
346  return 0;
347  }
348  BufferQueue* queue = &queueHandle->queue;
349  return queue->getCount();
350  }
351 
352  NATIVE_INT_TYPE Queue::getMaxMsgs(void) const {
353  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
354  if (NULL == queueHandle) {
355  return 0;
356  }
357  BufferQueue* queue = &queueHandle->queue;
358  return queue->getMaxCount();
359  }
360 
362  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
363  if (NULL == queueHandle) {
364  return 0;
365  }
366  BufferQueue* queue = &queueHandle->queue;
367  return queue->getDepth();
368  }
369 
370  NATIVE_INT_TYPE Queue::getMsgSize(void) const {
371  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
372  if (NULL == queueHandle) {
373  return 0;
374  }
375  BufferQueue* queue = &queueHandle->queue;
376  return queue->getMsgSize();
377  }
378 
379 }
380 
Os::QueueHandle
Definition: IPCQueueStub.cpp:28
Os
Definition: File.cpp:7
Os::sendBlock
Queue::QueueStatus sendBlock(QueueHandle *queueHandle, const U8 *buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority)
Definition: Queue.cpp:132
Os::Queue::QUEUE_FULL
@ QUEUE_FULL
queue was full when attempting to send a message
Definition: Queue.hpp:36
Os::receiveBlock
Queue::QueueStatus receiveBlock(QueueHandle *queueHandle, U8 *buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority)
Definition: Queue.cpp:257
Os::QueueHandle::~QueueHandle
~QueueHandle()
Definition: Queue.cpp:39
Fw::StringBase
Definition: StringType.hpp:23
U8
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.hpp:76
Os::QueueHandle::queueNotEmpty
pthread_cond_t queueNotEmpty
Definition: IPCQueueStub.cpp:48
Os::Queue::QueueStatus
QueueStatus
Definition: Queue.hpp:27
Os::Queue::getQueueSize
NATIVE_INT_TYPE getQueueSize(void) const
get the queue depth (maximum number of messages queue can hold)
Definition: Queue.cpp:249
Os::QueueHandle::queueNotFull
pthread_cond_t queueNotFull
Definition: IPCQueueStub.cpp:49
Os::Queue::m_handle
POINTER_CAST m_handle
handle for implementation specific queue
Definition: Queue.hpp:69
Os::BufferQueue::push
bool push(const U8 *buffer, NATIVE_UINT_TYPE size, NATIVE_INT_TYPE priority)
push an item onto the queue
Definition: BufferQueueCommon.cpp:51
Os::BufferQueue
A generic buffer queue data structure.
Definition: BufferQueue.hpp:26
Os::Queue::QUEUE_EMPTY_BUFFER
@ QUEUE_EMPTY_BUFFER
supplied buffer is empty
Definition: Queue.hpp:35
Assert.hpp
Os::Queue::send
QueueStatus send(const Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE priority, QueueBlocking block)
send a message
Definition: QueueCommon.cpp:13
Os::Queue::Queue
Queue()
Definition: Queue.cpp:39
Os::Queue::QUEUE_SIZE_MISMATCH
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition: Queue.hpp:31
Os::QueueHandle::create
bool create(NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
Definition: Queue.cpp:44
Os::Queue::receive
QueueStatus receive(Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE &priority, QueueBlocking block)
receive a message
Definition: QueueCommon.cpp:22
Os::QueueHandle::QueueHandle
QueueHandle()
Definition: Queue.cpp:30
Os::QueueHandle::queue
BufferQueue queue
Definition: IPCQueueStub.cpp:47
Os::Queue::QUEUE_UNINITIALIZED
@ QUEUE_UNINITIALIZED
Queue wasn't initialized successfully.
Definition: Queue.hpp:30
Os::sendNonBlock
Queue::QueueStatus sendNonBlock(QueueHandle *queueHandle, const U8 *buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority)
Definition: Queue.cpp:94
BufferQueue.hpp
NATIVE_UINT_TYPE
unsigned int NATIVE_UINT_TYPE
native unsigned integer type declaration
Definition: BasicTypes.hpp:30
Os::receiveNonBlock
Queue::QueueStatus receiveNonBlock(QueueHandle *queueHandle, U8 *buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority)
Definition: Queue.cpp:201
Os::BufferQueue::isEmpty
bool isEmpty()
check if the queue is empty
Definition: BufferQueueCommon.cpp:95
Os::Queue::QUEUE_NO_MORE_MSGS
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition: Queue.hpp:29
FW_ASSERT
#define FW_ASSERT(...)
Definition: Assert.hpp:9
Os::BufferQueue::isFull
bool isFull()
check if the queue is full
Definition: BufferQueueCommon.cpp:91
Os::Queue::getMsgSize
NATIVE_INT_TYPE getMsgSize(void) const
get the message size (maximum message size queue can hold)
Definition: Queue.cpp:260
Os::Queue::getNumMsgs
NATIVE_INT_TYPE getNumMsgs(void) const
get the number of messages in the queue
Definition: Queue.cpp:227
Os::Queue::QUEUE_OK
@ QUEUE_OK
message sent/received okay
Definition: Queue.hpp:28
Os::QueueHandle::queueLock
pthread_mutex_t queueLock
Definition: IPCQueueStub.cpp:50
Os::BufferQueue::create
bool create(NATIVE_UINT_TYPE depth, NATIVE_UINT_TYPE msgSize)
BufferQueue creation.
Definition: BufferQueueCommon.cpp:38
Os::Queue::QUEUE_NONBLOCKING
@ QUEUE_NONBLOCKING
Queue receive always returns even if there is no message.
Definition: Queue.hpp:42
Os::BufferQueue::pop
bool pop(U8 *buffer, NATIVE_UINT_TYPE &size, NATIVE_INT_TYPE &priority)
pop an item off the queue
Definition: BufferQueueCommon.cpp:72
Os::Queue::getMaxMsgs
NATIVE_INT_TYPE getMaxMsgs(void) const
get the maximum number of messages (high watermark)
Definition: Queue.cpp:238
Os::Queue::createInternal
QueueStatus createInternal(const Fw::StringBase &name, NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
create a message queue
Definition: Queue.cpp:51
Os::Queue::~Queue
virtual ~Queue()
Definition: Queue.cpp:76
NATIVE_INT_TYPE
int NATIVE_INT_TYPE
native integer type declaration
Definition: BasicTypes.hpp:29
NULL
#define NULL
NULL.
Definition: BasicTypes.hpp:100
Queue.hpp