F´ Flight Software - C/C++ Documentation  devel
A framework for building embedded system applications to NASA flight quality standards.
Queue.cpp
Go to the documentation of this file.
1 #include <Fw/Types/Assert.hpp>
2 #include <Os/Queue.hpp>
3 
4 #ifdef TGT_OS_TYPE_VXWORKS
5  #include <vxWorks.h>
6 #endif
7 
8 #ifdef TGT_OS_TYPE_LINUX
9  #include <sys/types.h>
10  #include <unistd.h>
11 #endif
12 
13 #include <mqueue.h>
14 #include <fcntl.h>
15 #include <cerrno>
16 #include <cstring>
17 #include <cstdio>
18 #include <ctime>
19 #include <pthread.h>
20 #include <new>
21 
22 namespace Os {
23 
24  class QueueHandle {
25  public:
26  QueueHandle(mqd_t m_handle) {
27  // Initialize the handle:
28  int ret;
29  ret = pthread_cond_init(&this->queueNotEmpty, nullptr);
30  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
31  ret = pthread_cond_init(&this->queueNotFull, nullptr);
32  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
33  ret = pthread_mutex_init(&this->mp, nullptr);
34  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
35  this->handle = m_handle;
36  }
38  // Destroy the handle:
39  if (-1 != this->handle) {
40  (void) mq_close(this->handle);
41  }
42  (void) pthread_cond_destroy(&this->queueNotEmpty);
43  (void) pthread_mutex_destroy(&this->mp);
44  }
45  mqd_t handle;
46  pthread_cond_t queueNotEmpty;
47  pthread_cond_t queueNotFull;
48  pthread_mutex_t mp;
49  };
50 
51  Queue::Queue() :
52  m_handle(-1) {
53  }
54 
56 
57  this->m_name = "/QP_";
58  this->m_name += name;
59 #ifndef TGT_OS_TYPE_VXWORKS
60  char pid[40];
61  (void)snprintf(pid,sizeof(pid),".%d",getpid());
62  pid[sizeof(pid)-1] = 0;
63  this->m_name += pid;
64 #endif
65  mq_attr att;
66  mqd_t handle;
67 
68  memset(&att,0,sizeof(att));
69  att.mq_maxmsg = depth;
70  att.mq_msgsize = msgSize;
71  att.mq_flags = 0;
72  att.mq_curmsgs = 0;
73 
74  handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, 0666, &att);
75 
76  // If queue already exists, then unlink it and try again.
77  if (-1 == handle) {
78  switch (errno) {
79  case EEXIST:
80  (void)mq_unlink(this->m_name.toChar());
81  break;
82  default:
83  return QUEUE_UNINITIALIZED;
84  }
85 
86  handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
87 
88  if (-1 == handle) {
89  return QUEUE_UNINITIALIZED;
90  }
91  }
92 
93  // Set up queue handle:
94  QueueHandle* queueHandle = new(std::nothrow) QueueHandle(handle);
95  if (nullptr == queueHandle) {
96  return QUEUE_UNINITIALIZED;
97  }
98  this->m_handle = reinterpret_cast<POINTER_CAST>(queueHandle);
99 
101 
102  return QUEUE_OK;
103  }
104 
105  Queue::~Queue() {
106  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
107  delete queueHandle;
108  (void) mq_unlink(this->m_name.toChar());
109  }
110 
111  Queue::QueueStatus Queue::send(const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority, QueueBlocking block) {
112 
113  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
114  mqd_t handle = queueHandle->handle;
115  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
116  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
117  pthread_mutex_t* mp = &queueHandle->mp;
118 
119  if (-1 == handle) {
120  return QUEUE_UNINITIALIZED;
121  }
122 
123  if (nullptr == buffer) {
124  return QUEUE_EMPTY_BUFFER;
125  }
126 
127  bool keepTrying = true;
128  int ret;
129  while (keepTrying) {
130  NATIVE_INT_TYPE stat = mq_send(handle, reinterpret_cast<const char*>(buffer), size, priority);
131  if (-1 == stat) {
132  switch (errno) {
133  case EINTR:
134  continue;
135  case EMSGSIZE:
136  return QUEUE_SIZE_MISMATCH;
137  case EINVAL:
138  return QUEUE_INVALID_PRIORITY;
139  case EAGAIN:
140  if (block == QUEUE_NONBLOCKING) {
141  // no more messages. If we are
142  // non-blocking, return
143  return QUEUE_FULL;
144  } else {
145  // Go to sleep until we receive a signal that something was taken off the queue:
146  // Note: pthread_cont_wait must be called "with mutex locked by the calling
147  // thread or undefined behavior results." - from the docs
148  ret = pthread_mutex_lock(mp);
149  FW_ASSERT(ret == 0, errno);
150  ret = pthread_cond_wait(queueNotFull, mp);
151  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
152  ret = pthread_mutex_unlock(mp);
153  FW_ASSERT(ret == 0, errno);
154  continue;
155  }
156  default:
157  return QUEUE_UNKNOWN_ERROR;
158  }
159  } else {
160  keepTrying=false;
161  // Wake up a thread that might be waiting on the other end of the queue:
162  ret = pthread_cond_signal(queueNotEmpty);
163  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
164  }
165  }
166 
167  return QUEUE_OK;
168  }
169 
170  Queue::QueueStatus Queue::receive(U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority, QueueBlocking block) {
171 
172  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
173  mqd_t handle = queueHandle->handle;
174  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
175  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
176  pthread_mutex_t* mp = &queueHandle->mp;
177 
178  if (-1 == handle) {
179  return QUEUE_UNINITIALIZED;
180  }
181 
182  ssize_t size;
183  int ret;
184  bool notFinished = true;
185  while (notFinished) {
186  size = mq_receive(handle, static_cast<char*>(buffer), static_cast<size_t>(capacity),
187 #ifdef TGT_OS_TYPE_VXWORKS
188  reinterpret_cast<int*>(&priority));
189 #else
190  reinterpret_cast<unsigned int*>(&priority));
191 #endif
192 
193  if (-1 == size) { // error
194  switch (errno) {
195  case EINTR:
196  continue;
197  case EMSGSIZE:
198  return QUEUE_SIZE_MISMATCH;
199  case EAGAIN:
200  if (block == QUEUE_NONBLOCKING) {
201  // no more messages. If we are
202  // non-blocking, return
203  return QUEUE_NO_MORE_MSGS;
204  } else {
205  // Go to sleep until we receive a signal that something was put on the queue:
206  // Note: pthread_cont_wait must be called "with mutex locked by the calling
207  // thread or undefined behavior results." - from the docs
208  ret = pthread_mutex_lock(mp);
209  FW_ASSERT(ret == 0, errno);
210  ret = pthread_cond_wait(queueNotEmpty, mp);
211  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
212  ret = pthread_mutex_unlock(mp);
213  FW_ASSERT(ret == 0, errno);
214  continue;
215  }
216  break;
217  default:
218  return QUEUE_UNKNOWN_ERROR;
219  }
220  }
221  else {
222  notFinished = false;
223  // Wake up a thread that might be waiting on the other end of the queue:
224  ret = pthread_cond_signal(queueNotFull);
225  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
226  }
227  }
228 
229  actualSize = static_cast<NATIVE_INT_TYPE>(size);
230  return QUEUE_OK;
231  }
232 
234  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
235  mqd_t handle = queueHandle->handle;
236 
237  struct mq_attr attr;
238  int status = mq_getattr(handle, &attr);
239  FW_ASSERT(status == 0);
240  return static_cast<U32>(attr.mq_curmsgs);
241  }
242 
244  //FW_ASSERT(0);
245  return 0;
246  }
247 
249  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
250  mqd_t handle = queueHandle->handle;
251 
252  struct mq_attr attr;
253  int status = mq_getattr(handle, &attr);
254  FW_ASSERT(status == 0);
255  return static_cast<U32>(attr.mq_maxmsg);
256  }
257 
259  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
260  mqd_t handle = queueHandle->handle;
261 
262  struct mq_attr attr;
263  int status = mq_getattr(handle, &attr);
264  FW_ASSERT(status == 0);
265  return static_cast<U32>(attr.mq_msgsize);
266  }
267 
268 }
269 
#define FW_ASSERT(...)
Definition: Assert.hpp:14
PlatformPointerCastType POINTER_CAST
Definition: BasicTypes.h:53
PlatformIntType NATIVE_INT_TYPE
Definition: BasicTypes.h:51
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.h:26
pthread_cond_t queueNotEmpty
pthread_mutex_t mp
Definition: Queue.cpp:48
QueueHandle(mqd_t m_handle)
Definition: Queue.cpp:26
pthread_cond_t queueNotFull
QueueString m_name
queue name
Definition: Queue.hpp:76
NATIVE_INT_TYPE getMsgSize() const
get the message size (maximum message size queue can hold)
Definition: Queue.cpp:244
QueueStatus
Definition: Queue.hpp:27
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition: Queue.hpp:31
@ QUEUE_UNINITIALIZED
Queue wasn't initialized successfully.
Definition: Queue.hpp:30
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition: Queue.hpp:29
@ QUEUE_INVALID_PRIORITY
invalid priority requested
Definition: Queue.hpp:34
@ QUEUE_EMPTY_BUFFER
supplied buffer is empty
Definition: Queue.hpp:35
@ QUEUE_UNKNOWN_ERROR
Unexpected error; can't match with returns.
Definition: Queue.hpp:37
@ QUEUE_OK
message sent/received okay
Definition: Queue.hpp:28
@ QUEUE_FULL
queue was full when attempting to send a message
Definition: Queue.hpp:36
QueueStatus send(const Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE priority, QueueBlocking block)
send a message
Definition: QueueCommon.cpp:13
Queue()
Definition: Queue.cpp:38
QueueStatus createInternal(const Fw::StringBase &name, NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
create a message queue
Definition: Queue.cpp:42
QueueStatus receive(Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE &priority, QueueBlocking block)
receive a message
Definition: QueueCommon.cpp:22
NATIVE_INT_TYPE getQueueSize() const
get the queue depth (maximum number of messages queue can hold)
Definition: Queue.cpp:233
NATIVE_INT_TYPE getMaxMsgs() const
get the maximum number of messages (high watermark)
Definition: Queue.cpp:222
@ QUEUE_NONBLOCKING
Queue receive always returns even if there is no message.
Definition: Queue.hpp:42
NATIVE_INT_TYPE getNumMsgs() const
get the number of messages in the queue
Definition: Queue.cpp:211
virtual ~Queue()
Definition: Queue.cpp:65
static NATIVE_INT_TYPE s_numQueues
tracks number of queues in the system
Definition: Queue.hpp:80
POINTER_CAST m_handle
handle for implementation specific queue
Definition: Queue.hpp:75