F´ Flight Software - C/C++ Documentation  NASA-v1.5.0
A framework for building embedded system applications to NASA flight quality standards.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Queue.cpp
Go to the documentation of this file.
1 #include <Fw/Types/Assert.hpp>
2 #include <Os/Queue.hpp>
3 
4 #ifdef TGT_OS_TYPE_VXWORKS
5  #include <vxWorks.h>
6 #endif
7 
8 #ifdef TGT_OS_TYPE_LINUX
9  #include <sys/types.h>
10  #include <unistd.h>
11 #endif
12 
13 #include <mqueue.h>
14 #include <fcntl.h>
15 #include <errno.h>
16 #include <string.h>
17 #include <stdio.h>
18 #include <time.h>
19 #include <pthread.h>
20 
21 namespace Os {
22 
23  class QueueHandle {
24  public:
25  QueueHandle(mqd_t m_handle) {
26  // Initialize the handle:
27  int ret;
28  ret = pthread_cond_init(&this->queueNotEmpty, NULL);
29  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
30  ret = pthread_cond_init(&this->queueNotFull, NULL);
31  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
32  ret = pthread_mutex_init(&this->mp, NULL);
33  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
34  this->handle = m_handle;
35  }
37  // Destroy the handle:
38  if (-1 != this->handle) {
39  (void) mq_close(this->handle);
40  }
41  (void) pthread_cond_destroy(&this->queueNotEmpty);
42  (void) pthread_mutex_destroy(&this->mp);
43  }
44  mqd_t handle;
45  pthread_cond_t queueNotEmpty;
46  pthread_cond_t queueNotFull;
47  pthread_mutex_t mp;
48  };
49 
50  Queue::Queue() :
51  m_handle(-1) {
52  }
53 
55 
56  this->m_name = "/QP_";
57  this->m_name += name;
58 #ifndef TGT_OS_TYPE_VXWORKS
59  char pid[40];
60  (void)snprintf(pid,sizeof(pid),".%d",getpid());
61  pid[sizeof(pid)-1] = 0;
62  this->m_name += pid;
63 #endif
64  mq_attr att;
65  mqd_t handle;
66 
67  memset(&att,0,sizeof(att));
68  att.mq_maxmsg = depth;
69  att.mq_msgsize = msgSize;
70  att.mq_flags = 0;
71  att.mq_curmsgs = 0;
72 
73  handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, 0666, &att);
74 
75  // If queue already exists, then unlink it and try again.
76  if (-1 == (NATIVE_INT_TYPE) handle) {
77  switch (errno) {
78  case EEXIST:
79  (void)mq_unlink(this->m_name.toChar());
80  break;
81  default:
82  return QUEUE_UNINITIALIZED;
83  }
84 
85  handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
86 
87  if (-1 == (NATIVE_INT_TYPE) handle) {
88  return QUEUE_UNINITIALIZED;
89  }
90  }
91 
92  // Set up queue handle:
93  QueueHandle* queueHandle = new QueueHandle(handle);
94  if (NULL == queueHandle) {
95  return QUEUE_UNINITIALIZED;
96  }
97  this->m_handle = (POINTER_CAST) queueHandle;
98 
100 
101  return QUEUE_OK;
102  }
103 
104  Queue::~Queue() {
105  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
106  delete queueHandle;
107  (void) mq_unlink(this->m_name.toChar());
108  }
109 
110  Queue::QueueStatus Queue::send(const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority, QueueBlocking block) {
111 
112  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
113  mqd_t handle = queueHandle->handle;
114  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
115  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
116  pthread_mutex_t* mp = &queueHandle->mp;
117 
118  if (-1 == handle) {
119  return QUEUE_UNINITIALIZED;
120  }
121 
122  if (NULL == buffer) {
123  return QUEUE_EMPTY_BUFFER;
124  }
125 
126  bool keepTrying = true;
127  int ret;
128  while (keepTrying) {
129  NATIVE_INT_TYPE stat = mq_send(handle, (const char*) buffer, size, priority);
130  if (-1 == stat) {
131  switch (errno) {
132  case EINTR:
133  continue;
134  case EMSGSIZE:
135  return QUEUE_SIZE_MISMATCH;
136  case EINVAL:
137  return QUEUE_INVALID_PRIORITY;
138  case EAGAIN:
139  if (block == QUEUE_NONBLOCKING) {
140  // no more messages. If we are
141  // non-blocking, return
142  return QUEUE_FULL;
143  } else {
144  // Go to sleep until we receive a signal that something was takeng off the queue:
145  // Note: pthread_cont_wait must be called "with mutex locked by the calling
146  // thread or undefined behavior results." - from the docs
147  ret = pthread_mutex_lock(mp);
148  FW_ASSERT(ret == 0, errno);
149  ret = pthread_cond_wait(queueNotFull, mp);
150  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
151  ret = pthread_mutex_unlock(mp);
152  FW_ASSERT(ret == 0, errno);
153  continue;
154  }
155  default:
156  return QUEUE_UNKNOWN_ERROR;
157  }
158  } else {
159  keepTrying=false;
160  // Wake up a thread that might be waiting on the other end of the queue:
161  ret = pthread_cond_signal(queueNotEmpty);
162  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
163  }
164  }
165 
166  return QUEUE_OK;
167  }
168 
169  Queue::QueueStatus Queue::receive(U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority, QueueBlocking block) {
170 
171  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
172  mqd_t handle = queueHandle->handle;
173  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
174  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
175  pthread_mutex_t* mp = &queueHandle->mp;
176 
177  if (-1 == handle) {
178  return QUEUE_UNINITIALIZED;
179  }
180 
181  ssize_t size;
182  int ret;
183  bool notFinished = true;
184  while (notFinished) {
185  size = mq_receive(handle, (char*) buffer, (size_t) capacity,
186 #ifdef TGT_OS_TYPE_VXWORKS
187  (int*)&priority);
188 #else
189  (unsigned int*) &priority);
190 #endif
191 
192  if (-1 == size) { // error
193  switch (errno) {
194  case EINTR:
195  continue;
196  case EMSGSIZE:
197  return QUEUE_SIZE_MISMATCH;
198  case EAGAIN:
199  if (block == QUEUE_NONBLOCKING) {
200  // no more messages. If we are
201  // non-blocking, return
202  return QUEUE_NO_MORE_MSGS;
203  } else {
204  // Go to sleep until we receive a signal that something was put on the queue:
205  // Note: pthread_cont_wait must be called "with mutex locked by the calling
206  // thread or undefined behavior results." - from the docs
207  ret = pthread_mutex_lock(mp);
208  FW_ASSERT(ret == 0, errno);
209  ret = pthread_cond_wait(queueNotEmpty, mp);
210  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
211  ret = pthread_mutex_unlock(mp);
212  FW_ASSERT(ret == 0, errno);
213  continue;
214  }
215  break;
216  default:
217  return QUEUE_UNKNOWN_ERROR;
218  }
219  }
220  else {
221  notFinished = false;
222  // Wake up a thread that might be waiting on the other end of the queue:
223  ret = pthread_cond_signal(queueNotFull);
224  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
225  }
226  }
227 
228  actualSize = (NATIVE_INT_TYPE) size;
229  return QUEUE_OK;
230  }
231 
232  NATIVE_INT_TYPE Queue::getNumMsgs(void) const {
233  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
234  mqd_t handle = queueHandle->handle;
235 
236  struct mq_attr attr;
237  int status = mq_getattr(handle, &attr);
238  FW_ASSERT(status == 0);
239  return (U32) attr.mq_curmsgs;
240  }
241 
242  NATIVE_INT_TYPE Queue::getMaxMsgs(void) const {
243  //FW_ASSERT(0);
244  return 0;
245  }
246 
248  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
249  mqd_t handle = queueHandle->handle;
250 
251  struct mq_attr attr;
252  int status = mq_getattr(handle, &attr);
253  FW_ASSERT(status == 0);
254  return (U32) attr.mq_maxmsg;
255  }
256 
257  NATIVE_INT_TYPE Queue::getMsgSize(void) const {
258  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
259  mqd_t handle = queueHandle->handle;
260 
261  struct mq_attr attr;
262  int status = mq_getattr(handle, &attr);
263  FW_ASSERT(status == 0);
264  return (U32) attr.mq_msgsize;
265  }
266 
267 }
268 
Os
Definition: File.cpp:7
Os::Queue::QUEUE_FULL
@ QUEUE_FULL
queue was full when attempting to send a message
Definition: Queue.hpp:36
Os::QueueHandle::~QueueHandle
~QueueHandle()
Definition: Queue.cpp:36
Fw::StringBase
Definition: StringType.hpp:23
U8
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.hpp:76
Os::QueueHandle::queueNotEmpty
pthread_cond_t queueNotEmpty
Definition: IPCQueueStub.cpp:48
Os::Queue::QueueStatus
QueueStatus
Definition: Queue.hpp:27
Os::Queue::getQueueSize
NATIVE_INT_TYPE getQueueSize(void) const
get the queue depth (maximum number of messages queue can hold)
Definition: Queue.cpp:249
Os::QueueHandle::queueNotFull
pthread_cond_t queueNotFull
Definition: IPCQueueStub.cpp:49
Os::Queue::m_handle
POINTER_CAST m_handle
handle for implementation specific queue
Definition: Queue.hpp:69
Os::Queue::QUEUE_EMPTY_BUFFER
@ QUEUE_EMPTY_BUFFER
supplied buffer is empty
Definition: Queue.hpp:35
Assert.hpp
Os::Queue::QUEUE_INVALID_PRIORITY
@ QUEUE_INVALID_PRIORITY
invalid priority requested
Definition: Queue.hpp:34
Os::Queue::send
QueueStatus send(const Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE priority, QueueBlocking block)
send a message
Definition: QueueCommon.cpp:13
Os::Queue::Queue
Queue()
Definition: Queue.cpp:39
Os::Queue::QUEUE_SIZE_MISMATCH
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with a buffer that is too large or too small
Definition: Queue.hpp:31
Os::QueueHandle::mp
pthread_mutex_t mp
Definition: Queue.cpp:47
Os::Queue::receive
QueueStatus receive(Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE &priority, QueueBlocking block)
receive a message
Definition: QueueCommon.cpp:22
Os::Queue::QUEUE_UNINITIALIZED
@ QUEUE_UNINITIALIZED
Queue wasn't initialized successfully.
Definition: Queue.hpp:30
Os::Queue::m_name
QueueString m_name
queue name
Definition: Queue.hpp:70
Os::Queue::QUEUE_NO_MORE_MSGS
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition: Queue.hpp:29
FW_ASSERT
#define FW_ASSERT(...)
Definition: Assert.hpp:9
Os::Queue::getMsgSize
NATIVE_INT_TYPE getMsgSize(void) const
get the message size (maximum message size queue can hold)
Definition: Queue.cpp:260
Os::Queue::getNumMsgs
NATIVE_INT_TYPE getNumMsgs(void) const
get the number of messages in the queue
Definition: Queue.cpp:227
Os::Queue::QUEUE_OK
@ QUEUE_OK
message sent/received okay
Definition: Queue.hpp:28
Os::Queue::s_numQueues
static NATIVE_INT_TYPE s_numQueues
tracks number of queues in the system
Definition: Queue.hpp:74
Os::Queue::QUEUE_UNKNOWN_ERROR
@ QUEUE_UNKNOWN_ERROR
Unexpected error; can't match with returns.
Definition: Queue.hpp:37
Os::Queue::QUEUE_NONBLOCKING
@ QUEUE_NONBLOCKING
Queue receive always returns even if there is no message.
Definition: Queue.hpp:42
Os::QueueHandle::handle
mqd_t handle
Definition: IPCQueue.cpp:38
Os::Queue::getMaxMsgs
NATIVE_INT_TYPE getMaxMsgs(void) const
get the maximum number of messages (high watermark)
Definition: Queue.cpp:238
Os::Queue::createInternal
QueueStatus createInternal(const Fw::StringBase &name, NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
create a message queue
Definition: Queue.cpp:51
Os::Queue::~Queue
virtual ~Queue()
Definition: Queue.cpp:76
NATIVE_INT_TYPE
int NATIVE_INT_TYPE
native integer type declaration
Definition: BasicTypes.hpp:29
NULL
#define NULL
NULL.
Definition: BasicTypes.hpp:100
Os::QueueHandle::QueueHandle
QueueHandle(mqd_t m_handle)
Definition: Queue.cpp:25
Queue.hpp