F´ Flight Software - C/C++ Documentation  NASA-v2.1.0
A framework for building embedded system applications to NASA flight quality standards.
Queue.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title Queue.cpp
3 // \author dinkel
4 // \brief Queue implementation using the pthread library. This is NOT
5 // an IPC queue. It is meant to be used between threads within
6 // the same address space.
7 //
8 // \copyright
9 // Copyright 2009-2015, by the California Institute of Technology.
10 // ALL RIGHTS RESERVED. United States Government Sponsorship
11 // acknowledged.
12 //
13 // ======================================================================
14 
16 #include <Fw/Types/Assert.hpp>
17 #include <Os/Queue.hpp>
18 
19 #include <errno.h>
20 #include <pthread.h>
21 #include <stdio.h>
22 
23 namespace Os {
24 
25  // A helper class which stores variables for the queue handle.
26  // The queue itself, a pthread condition variable, and pthread
27  // mutex are contained within this container class.
28  class QueueHandle {
29  public:
31  int ret;
32  ret = pthread_cond_init(&this->queueNotEmpty, NULL);
33  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
34  ret = pthread_cond_init(&this->queueNotFull, NULL);
35  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
36  ret = pthread_mutex_init(&this->queueLock, NULL);
37  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
38  }
40  (void) pthread_cond_destroy(&this->queueNotEmpty);
41  (void) pthread_cond_destroy(&this->queueNotFull);
42  (void) pthread_mutex_destroy(&this->queueLock);
43  }
44  bool create(NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize) {
45  return queue.create(depth, msgSize);
46  }
48  pthread_cond_t queueNotEmpty;
49  pthread_cond_t queueNotFull;
50  pthread_mutex_t queueLock;
51  };
52 
53  Queue::Queue() :
54  m_handle((POINTER_CAST) NULL) {
55  }
56 
58  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
59 
60  // Queue has already been created... remove it and try again:
61  if (NULL != queueHandle) {
62  delete queueHandle;
63  queueHandle = NULL;
64  }
65 
66  // Create queue handle:
67  queueHandle = new QueueHandle;
68  if (NULL == queueHandle) {
69  return QUEUE_UNINITIALIZED;
70  }
71  if( !queueHandle->create(depth, msgSize) ) {
72  return QUEUE_UNINITIALIZED;
73  }
74  this->m_handle = (POINTER_CAST) queueHandle;
75 
76 #if FW_QUEUE_REGISTRATION
77  if (this->s_queueRegistry) {
78  this->s_queueRegistry->regQueue(this);
79  }
80 #endif
81 
82  return QUEUE_OK;
83  }
84 
85  Queue::~Queue() {
86  // Clean up the queue handle:
87  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
88  if (NULL != queueHandle) {
89  delete queueHandle;
90  }
91  this->m_handle = (POINTER_CAST) NULL;
92  }
93 
94  Queue::QueueStatus sendNonBlock(QueueHandle* queueHandle, const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority) {
95 
96  BufferQueue* queue = &queueHandle->queue;
97  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
98  pthread_mutex_t* queueLock = &queueHandle->queueLock;
99  NATIVE_INT_TYPE ret;
101 
103  // Locked Section
105  ret = pthread_mutex_lock(queueLock);
106  FW_ASSERT(ret == 0, errno);
108 
109  // Push item onto queue:
110  bool pushSucceeded = queue->push(buffer, size, priority);
111 
112  if(pushSucceeded) {
113  // Push worked - wake up a thread that might be waiting on
114  // the other end of the queue:
115  NATIVE_INT_TYPE ret = pthread_cond_signal(queueNotEmpty);
116  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
117  }
118  else {
119  // Push failed - the queue is full:
120  status = Queue::QUEUE_FULL;
121  }
122 
124  ret = pthread_mutex_unlock(queueLock);
125  FW_ASSERT(ret == 0, errno);
128 
129  return status;
130  }
131 
132  Queue::QueueStatus sendBlock(QueueHandle* queueHandle, const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority) {
133 
134  BufferQueue* queue = &queueHandle->queue;
135  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
136  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
137  pthread_mutex_t* queueLock = &queueHandle->queueLock;
138  NATIVE_INT_TYPE ret;
139 
141  // Locked Section
143  ret = pthread_mutex_lock(queueLock);
144  FW_ASSERT(ret == 0, errno);
146 
147  // If the queue is full, wait until a message is taken off the queue:
148  while( queue->isFull() ) {
149  NATIVE_INT_TYPE ret = pthread_cond_wait(queueNotFull, queueLock);
150  FW_ASSERT(ret == 0, errno);
151  }
152 
153  // Push item onto queue:
154  bool pushSucceeded = queue->push(buffer, size, priority);
155 
156  // The only reason push would not succeed is if the queue
157  // was full. Since we waited for the queue to NOT be full
158  // before sending on the queue, the push must have succeeded
159  // unless there was a programming error or a bit flip.
160  FW_ASSERT(pushSucceeded, pushSucceeded);
161 
162  // Push worked - wake up a thread that might be waiting on
163  // the other end of the queue:
164  ret = pthread_cond_signal(queueNotEmpty);
165  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
166 
168  ret = pthread_mutex_unlock(queueLock);
169  FW_ASSERT(ret == 0, errno);
172 
173  return Queue::QUEUE_OK;
174  }
175 
176 
177  Queue::QueueStatus Queue::send(const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority, QueueBlocking block) {
178  (void) block; // Always non-blocking for now
179  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
180  BufferQueue* queue = &queueHandle->queue;
181 
182  if (NULL == queueHandle) {
183  return QUEUE_UNINITIALIZED;
184  }
185 
186  if (NULL == buffer) {
187  return QUEUE_EMPTY_BUFFER;
188  }
189 
190  if (size < 0 || (NATIVE_UINT_TYPE) size > queue->getMsgSize()) {
191  return QUEUE_SIZE_MISMATCH;
192  }
193 
194  if( QUEUE_NONBLOCKING == block ) {
195  return sendNonBlock(queueHandle, buffer, size, priority);
196  }
197 
198  return sendBlock(queueHandle, buffer, size, priority);
199  }
200 
201  Queue::QueueStatus receiveNonBlock(QueueHandle* queueHandle, U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority) {
202 
203  BufferQueue* queue = &queueHandle->queue;
204  pthread_mutex_t* queueLock = &queueHandle->queueLock;
205  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
206  NATIVE_INT_TYPE ret;
207 
208  NATIVE_UINT_TYPE size = capacity;
209  NATIVE_INT_TYPE pri = 0;
211 
213  // Locked Section
215  ret = pthread_mutex_lock(queueLock);
216  FW_ASSERT(ret == 0, errno);
218 
219  // Get an item off of the queue:
220  bool popSucceeded = queue->pop(buffer, size, pri);
221 
222  if(popSucceeded) {
223  // Pop worked - set the return size and priority:
224  actualSize = (NATIVE_INT_TYPE) size;
225  priority = pri;
226 
227  // Pop worked - wake up a thread that might be waiting on
228  // the send end of the queue:
229  NATIVE_INT_TYPE ret = pthread_cond_signal(queueNotFull);
230  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
231  }
232  else {
233  actualSize = 0;
234  if( size > (NATIVE_UINT_TYPE) capacity ) {
235  // The buffer capacity was too small!
237  }
238  else if( size == 0 ) {
239  // The queue is empty:
240  status = Queue::QUEUE_NO_MORE_MSGS;
241  }
242  else {
243  // If this happens, a programming error or bit flip occurred:
244  FW_ASSERT(0);
245  }
246  }
247 
249  ret = pthread_mutex_unlock(queueLock);
250  FW_ASSERT(ret == 0, errno);
253 
254  return status;
255  }
256 
257  Queue::QueueStatus receiveBlock(QueueHandle* queueHandle, U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority) {
258 
259  BufferQueue* queue = &queueHandle->queue;
260  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
261  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
262  pthread_mutex_t* queueLock = &queueHandle->queueLock;
263  NATIVE_INT_TYPE ret;
264 
265  NATIVE_UINT_TYPE size = capacity;
266  NATIVE_INT_TYPE pri = 0;
268 
270  // Locked Section
272  ret = pthread_mutex_lock(queueLock);
273  FW_ASSERT(ret == 0, errno);
275 
276  // If the queue is empty, wait until a message is put on the queue:
277  while( queue->isEmpty() ) {
278  NATIVE_INT_TYPE ret = pthread_cond_wait(queueNotEmpty, queueLock);
279  FW_ASSERT(ret == 0, errno);
280  }
281 
282  // Get an item off of the queue:
283  bool popSucceeded = queue->pop(buffer, size, pri);
284 
285  if(popSucceeded) {
286  // Pop worked - set the return size and priority:
287  actualSize = (NATIVE_INT_TYPE) size;
288  priority = pri;
289 
290  // Pop worked - wake up a thread that might be waiting on
291  // the send end of the queue:
292  NATIVE_INT_TYPE ret = pthread_cond_signal(queueNotFull);
293  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
294  }
295  else {
296  actualSize = 0;
297  if( size > (NATIVE_UINT_TYPE) capacity ) {
298  // The buffer capacity was too small!
300  }
301  else {
302  // If this happens, a programming error or bit flip occurred:
303  // The only reason a pop should fail is if the user's buffer
304  // was too small.
305  FW_ASSERT(0);
306  }
307  }
308 
310  ret = pthread_mutex_unlock(queueLock);
311  FW_ASSERT(ret == 0, errno);
314 
315  return status;
316  }
317 
318  Queue::QueueStatus Queue::receive(U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority, QueueBlocking block) {
319 
320  if( (POINTER_CAST) NULL == this->m_handle ) {
321  return QUEUE_UNINITIALIZED;
322  }
323 
324  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
325 
326  if (NULL == queueHandle) {
327  return QUEUE_UNINITIALIZED;
328  }
329 
330  // Do not need to check the upper bound of capacity, We don't care
331  // how big the user's buffer is.. as long as it's big enough.
332  if (capacity < 0) {
333  return QUEUE_SIZE_MISMATCH;
334  }
335 
336  if( QUEUE_NONBLOCKING == block ) {
337  return receiveNonBlock(queueHandle, buffer, capacity, actualSize, priority);
338  }
339 
340  return receiveBlock(queueHandle, buffer, capacity, actualSize, priority);
341  }
342 
343  NATIVE_INT_TYPE Queue::getNumMsgs(void) const {
344  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
345  if (NULL == queueHandle) {
346  return 0;
347  }
348  BufferQueue* queue = &queueHandle->queue;
349  return queue->getCount();
350  }
351 
352  NATIVE_INT_TYPE Queue::getMaxMsgs(void) const {
353  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
354  if (NULL == queueHandle) {
355  return 0;
356  }
357  BufferQueue* queue = &queueHandle->queue;
358  return queue->getMaxCount();
359  }
360 
362  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
363  if (NULL == queueHandle) {
364  return 0;
365  }
366  BufferQueue* queue = &queueHandle->queue;
367  return queue->getDepth();
368  }
369 
370  NATIVE_INT_TYPE Queue::getMsgSize(void) const {
371  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
372  if (NULL == queueHandle) {
373  return 0;
374  }
375  BufferQueue* queue = &queueHandle->queue;
376  return queue->getMsgSize();
377  }
378 
379 }
380 
Os::QueueHandle
Definition: IPCQueueStub.cpp:28
Os
Definition: File.cpp:7
Os::sendBlock
Queue::QueueStatus sendBlock(QueueHandle *queueHandle, const U8 *buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority)
Definition: Queue.cpp:132
Os::Queue::QUEUE_FULL
@ QUEUE_FULL
queue was full when attempting to send a message
Definition: Queue.hpp:36
Os::receiveBlock
Queue::QueueStatus receiveBlock(QueueHandle *queueHandle, U8 *buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority)
Definition: Queue.cpp:257
Os::QueueHandle::~QueueHandle
~QueueHandle()
Definition: Queue.cpp:39
Fw::StringBase
Definition: StringType.hpp:23
U8
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.hpp:76
Os::QueueHandle::queueNotEmpty
pthread_cond_t queueNotEmpty
Definition: IPCQueueStub.cpp:48
Os::Queue::QueueStatus
QueueStatus
Definition: Queue.hpp:27
Os::Queue::getQueueSize
NATIVE_INT_TYPE getQueueSize(void) const
get the queue depth (maximum number of messages queue can hold)
Definition: Queue.cpp:232
Os::QueueHandle::queueNotFull
pthread_cond_t queueNotFull
Definition: IPCQueueStub.cpp:49
Os::Queue::m_handle
POINTER_CAST m_handle
handle for implementation specific queue
Definition: Queue.hpp:75
Os::BufferQueue::push
bool push(const U8 *buffer, NATIVE_UINT_TYPE size, NATIVE_INT_TYPE priority)
push an item onto the queue
Definition: BufferQueueCommon.cpp:51
Os::BufferQueue
A generic buffer queue data structure.
Definition: BufferQueue.hpp:26
Os::Queue::QUEUE_EMPTY_BUFFER
@ QUEUE_EMPTY_BUFFER
supplied buffer is empty
Definition: Queue.hpp:35
Assert.hpp
Os::Queue::send
QueueStatus send(const Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE priority, QueueBlocking block)
send a message
Definition: QueueCommon.cpp:13
Os::Queue::Queue
Queue()
Definition: Queue.cpp:37
Os::Queue::QUEUE_SIZE_MISMATCH
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition: Queue.hpp:31
Os::QueueHandle::create
bool create(NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
Definition: Queue.cpp:44
Os::Queue::receive
QueueStatus receive(Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE &priority, QueueBlocking block)
receive a message
Definition: QueueCommon.cpp:22
Os::QueueHandle::QueueHandle
QueueHandle()
Definition: Queue.cpp:30
Os::QueueHandle::queue
BufferQueue queue
Definition: IPCQueueStub.cpp:47
Os::Queue::QUEUE_UNINITIALIZED
@ QUEUE_UNINITIALIZED
Queue wasn't initialized successfully.
Definition: Queue.hpp:30
Os::sendNonBlock
Queue::QueueStatus sendNonBlock(QueueHandle *queueHandle, const U8 *buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority)
Definition: Queue.cpp:94
BufferQueue.hpp
NATIVE_UINT_TYPE
unsigned int NATIVE_UINT_TYPE
native unsigned integer type declaration
Definition: BasicTypes.hpp:30
Os::receiveNonBlock
Queue::QueueStatus receiveNonBlock(QueueHandle *queueHandle, U8 *buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority)
Definition: Queue.cpp:201
Os::BufferQueue::isEmpty
bool isEmpty()
check if the queue is empty
Definition: BufferQueueCommon.cpp:95
Os::Queue::QUEUE_NO_MORE_MSGS
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition: Queue.hpp:29
FW_ASSERT
#define FW_ASSERT(...)
Definition: Assert.hpp:9
Os::BufferQueue::isFull
bool isFull()
check if the queue is full
Definition: BufferQueueCommon.cpp:91
Os::Queue::getMsgSize
NATIVE_INT_TYPE getMsgSize(void) const
get the message size (maximum message size queue can hold)
Definition: Queue.cpp:243
Os::Queue::getNumMsgs
NATIVE_INT_TYPE getNumMsgs(void) const
get the number of messages in the queue
Definition: Queue.cpp:210
Os::Queue::QUEUE_OK
@ QUEUE_OK
message sent/received okay
Definition: Queue.hpp:28
Os::QueueHandle::queueLock
pthread_mutex_t queueLock
Definition: IPCQueueStub.cpp:50
Os::BufferQueue::create
bool create(NATIVE_UINT_TYPE depth, NATIVE_UINT_TYPE msgSize)
BufferQueue creation.
Definition: BufferQueueCommon.cpp:38
Os::Queue::QUEUE_NONBLOCKING
@ QUEUE_NONBLOCKING
Queue receive always returns even if there is no message.
Definition: Queue.hpp:42
Os::BufferQueue::pop
bool pop(U8 *buffer, NATIVE_UINT_TYPE &size, NATIVE_INT_TYPE &priority)
pop an item off the queue
Definition: BufferQueueCommon.cpp:72
Os::Queue::getMaxMsgs
NATIVE_INT_TYPE getMaxMsgs(void) const
get the maximum number of messages (high watermark)
Definition: Queue.cpp:221
Os::Queue::createInternal
QueueStatus createInternal(const Fw::StringBase &name, NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
create a message queue
Definition: Queue.cpp:41
Os::Queue::~Queue
virtual ~Queue()
Definition: Queue.cpp:64
NATIVE_INT_TYPE
int NATIVE_INT_TYPE
native integer type declaration
Definition: BasicTypes.hpp:29
NULL
#define NULL
NULL.
Definition: BasicTypes.hpp:100
Queue.hpp