F´ Flight Software - C/C++ Documentation  devel
A framework for building embedded system applications to NASA flight quality standards.
IPCQueueStub.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title IPCQueueStub.cpp
3 // \author dinkel
4 // \brief Queue implementation using the pthread library. This is NOT
5 // an IPC queue. It is meant to be used between threads within
6 // the same address space.
7 //
8 // \copyright
9 // Copyright 2009-2015, by the California Institute of Technology.
10 // ALL RIGHTS RESERVED. United States Government Sponsorship
11 // acknowledged.
12 //
13 // ======================================================================
14 
16 #include <Fw/Types/Assert.hpp>
17 #include <Os/IPCQueue.hpp>
18 
19 #include <cerrno>
20 #include <pthread.h>
21 #include <cstdio>
22 #include <new>
23 
24 namespace Os {
25 
26  // A helper class which stores variables for the queue handle.
27  // The queue itself, a pthread condition variable, and pthread
28  // mutex are contained within this container class.
29  class QueueHandle {
30  public:
32  int ret;
33  ret = pthread_cond_init(&this->queueNotEmpty, nullptr);
34  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
35  ret = pthread_cond_init(&this->queueNotFull, nullptr);
36  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
37  ret = pthread_mutex_init(&this->queueLock, nullptr);
38  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
39  }
41  (void) pthread_cond_destroy(&this->queueNotEmpty);
42  (void) pthread_cond_destroy(&this->queueNotFull);
43  (void) pthread_mutex_destroy(&this->queueLock);
44  }
45  bool create(NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize) {
46  return queue.create(static_cast<NATIVE_UINT_TYPE>(depth), static_cast<NATIVE_UINT_TYPE>(msgSize));
47  }
49  pthread_cond_t queueNotEmpty;
50  pthread_cond_t queueNotFull;
51  pthread_mutex_t queueLock;
52  };
53 
55  }
56 
58  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
59 
60  // Queue has already been created... remove it and try again:
61  if (nullptr != queueHandle) {
62  delete queueHandle;
63  queueHandle = nullptr;
64  }
65 
66  // Create queue handle:
67  queueHandle = new(std::nothrow) QueueHandle;
68  if (nullptr == queueHandle) {
69  return QUEUE_UNINITIALIZED;
70  }
71  if( !queueHandle->create(depth, msgSize) ) {
72  return QUEUE_UNINITIALIZED;
73  }
74  this->m_handle = reinterpret_cast<POINTER_CAST>(queueHandle);
75 
76 #if FW_QUEUE_REGISTRATION
77  if (this->s_queueRegistry) {
78  this->s_queueRegistry->regQueue(this);
79  }
80 #endif
81 
82  return QUEUE_OK;
83  }
84 
86  // Clean up the queue handle:
87  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
88  if (nullptr != queueHandle) {
89  delete queueHandle;
90  }
91  this->m_handle = reinterpret_cast<POINTER_CAST>(nullptr);
92  }
93 
95 
96  BufferQueue* queue = &queueHandle->queue;
97  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
98  pthread_mutex_t* queueLock = &queueHandle->queueLock;
99  NATIVE_INT_TYPE ret;
101 
103  // Locked Section
105  ret = pthread_mutex_lock(queueLock);
106  FW_ASSERT(ret == 0, errno);
108 
109  // Push item onto queue:
110  bool pushSucceeded = queue->push(buffer, static_cast<NATIVE_UINT_TYPE>(size), priority);
111 
112  if(pushSucceeded) {
113  // Push worked - wake up a thread that might be waiting on
114  // the other end of the queue:
115  ret = pthread_cond_signal(queueNotEmpty);
116  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
117  }
118  else {
119  // Push failed - the queue is full:
120  status = Queue::QUEUE_FULL;
121  }
122 
124  ret = pthread_mutex_unlock(queueLock);
125  FW_ASSERT(ret == 0, errno);
128 
129  return status;
130  }
131 
132  Queue::QueueStatus sendBlockIPCStub(QueueHandle* queueHandle, const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority) {
133 
134  BufferQueue* queue = &queueHandle->queue;
135  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
136  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
137  pthread_mutex_t* queueLock = &queueHandle->queueLock;
138  NATIVE_INT_TYPE ret;
139 
141  // Locked Section
143  ret = pthread_mutex_lock(queueLock);
144  FW_ASSERT(ret == 0, errno);
146 
147  // If the queue is full, wait until a message is taken off the queue:
148  while( queue->isFull() ) {
149  ret = pthread_cond_wait(queueNotFull, queueLock);
150  FW_ASSERT(ret == 0, errno);
151  }
152 
153  // Push item onto queue:
154  bool pushSucceeded = queue->push(buffer, static_cast<NATIVE_UINT_TYPE>(size), priority);
155 
156  // The only reason push would not succeed is if the queue
157  // was full. Since we waited for the queue to NOT be full
158  // before sending on the queue, the push must have succeeded
159  // unless there was a programming error or a bit flip.
160  FW_ASSERT(pushSucceeded, pushSucceeded);
161 
162  // Push worked - wake up a thread that might be waiting on
163  // the other end of the queue:
164  ret = pthread_cond_signal(queueNotEmpty);
165  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
166 
168  ret = pthread_mutex_unlock(queueLock);
169  FW_ASSERT(ret == 0, errno);
172 
173  return Queue::QUEUE_OK;
174  }
175 
176 
178  (void) block; // Always non-blocking for now
179  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
180  BufferQueue* queue = &queueHandle->queue;
181 
182  if (nullptr == queueHandle) {
183  return QUEUE_UNINITIALIZED;
184  }
185 
186  if (nullptr == buffer) {
187  return QUEUE_EMPTY_BUFFER;
188  }
189 
190  if (size < 0 || static_cast<NATIVE_UINT_TYPE>(size) > queue->getMsgSize()) {
191  return QUEUE_SIZE_MISMATCH;
192  }
193 
194  if( QUEUE_NONBLOCKING == block ) {
195  return sendNonBlockIPCStub(queueHandle, buffer, size, priority);
196  }
197 
198  return sendBlockIPCStub(queueHandle, buffer, size, priority);
199  }
200 
201  Queue::QueueStatus receiveNonBlockIPCStub(QueueHandle* queueHandle, U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority) {
202 
203  BufferQueue* queue = &queueHandle->queue;
204  pthread_mutex_t* queueLock = &queueHandle->queueLock;
205  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
206  NATIVE_INT_TYPE ret;
207 
208  NATIVE_UINT_TYPE size = static_cast<NATIVE_UINT_TYPE>(capacity);
209  NATIVE_INT_TYPE pri = 0;
211 
213  // Locked Section
215  ret = pthread_mutex_lock(queueLock);
216  FW_ASSERT(ret == 0, errno);
218 
219  // Get an item off of the queue:
220  bool popSucceeded = queue->pop(buffer, size, pri);
221 
222  if(popSucceeded) {
223  // Pop worked - set the return size and priority:
224  actualSize = static_cast<NATIVE_INT_TYPE>(size);
225  priority = pri;
226 
227  // Pop worked - wake up a thread that might be waiting on
228  // the send end of the queue:
229  ret = pthread_cond_signal(queueNotFull);
230  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
231  }
232  else {
233  actualSize = 0;
234  if( size > static_cast<NATIVE_UINT_TYPE>(capacity) ) {
235  // The buffer capacity was too small!
237  }
238  else if( size == 0 ) {
239  // The queue is empty:
240  status = Queue::QUEUE_NO_MORE_MSGS;
241  }
242  else {
243  // If this happens, a programming error or bit flip occurred:
244  FW_ASSERT(0);
245  }
246  }
247 
249  ret = pthread_mutex_unlock(queueLock);
250  FW_ASSERT(ret == 0, errno);
253 
254  return status;
255  }
256 
257  Queue::QueueStatus receiveBlockIPCStub(QueueHandle* queueHandle, U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority) {
258 
259  BufferQueue* queue = &queueHandle->queue;
260  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
261  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
262  pthread_mutex_t* queueLock = &queueHandle->queueLock;
263  NATIVE_INT_TYPE ret;
264 
265  NATIVE_UINT_TYPE size = static_cast<NATIVE_UINT_TYPE>(capacity);
266  NATIVE_INT_TYPE pri = 0;
268 
270  // Locked Section
272  ret = pthread_mutex_lock(queueLock);
273  FW_ASSERT(ret == 0, errno);
275 
276  // If the queue is empty, wait until a message is put on the queue:
277  while( queue->isEmpty() ) {
278  ret = pthread_cond_wait(queueNotEmpty, queueLock);
279  FW_ASSERT(ret == 0, errno);
280  }
281 
282  // Get an item off of the queue:
283  bool popSucceeded = queue->pop(buffer, size, pri);
284 
285  if(popSucceeded) {
286  // Pop worked - set the return size and priority:
287  actualSize = static_cast<NATIVE_INT_TYPE>(size);
288  priority = pri;
289 
290  // Pop worked - wake up a thread that might be waiting on
291  // the send end of the queue:
292  ret = pthread_cond_signal(queueNotFull);
293  FW_ASSERT(ret == 0, errno); // If this fails, something horrible happened.
294  }
295  else {
296  actualSize = 0;
297  if( size > static_cast<NATIVE_UINT_TYPE>(capacity) ) {
298  // The buffer capacity was too small!
300  }
301  else {
302  // If this happens, a programming error or bit flip occurred:
303  // The only reason a pop should fail is if the user's buffer
304  // was too small.
305  FW_ASSERT(0);
306  }
307  }
308 
310  ret = pthread_mutex_unlock(queueLock);
311  FW_ASSERT(ret == 0, errno);
314 
315  return status;
316  }
317 
319 
320  if( reinterpret_cast<POINTER_CAST>(nullptr) == this->m_handle ) {
321  return QUEUE_UNINITIALIZED;
322  }
323 
324  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
325 
326  if (nullptr == queueHandle) {
327  return QUEUE_UNINITIALIZED;
328  }
329 
330  // Do not need to check the upper bound of capacity, We don't care
331  // how big the user's buffer is.. as long as it's big enough.
332  if (capacity < 0) {
333  return QUEUE_SIZE_MISMATCH;
334  }
335 
336  if( QUEUE_NONBLOCKING == block ) {
337  return receiveNonBlockIPCStub(queueHandle, buffer, capacity, actualSize, priority);
338  }
339 
340  return receiveBlockIPCStub(queueHandle, buffer, capacity, actualSize, priority);
341  }
342 
344  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
345  if (nullptr == queueHandle) {
346  return 0;
347  }
348  BufferQueue* queue = &queueHandle->queue;
349  return static_cast<NATIVE_INT_TYPE>(queue->getCount());
350  }
351 
353  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
354  if (nullptr == queueHandle) {
355  return 0;
356  }
357  BufferQueue* queue = &queueHandle->queue;
358  return static_cast<NATIVE_INT_TYPE>(queue->getMaxCount());
359  }
360 
362  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
363  if (nullptr == queueHandle) {
364  return 0;
365  }
366  BufferQueue* queue = &queueHandle->queue;
367  return static_cast<NATIVE_INT_TYPE>(queue->getDepth());
368  }
369 
371  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
372  if (nullptr == queueHandle) {
373  return 0;
374  }
375  BufferQueue* queue = &queueHandle->queue;
376  return static_cast<NATIVE_INT_TYPE>(queue->getMsgSize());
377  }
378 
379 }
#define FW_ASSERT(...)
Definition: Assert.hpp:14
PlatformPointerCastType POINTER_CAST
Definition: BasicTypes.h:53
PlatformIntType NATIVE_INT_TYPE
Definition: BasicTypes.h:51
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.h:26
PlatformUIntType NATIVE_UINT_TYPE
Definition: BasicTypes.h:52
A generic buffer queue data structure.
Definition: BufferQueue.hpp:26
bool create(NATIVE_UINT_TYPE depth, NATIVE_UINT_TYPE msgSize)
BufferQueue creation.
NATIVE_UINT_TYPE getCount()
Get the current number of items on the queue.
NATIVE_UINT_TYPE getMsgSize()
Get the maximum message size.
bool pop(U8 *buffer, NATIVE_UINT_TYPE &size, NATIVE_INT_TYPE &priority)
pop an item off the queue
NATIVE_UINT_TYPE getDepth()
Get the queue depth.
bool isEmpty()
check if the queue is empty
bool push(const U8 *buffer, NATIVE_UINT_TYPE size, NATIVE_INT_TYPE priority)
push an item onto the queue
bool isFull()
check if the queue is full
NATIVE_UINT_TYPE getMaxCount()
Get the maximum number of items seen on the queue.
NATIVE_INT_TYPE getQueueSize() const
get the queue depth (maximum number of messages queue can hold)
NATIVE_INT_TYPE getMsgSize() const
get the message size (maximum message size queue can hold)
QueueStatus create(const Fw::StringBase &name, NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
create a message queue
QueueStatus send(const Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE priority, QueueBlocking block)
send a message
NATIVE_INT_TYPE getNumMsgs() const
get the number of messages in the queue
QueueStatus receive(Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE &priority, QueueBlocking block)
receive a message
NATIVE_INT_TYPE getMaxMsgs() const
get the maximum number of messages (high watermark)
pthread_cond_t queueNotEmpty
bool create(NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
pthread_mutex_t queueLock
BufferQueue queue
pthread_cond_t queueNotFull
QueueStatus
Definition: Queue.hpp:27
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with a buffer that is too large or too small
Definition: Queue.hpp:31
@ QUEUE_UNINITIALIZED
Queue wasn't initialized successfully.
Definition: Queue.hpp:30
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition: Queue.hpp:29
@ QUEUE_EMPTY_BUFFER
supplied buffer is empty
Definition: Queue.hpp:35
@ QUEUE_OK
message sent/received okay
Definition: Queue.hpp:28
@ QUEUE_FULL
queue was full when attempting to send a message
Definition: Queue.hpp:36
QueueBlocking
Definition: Queue.hpp:40
@ QUEUE_NONBLOCKING
Queue receive always returns even if there is no message.
Definition: Queue.hpp:42
POINTER_CAST m_handle
handle for implementation specific queue
Definition: Queue.hpp:75
Queue::QueueStatus sendNonBlockIPCStub(QueueHandle *queueHandle, const U8 *buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority)
Queue::QueueStatus receiveNonBlockIPCStub(QueueHandle *queueHandle, U8 *buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority)
Queue::QueueStatus receiveBlockIPCStub(QueueHandle *queueHandle, U8 *buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority)
Queue::QueueStatus sendBlockIPCStub(QueueHandle *queueHandle, const U8 *buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority)