F´ Flight Software - C/C++ Documentation  NASA-v1.5.0
A framework for building embedded system applications to NASA flight quality standards.
Queue.cpp
Go to the documentation of this file.
1 #include <Fw/Types/Assert.hpp>
2 #include <Os/Queue.hpp>
3 
4 #ifdef TGT_OS_TYPE_VXWORKS
5  #include <vxWorks.h>
6 #endif
7 
8 #ifdef TGT_OS_TYPE_LINUX
9  #include <sys/types.h>
10  #include <unistd.h>
11 #endif
12 
13 #include <mqueue.h>
14 #include <fcntl.h>
15 #include <errno.h>
16 #include <string.h>
17 #include <stdio.h>
18 #include <time.h>
19 #include <pthread.h>
20 
21 namespace Os {
22 
23  class QueueHandle {
24  public:
25  QueueHandle(mqd_t m_handle) {
26  // Initialize the handle:
27  int ret;
28  ret = pthread_cond_init(&this->queueNotEmpty, NULL);
29  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
30  ret = pthread_cond_init(&this->queueNotFull, NULL);
31  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
32  ret = pthread_mutex_init(&this->mp, NULL);
33  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
34  this->handle = m_handle;
35  }
37  // Destroy the handle:
38  if (-1 != this->handle) {
39  (void) mq_close(this->handle);
40  }
41  (void) pthread_cond_destroy(&this->queueNotEmpty);
42  (void) pthread_mutex_destroy(&this->mp);
43  }
44  mqd_t handle;
45  pthread_cond_t queueNotEmpty;
46  pthread_cond_t queueNotFull;
47  pthread_mutex_t mp;
48  };
49 
50  Queue::Queue() :
51  m_handle(-1) {
52  }
53 
55 
56  this->m_name = "/QP_";
57  this->m_name += name;
58 #ifndef TGT_OS_TYPE_VXWORKS
59  char pid[40];
60  (void)snprintf(pid,sizeof(pid),".%d",getpid());
61  pid[sizeof(pid)-1] = 0;
62  this->m_name += pid;
63 #endif
64  mq_attr att;
65  mqd_t handle;
66 
67  memset(&att,0,sizeof(att));
68  att.mq_maxmsg = depth;
69  att.mq_msgsize = msgSize;
70  att.mq_flags = 0;
71  att.mq_curmsgs = 0;
72 
73  handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, 0666, &att);
74 
75  // If queue already exists, then unlink it and try again.
76  if (-1 == (NATIVE_INT_TYPE) handle) {
77  switch (errno) {
78  case EEXIST:
79  (void)mq_unlink(this->m_name.toChar());
80  break;
81  default:
82  return QUEUE_UNINITIALIZED;
83  }
84 
85  handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
86 
87  if (-1 == (NATIVE_INT_TYPE) handle) {
88  return QUEUE_UNINITIALIZED;
89  }
90  }
91 
92  // Set up queue handle:
93  QueueHandle* queueHandle = new QueueHandle(handle);
94  if (NULL == queueHandle) {
95  return QUEUE_UNINITIALIZED;
96  }
97  this->m_handle = (POINTER_CAST) queueHandle;
98 
100 
101  return QUEUE_OK;
102  }
103 
104  Queue::~Queue() {
105  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
106  delete queueHandle;
107  (void) mq_unlink(this->m_name.toChar());
108  }
109 
110  Queue::QueueStatus Queue::send(const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority, QueueBlocking block) {
111 
112  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
113  mqd_t handle = queueHandle->handle;
114  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
115  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
116  pthread_mutex_t* mp = &queueHandle->mp;
117 
118  if (-1 == handle) {
119  return QUEUE_UNINITIALIZED;
120  }
121 
122  if (NULL == buffer) {
123  return QUEUE_EMPTY_BUFFER;
124  }
125 
126  bool keepTrying = true;
127  int ret;
128  while (keepTrying) {
129  NATIVE_INT_TYPE stat = mq_send(handle, (const char*) buffer, size, priority);
130  if (-1 == stat) {
131  switch (errno) {
132  case EINTR:
133  continue;
134  case EMSGSIZE:
135  return QUEUE_SIZE_MISMATCH;
136  case EINVAL:
137  return QUEUE_INVALID_PRIORITY;
138  case EAGAIN:
139  if (block == QUEUE_NONBLOCKING) {
140  // no more messages. If we are
141  // non-blocking, return
142  return QUEUE_FULL;
143  } else {
144  // Go to sleep until we receive a signal that something was takeng off the queue:
145  // Note: pthread_cont_wait must be called "with mutex locked by the calling
146  // thread or undefined behavior results." - from the docs
147  ret = pthread_mutex_lock(mp);
148  FW_ASSERT(ret == 0, errno);
149  ret = pthread_cond_wait(queueNotFull, mp);
150  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
151  ret = pthread_mutex_unlock(mp);
152  FW_ASSERT(ret == 0, errno);
153  continue;
154  }
155  default:
156  return QUEUE_UNKNOWN_ERROR;
157  }
158  } else {
159  keepTrying=false;
160  // Wake up a thread that might be waiting on the other end of the queue:
161  ret = pthread_cond_signal(queueNotEmpty);
162  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
163  }
164  }
165 
166  return QUEUE_OK;
167  }
168 
169  Queue::QueueStatus Queue::receive(U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority, QueueBlocking block) {
170 
171  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
172  mqd_t handle = queueHandle->handle;
173  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
174  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
175  pthread_mutex_t* mp = &queueHandle->mp;
176 
177  if (-1 == handle) {
178  return QUEUE_UNINITIALIZED;
179  }
180 
181  ssize_t size;
182  int ret;
183  bool notFinished = true;
184  while (notFinished) {
185  size = mq_receive(handle, (char*) buffer, (size_t) capacity,
186 #ifdef TGT_OS_TYPE_VXWORKS
187  (int*)&priority);
188 #else
189  (unsigned int*) &priority);
190 #endif
191 
192  if (-1 == size) { // error
193  switch (errno) {
194  case EINTR:
195  continue;
196  case EMSGSIZE:
197  return QUEUE_SIZE_MISMATCH;
198  case EAGAIN:
199  if (block == QUEUE_NONBLOCKING) {
200  // no more messages. If we are
201  // non-blocking, return
202  return QUEUE_NO_MORE_MSGS;
203  } else {
204  // Go to sleep until we receive a signal that something was put on the queue:
205  // Note: pthread_cont_wait must be called "with mutex locked by the calling
206  // thread or undefined behavior results." - from the docs
207  ret = pthread_mutex_lock(mp);
208  FW_ASSERT(ret == 0, errno);
209  ret = pthread_cond_wait(queueNotEmpty, mp);
210  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
211  ret = pthread_mutex_unlock(mp);
212  FW_ASSERT(ret == 0, errno);
213  continue;
214  }
215  break;
216  default:
217  return QUEUE_UNKNOWN_ERROR;
218  }
219  }
220  else {
221  notFinished = false;
222  // Wake up a thread that might be waiting on the other end of the queue:
223  ret = pthread_cond_signal(queueNotFull);
224  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
225  }
226  }
227 
228  actualSize = (NATIVE_INT_TYPE) size;
229  return QUEUE_OK;
230  }
231 
232  NATIVE_INT_TYPE Queue::getNumMsgs(void) const {
233  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
234  mqd_t handle = queueHandle->handle;
235 
236  struct mq_attr attr;
237  int status = mq_getattr(handle, &attr);
238  FW_ASSERT(status == 0);
239  return (U32) attr.mq_curmsgs;
240  }
241 
242  NATIVE_INT_TYPE Queue::getMaxMsgs(void) const {
243  //FW_ASSERT(0);
244  return 0;
245  }
246 
248  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
249  mqd_t handle = queueHandle->handle;
250 
251  struct mq_attr attr;
252  int status = mq_getattr(handle, &attr);
253  FW_ASSERT(status == 0);
254  return (U32) attr.mq_maxmsg;
255  }
256 
257  NATIVE_INT_TYPE Queue::getMsgSize(void) const {
258  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
259  mqd_t handle = queueHandle->handle;
260 
261  struct mq_attr attr;
262  int status = mq_getattr(handle, &attr);
263  FW_ASSERT(status == 0);
264  return (U32) attr.mq_msgsize;
265  }
266 
267 }
268 
Os
Definition: File.cpp:7
Os::Queue::QUEUE_FULL
@ QUEUE_FULL
queue was full when attempting to send a message
Definition: Queue.hpp:36
Os::QueueHandle::~QueueHandle
~QueueHandle()
Definition: Queue.cpp:36
Fw::StringBase
Definition: StringType.hpp:23
U8
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.hpp:76
Os::QueueHandle::queueNotEmpty
pthread_cond_t queueNotEmpty
Definition: IPCQueueStub.cpp:48
Os::Queue::QueueStatus
QueueStatus
Definition: Queue.hpp:27
Os::Queue::getQueueSize
NATIVE_INT_TYPE getQueueSize(void) const
get the queue depth (maximum number of messages queue can hold)
Definition: Queue.cpp:249
Os::QueueHandle::queueNotFull
pthread_cond_t queueNotFull
Definition: IPCQueueStub.cpp:49
Os::Queue::m_handle
POINTER_CAST m_handle
handle for implementation specific queue
Definition: Queue.hpp:69
Os::Queue::QUEUE_EMPTY_BUFFER
@ QUEUE_EMPTY_BUFFER
supplied buffer is empty
Definition: Queue.hpp:35
Assert.hpp
Os::Queue::QUEUE_INVALID_PRIORITY
@ QUEUE_INVALID_PRIORITY
invalid priority requested
Definition: Queue.hpp:34
Os::Queue::send
QueueStatus send(const Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE priority, QueueBlocking block)
send a message
Definition: QueueCommon.cpp:13
Os::Queue::Queue
Queue()
Definition: Queue.cpp:39
Os::Queue::QUEUE_SIZE_MISMATCH
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with a buffer that is too large or too small
Definition: Queue.hpp:31
Os::QueueHandle::mp
pthread_mutex_t mp
Definition: Queue.cpp:47
Os::Queue::receive
QueueStatus receive(Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE &priority, QueueBlocking block)
receive a message
Definition: QueueCommon.cpp:22
Os::Queue::QUEUE_UNINITIALIZED
@ QUEUE_UNINITIALIZED
Queue wasn't initialized successfully.
Definition: Queue.hpp:30
Os::Queue::m_name
QueueString m_name
queue name
Definition: Queue.hpp:70
Os::Queue::QUEUE_NO_MORE_MSGS
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition: Queue.hpp:29
FW_ASSERT
#define FW_ASSERT(...)
Definition: Assert.hpp:9
Os::Queue::getMsgSize
NATIVE_INT_TYPE getMsgSize(void) const
get the message size (maximum message size queue can hold)
Definition: Queue.cpp:260
Os::Queue::getNumMsgs
NATIVE_INT_TYPE getNumMsgs(void) const
get the number of messages in the queue
Definition: Queue.cpp:227
Os::Queue::QUEUE_OK
@ QUEUE_OK
message sent/received okay
Definition: Queue.hpp:28
Os::Queue::s_numQueues
static NATIVE_INT_TYPE s_numQueues
tracks number of queues in the system
Definition: Queue.hpp:74
Os::Queue::QUEUE_UNKNOWN_ERROR
@ QUEUE_UNKNOWN_ERROR
Unexpected error; can't match with returns.
Definition: Queue.hpp:37
Os::Queue::QUEUE_NONBLOCKING
@ QUEUE_NONBLOCKING
Queue receive always returns even if there is no message.
Definition: Queue.hpp:42
Os::QueueHandle::handle
mqd_t handle
Definition: IPCQueue.cpp:38
Os::Queue::getMaxMsgs
NATIVE_INT_TYPE getMaxMsgs(void) const
get the maximum number of messages (high watermark)
Definition: Queue.cpp:238
Os::Queue::createInternal
QueueStatus createInternal(const Fw::StringBase &name, NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
create a message queue
Definition: Queue.cpp:51
Os::Queue::~Queue
virtual ~Queue()
Definition: Queue.cpp:76
NATIVE_INT_TYPE
int NATIVE_INT_TYPE
native integer type declaration
Definition: BasicTypes.hpp:29
NULL
#define NULL
NULL.
Definition: BasicTypes.hpp:100
Os::QueueHandle::QueueHandle
QueueHandle(mqd_t m_handle)
Definition: Queue.cpp:25
Queue.hpp