F´ Flight Software - C/C++ Documentation  NASA-v1.6.0
A framework for building embedded system applications to NASA flight quality standards.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Queue.cpp
Go to the documentation of this file.
1 #include <Fw/Types/Assert.hpp>
2 #include <Os/Queue.hpp>
3 
4 #ifdef TGT_OS_TYPE_VXWORKS
5  #include <vxWorks.h>
6 #endif
7 
8 #ifdef TGT_OS_TYPE_LINUX
9  #include <sys/types.h>
10  #include <unistd.h>
11 #endif
12 
13 #include <mqueue.h>
14 #include <fcntl.h>
15 #include <cerrno>
16 #include <cstring>
17 #include <cstdio>
18 #include <ctime>
19 #include <pthread.h>
20 #include <new>
21 
22 namespace Os {
23 
24  class QueueHandle {
25  public:
26  QueueHandle(mqd_t m_handle) {
27  // Initialize the handle:
28  int ret;
29  ret = pthread_cond_init(&this->queueNotEmpty, nullptr);
30  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
31  ret = pthread_cond_init(&this->queueNotFull, nullptr);
32  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
33  ret = pthread_mutex_init(&this->mp, nullptr);
34  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
35  this->handle = m_handle;
36  }
38  // Destroy the handle:
39  if (-1 != this->handle) {
40  (void) mq_close(this->handle);
41  }
42  (void) pthread_cond_destroy(&this->queueNotEmpty);
43  (void) pthread_mutex_destroy(&this->mp);
44  }
45  mqd_t handle;
46  pthread_cond_t queueNotEmpty;
47  pthread_cond_t queueNotFull;
48  pthread_mutex_t mp;
49  };
50 
51  Queue::Queue() :
52  m_handle(-1) {
53  }
54 
56 
57  this->m_name = "/QP_";
58  this->m_name += name;
59 #ifndef TGT_OS_TYPE_VXWORKS
60  char pid[40];
61  (void)snprintf(pid,sizeof(pid),".%d",getpid());
62  pid[sizeof(pid)-1] = 0;
63  this->m_name += pid;
64 #endif
65  mq_attr att;
66  mqd_t handle;
67 
68  memset(&att,0,sizeof(att));
69  att.mq_maxmsg = depth;
70  att.mq_msgsize = msgSize;
71  att.mq_flags = 0;
72  att.mq_curmsgs = 0;
73 
74  handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, 0666, &att);
75 
76  // If queue already exists, then unlink it and try again.
77  if (-1 == handle) {
78  switch (errno) {
79  case EEXIST:
80  (void)mq_unlink(this->m_name.toChar());
81  break;
82  default:
83  return QUEUE_UNINITIALIZED;
84  }
85 
86  handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
87 
88  if (-1 == handle) {
89  return QUEUE_UNINITIALIZED;
90  }
91  }
92 
93  // Set up queue handle:
94  QueueHandle* queueHandle = new(std::nothrow) QueueHandle(handle);
95  if (nullptr == queueHandle) {
96  return QUEUE_UNINITIALIZED;
97  }
98  this->m_handle = reinterpret_cast<POINTER_CAST>(queueHandle);
99 
101 
102  return QUEUE_OK;
103  }
104 
105  Queue::~Queue() {
106  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
107  delete queueHandle;
108  (void) mq_unlink(this->m_name.toChar());
109  }
110 
111  Queue::QueueStatus Queue::send(const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority, QueueBlocking block) {
112 
113  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
114  mqd_t handle = queueHandle->handle;
115  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
116  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
117  pthread_mutex_t* mp = &queueHandle->mp;
118 
119  if (-1 == handle) {
120  return QUEUE_UNINITIALIZED;
121  }
122 
123  if (nullptr == buffer) {
124  return QUEUE_EMPTY_BUFFER;
125  }
126 
127  bool keepTrying = true;
128  int ret;
129  while (keepTrying) {
130  NATIVE_INT_TYPE stat = mq_send(handle, reinterpret_cast<const char*>(buffer), size, priority);
131  if (-1 == stat) {
132  switch (errno) {
133  case EINTR:
134  continue;
135  case EMSGSIZE:
136  return QUEUE_SIZE_MISMATCH;
137  case EINVAL:
138  return QUEUE_INVALID_PRIORITY;
139  case EAGAIN:
140  if (block == QUEUE_NONBLOCKING) {
141  // no more messages. If we are
142  // non-blocking, return
143  return QUEUE_FULL;
144  } else {
145  // Go to sleep until we receive a signal that something was taken off the queue:
146  // Note: pthread_cont_wait must be called "with mutex locked by the calling
147  // thread or undefined behavior results." - from the docs
148  ret = pthread_mutex_lock(mp);
149  FW_ASSERT(ret == 0, errno);
150  ret = pthread_cond_wait(queueNotFull, mp);
151  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
152  ret = pthread_mutex_unlock(mp);
153  FW_ASSERT(ret == 0, errno);
154  continue;
155  }
156  default:
157  return QUEUE_UNKNOWN_ERROR;
158  }
159  } else {
160  keepTrying=false;
161  // Wake up a thread that might be waiting on the other end of the queue:
162  ret = pthread_cond_signal(queueNotEmpty);
163  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
164  }
165  }
166 
167  return QUEUE_OK;
168  }
169 
170  Queue::QueueStatus Queue::receive(U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority, QueueBlocking block) {
171 
172  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
173  mqd_t handle = queueHandle->handle;
174  pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
175  pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
176  pthread_mutex_t* mp = &queueHandle->mp;
177 
178  if (-1 == handle) {
179  return QUEUE_UNINITIALIZED;
180  }
181 
182  ssize_t size;
183  int ret;
184  bool notFinished = true;
185  while (notFinished) {
186  size = mq_receive(handle, static_cast<char*>(buffer), static_cast<size_t>(capacity),
187 #ifdef TGT_OS_TYPE_VXWORKS
188  reinterpret_cast<int*>(&priority));
189 #else
190  reinterpret_cast<unsigned int*>(&priority));
191 #endif
192 
193  if (-1 == size) { // error
194  switch (errno) {
195  case EINTR:
196  continue;
197  case EMSGSIZE:
198  return QUEUE_SIZE_MISMATCH;
199  case EAGAIN:
200  if (block == QUEUE_NONBLOCKING) {
201  // no more messages. If we are
202  // non-blocking, return
203  return QUEUE_NO_MORE_MSGS;
204  } else {
205  // Go to sleep until we receive a signal that something was put on the queue:
206  // Note: pthread_cont_wait must be called "with mutex locked by the calling
207  // thread or undefined behavior results." - from the docs
208  ret = pthread_mutex_lock(mp);
209  FW_ASSERT(ret == 0, errno);
210  ret = pthread_cond_wait(queueNotEmpty, mp);
211  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
212  ret = pthread_mutex_unlock(mp);
213  FW_ASSERT(ret == 0, errno);
214  continue;
215  }
216  break;
217  default:
218  return QUEUE_UNKNOWN_ERROR;
219  }
220  }
221  else {
222  notFinished = false;
223  // Wake up a thread that might be waiting on the other end of the queue:
224  ret = pthread_cond_signal(queueNotFull);
225  FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
226  }
227  }
228 
229  actualSize = static_cast<NATIVE_INT_TYPE>(size);
230  return QUEUE_OK;
231  }
232 
234  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
235  mqd_t handle = queueHandle->handle;
236 
237  struct mq_attr attr;
238  int status = mq_getattr(handle, &attr);
239  FW_ASSERT(status == 0);
240  return static_cast<U32>(attr.mq_curmsgs);
241  }
242 
244  //FW_ASSERT(0);
245  return 0;
246  }
247 
249  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
250  mqd_t handle = queueHandle->handle;
251 
252  struct mq_attr attr;
253  int status = mq_getattr(handle, &attr);
254  FW_ASSERT(status == 0);
255  return static_cast<U32>(attr.mq_maxmsg);
256  }
257 
259  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
260  mqd_t handle = queueHandle->handle;
261 
262  struct mq_attr attr;
263  int status = mq_getattr(handle, &attr);
264  FW_ASSERT(status == 0);
265  return static_cast<U32>(attr.mq_msgsize);
266  }
267 
268 }
269 
Os
Definition: File.cpp:7
Os::Queue::QUEUE_FULL
@ QUEUE_FULL
queue was full when attempting to send a message
Definition: Queue.hpp:36
Os::Queue::getMsgSize
NATIVE_INT_TYPE getMsgSize() const
get the message size (maximum message size queue can hold)
Definition: Queue.cpp:244
Os::QueueHandle::~QueueHandle
~QueueHandle()
Definition: Queue.cpp:37
Fw::StringBase
Definition: StringType.hpp:23
U8
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.hpp:76
Os::QueueHandle::queueNotEmpty
pthread_cond_t queueNotEmpty
Definition: IPCQueueStub.cpp:49
Os::Queue::QueueStatus
QueueStatus
Definition: Queue.hpp:27
Os::QueueHandle::queueNotFull
pthread_cond_t queueNotFull
Definition: IPCQueueStub.cpp:50
Os::Queue::getMaxMsgs
NATIVE_INT_TYPE getMaxMsgs() const
get the maximum number of messages (high watermark)
Definition: Queue.cpp:222
Os::Queue::m_handle
POINTER_CAST m_handle
handle for implementation specific queue
Definition: Queue.hpp:75
Os::Queue::QUEUE_EMPTY_BUFFER
@ QUEUE_EMPTY_BUFFER
supplied buffer is empty
Definition: Queue.hpp:35
Os::Queue::getNumMsgs
NATIVE_INT_TYPE getNumMsgs() const
get the number of messages in the queue
Definition: Queue.cpp:211
Assert.hpp
Os::Queue::QUEUE_INVALID_PRIORITY
@ QUEUE_INVALID_PRIORITY
invalid priority requested
Definition: Queue.hpp:34
Os::Queue::send
QueueStatus send(const Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE priority, QueueBlocking block)
send a message
Definition: QueueCommon.cpp:13
Os::Queue::Queue
Queue()
Definition: Queue.cpp:38
Os::Queue::QUEUE_SIZE_MISMATCH
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition: Queue.hpp:31
Os::QueueHandle::mp
pthread_mutex_t mp
Definition: Queue.cpp:48
Os::Queue::receive
QueueStatus receive(Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE &priority, QueueBlocking block)
receive a message
Definition: QueueCommon.cpp:22
Os::Queue::QUEUE_UNINITIALIZED
@ QUEUE_UNINITIALIZED
Queue wasn't initialized successfully.
Definition: Queue.hpp:30
Os::Queue::m_name
QueueString m_name
queue name
Definition: Queue.hpp:76
Os::Queue::QUEUE_NO_MORE_MSGS
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition: Queue.hpp:29
FW_ASSERT
#define FW_ASSERT(...)
Definition: Assert.hpp:8
Os::Queue::QUEUE_OK
@ QUEUE_OK
message sent/received okay
Definition: Queue.hpp:28
Os::Queue::s_numQueues
static NATIVE_INT_TYPE s_numQueues
tracks number of queues in the system
Definition: Queue.hpp:80
Os::Queue::getQueueSize
NATIVE_INT_TYPE getQueueSize() const
get the queue depth (maximum number of messages queue can hold)
Definition: Queue.cpp:233
Os::Queue::QUEUE_UNKNOWN_ERROR
@ QUEUE_UNKNOWN_ERROR
Unexpected error; can't match with returns.
Definition: Queue.hpp:37
Os::Queue::QUEUE_NONBLOCKING
@ QUEUE_NONBLOCKING
Queue receive always returns even if there is no message.
Definition: Queue.hpp:42
Os::QueueHandle::handle
mqd_t handle
Definition: IPCQueue.cpp:39
Os::Queue::createInternal
QueueStatus createInternal(const Fw::StringBase &name, NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
create a message queue
Definition: Queue.cpp:42
Os::Queue::~Queue
virtual ~Queue()
Definition: Queue.cpp:65
NATIVE_INT_TYPE
int NATIVE_INT_TYPE
native integer type declaration
Definition: BasicTypes.hpp:29
Os::QueueHandle::QueueHandle
QueueHandle(mqd_t m_handle)
Definition: Queue.cpp:26
Queue.hpp