F´ Flight Software - C/C++ Documentation  devel
A framework for building embedded system applications to NASA flight quality standards.
IPCQueue.cpp
Go to the documentation of this file.
1 #include <Fw/Types/Assert.hpp>
2 #include <Os/Queue.hpp>
3 #include <Os/IPCQueue.hpp>
4 
5 #ifdef TGT_OS_TYPE_VXWORKS
6  #include <vxWorks.h>
7 #endif
8 
9 #ifdef TGT_OS_TYPE_LINUX
10  #include <sys/types.h>
11  #include <unistd.h>
12 #endif
13 
14 #include <mqueue.h>
15 #include <fcntl.h>
16 #include <cerrno>
17 #include <cstring>
18 #include <cstdio>
19 #include <ctime>
20 #include <sys/time.h>
21 #include <pthread.h>
22 #include <new>
23 
24 #define IPC_QUEUE_TIMEOUT_SEC (1)
25 
26 namespace Os {
27 
// Owning wrapper around a POSIX message queue descriptor.
//
// The wrapper closes the descriptor when it is destroyed. A value of -1
// means "no descriptor held" and is never passed to mq_close.
class QueueHandle {
  public:
    // Take ownership of a descriptor obtained from mq_open (or -1 for none).
    // explicit: a bare mqd_t must not silently convert into an owning object.
    explicit QueueHandle(mqd_t m_handle) : handle(m_handle) {}

    // Destroy the handle: close the descriptor if one is held.
    ~QueueHandle() {
        if (-1 != this->handle) {
            (void) mq_close(this->handle);
        }
    }

    // The destructor closes the descriptor, so a copy would close it twice.
    QueueHandle(const QueueHandle&) = delete;
    QueueHandle& operator=(const QueueHandle&) = delete;

    // Descriptor owned by this object, or -1.
    mqd_t handle;
};
41 
43  }
44 
46 
47  this->m_name = "/QP_";
48  this->m_name += name;
49 #ifndef TGT_OS_TYPE_VXWORKS
50  char pid[40];
51  (void)snprintf(pid,sizeof(pid),".%d",getpid());
52  pid[sizeof(pid)-1] = 0;
53  this->m_name += pid;
54 #endif
55  mq_attr att;
56  mqd_t handle;
57 
58  memset(&att,0,sizeof(att));
59  att.mq_maxmsg = depth;
60  att.mq_msgsize = msgSize;
61  att.mq_flags = 0;
62  att.mq_curmsgs = 0;
63 
64  /* NOTE(mereweth) - blocking is the default (O_NONBLOCK is not set); send
65  * and receive below always use mq_timedsend and mq_timedreceive, passing a
66  * deadline of "now" when QUEUE_NONBLOCKING is specified
67  */
68  handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
69 
70  // If queue already exists, then unlink it and try again.
71  if (-1 == handle) {
72  switch (errno) {
73  case EEXIST:
74  (void)mq_unlink(this->m_name.toChar());
75  break;
76  default:
77  return QUEUE_UNINITIALIZED;
78  }
79 
80  handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
81 
82  if (-1 == handle) {
83  return QUEUE_UNINITIALIZED;
84  }
85  }
86 
87  // Set up queue handle:
88  QueueHandle* queueHandle = new(std::nothrow) QueueHandle(handle);
89  if (nullptr == queueHandle) {
90  return QUEUE_UNINITIALIZED;
91  }
92  this->m_handle = reinterpret_cast<POINTER_CAST>(queueHandle);
93 
95 
96  return QUEUE_OK;
97  }
98 
100  // Clean up the queue handle:
101  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
102  if (nullptr != queueHandle) {
103  delete queueHandle;
104  }
105  this->m_handle = reinterpret_cast<POINTER_CAST>(nullptr); // important so base Queue class doesn't free it
106  (void) mq_unlink(this->m_name.toChar());
107  }
108 
109  Queue::QueueStatus IPCQueue::send(const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority, QueueBlocking block) {
110 
111  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
112  mqd_t handle = queueHandle->handle;
113 
114  if (-1 == handle) {
115  return QUEUE_UNINITIALIZED;
116  }
117 
118  if (nullptr == buffer) {
119  return QUEUE_EMPTY_BUFFER;
120  }
121 
122  bool keepTrying = true;
123  while (keepTrying) {
124  struct timeval now;
125  gettimeofday(&now,nullptr);
126  struct timespec wait;
127  wait.tv_sec = now.tv_sec;
128  wait.tv_nsec = now.tv_usec * 1000;
129 
130  if (block == QUEUE_BLOCKING) {
131  wait.tv_sec += IPC_QUEUE_TIMEOUT_SEC;
132  }
133 
134  NATIVE_INT_TYPE stat = mq_timedsend(
135  handle,
136  reinterpret_cast<const char*>(buffer),
137  static_cast<size_t>(size),
138  static_cast<unsigned int>(priority),
139  &wait);
140 
141  if (-1 == stat) {
142  switch (errno) {
143  case EINTR:
144  continue;
145  case EMSGSIZE:
146  return QUEUE_SIZE_MISMATCH;
147  case EINVAL:
148  return QUEUE_INVALID_PRIORITY;
149  case ETIMEDOUT:
150  if (block == QUEUE_NONBLOCKING) {
151  // no more messages. If we are
152  // non-blocking, return
153  return QUEUE_FULL;
154  } else {
155  // TODO(mereweth) - multiprocess signalling necessary?
156  // Go to sleep until we receive a signal that something was taken off the queue
157  continue;
158  }
159  default:
160  return QUEUE_UNKNOWN_ERROR;
161  }
162  } else {
163  keepTrying=false;
164  // TODO(mereweth) - multiprocess signalling necessary?
165  // Wake up a thread that might be waiting on the other end of the queue:
166  }
167  }
168 
169  return QUEUE_OK;
170  }
171 
172  Queue::QueueStatus IPCQueue::receive(U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority, QueueBlocking block) {
173 
174  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
175  mqd_t handle = queueHandle->handle;
176 
177  if (-1 == handle) {
178  return QUEUE_UNINITIALIZED;
179  }
180 
181  ssize_t size;
182  bool notFinished = true;
183  while (notFinished) {
184  struct timeval now;
185  gettimeofday(&now,nullptr);
186  struct timespec wait;
187  wait.tv_sec = now.tv_sec;
188  wait.tv_nsec = now.tv_usec * 1000;
189 
190  if (block == QUEUE_BLOCKING) {
191  wait.tv_sec += IPC_QUEUE_TIMEOUT_SEC;
192  }
193  size = mq_timedreceive(handle, reinterpret_cast<char*>(buffer), static_cast<size_t>(capacity),
194 #ifdef TGT_OS_TYPE_VXWORKS
195  reinterpret_cast<int*>(&priority), &wait);
196 #else
197  reinterpret_cast<unsigned int*>(&priority), &wait);
198 #endif
199 
200  if (-1 == size) { // error
201  switch (errno) {
202  case EINTR:
203  continue;
204  case EMSGSIZE:
205  return QUEUE_SIZE_MISMATCH;
206  case ETIMEDOUT:
207  if (block == QUEUE_NONBLOCKING) {
208  // no more messages. If we are
209  // non-blocking, return
210  return QUEUE_NO_MORE_MSGS;
211  } else {
212  // TODO(mereweth) - multiprocess signalling necessary?
213  // Go to sleep until we receive a signal that something was put on the queue:
214  continue;
215  }
216  break;
217  default:
218  return QUEUE_UNKNOWN_ERROR;
219  }
220  }
221  else {
222  notFinished = false;
223  // TODO(mereweth) - multiprocess signalling necessary?
224  // Wake up a thread that might be waiting on the other end of the queue
225  }
226  }
227 
228  actualSize = static_cast<NATIVE_INT_TYPE>(size);
229  return QUEUE_OK;
230  }
231 
233  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
234  mqd_t handle = queueHandle->handle;
235 
236  struct mq_attr attr;
237  int status = mq_getattr(handle, &attr);
238  FW_ASSERT(status == 0);
239  return static_cast<NATIVE_INT_TYPE>(attr.mq_curmsgs);
240  }
241 
243  //FW_ASSERT(0);
244  return 0;
245  }
246 
248  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
249  mqd_t handle = queueHandle->handle;
250 
251  struct mq_attr attr;
252  int status = mq_getattr(handle, &attr);
253  FW_ASSERT(status == 0);
254  return static_cast<NATIVE_INT_TYPE>(attr.mq_maxmsg);
255  }
256 
258  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
259  mqd_t handle = queueHandle->handle;
260 
261  struct mq_attr attr;
262  int status = mq_getattr(handle, &attr);
263  FW_ASSERT(status == 0);
264  return static_cast<NATIVE_INT_TYPE>(attr.mq_msgsize);
265  }
266 
267 }
#define FW_ASSERT(...)
Definition: Assert.hpp:14
PlatformPointerCastType POINTER_CAST
Definition: BasicTypes.h:53
PlatformIntType NATIVE_INT_TYPE
Definition: BasicTypes.h:51
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.h:26
#define IPC_QUEUE_TIMEOUT_SEC
Definition: IPCQueue.cpp:24
NATIVE_INT_TYPE getQueueSize() const
get the queue depth (maximum number of messages queue can hold)
NATIVE_INT_TYPE getMsgSize() const
get the message size (maximum message size queue can hold)
QueueStatus create(const Fw::StringBase &name, NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
create a message queue
QueueStatus send(const Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE priority, QueueBlocking block)
send a message
NATIVE_INT_TYPE getNumMsgs() const
get the number of messages in the queue
QueueStatus receive(Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE &priority, QueueBlocking block)
receive a message
NATIVE_INT_TYPE getMaxMsgs() const
get the maximum number of messages (high watermark)
QueueHandle(mqd_t m_handle)
Definition: IPCQueue.cpp:30
QueueString m_name
queue name
Definition: Queue.hpp:76
QueueStatus
Definition: Queue.hpp:27
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition: Queue.hpp:31
@ QUEUE_UNINITIALIZED
Queue wasn't initialized successfully.
Definition: Queue.hpp:30
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition: Queue.hpp:29
@ QUEUE_INVALID_PRIORITY
invalid priority requested
Definition: Queue.hpp:34
@ QUEUE_EMPTY_BUFFER
supplied buffer is empty
Definition: Queue.hpp:35
@ QUEUE_UNKNOWN_ERROR
Unexpected error; can't match with returns.
Definition: Queue.hpp:37
@ QUEUE_OK
message sent/received okay
Definition: Queue.hpp:28
@ QUEUE_FULL
queue was full when attempting to send a message
Definition: Queue.hpp:36
@ QUEUE_BLOCKING
Queue receive blocks until a message arrives.
Definition: Queue.hpp:41
@ QUEUE_NONBLOCKING
Queue receive always returns even if there is no message.
Definition: Queue.hpp:42
static NATIVE_INT_TYPE s_numQueues
tracks number of queues in the system
Definition: Queue.hpp:80
POINTER_CAST m_handle
handle for implementation specific queue
Definition: Queue.hpp:75