F´ Flight Software - C/C++ Documentation  NASA-v1.6.0
A framework for building embedded system applications to NASA flight quality standards.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
IPCQueue.cpp
Go to the documentation of this file.
1 #include <Fw/Types/Assert.hpp>
2 #include <Os/Queue.hpp>
3 #include <Os/IPCQueue.hpp>
4 
5 #ifdef TGT_OS_TYPE_VXWORKS
6  #include <vxWorks.h>
7 #endif
8 
9 #ifdef TGT_OS_TYPE_LINUX
10  #include <sys/types.h>
11  #include <unistd.h>
12 #endif
13 
14 #include <mqueue.h>
15 #include <fcntl.h>
16 #include <cerrno>
17 #include <cstring>
18 #include <cstdio>
19 #include <ctime>
20 #include <sys/time.h>
21 #include <pthread.h>
22 #include <new>
23 
24 #define IPC_QUEUE_TIMEOUT_SEC (1)
25 
26 namespace Os {
27 
/// Wrapper that owns a POSIX message-queue descriptor and closes it when
/// the wrapper is destroyed.
class QueueHandle {
  public:
    /// Take ownership of a message-queue descriptor.
    /// @param m_handle descriptor to own; -1 means "no descriptor" and is
    ///                 never passed to mq_close
    explicit QueueHandle(mqd_t m_handle) : handle(m_handle) {}

    /// Close the owned descriptor unless it is the -1 sentinel.
    ~QueueHandle() {
        if (-1 != this->handle) {
            // Best effort: nothing useful can be done if the close fails here.
            (void) mq_close(this->handle);
        }
    }

    // A copy would leave two objects closing the same descriptor.
    QueueHandle(const QueueHandle&) = delete;
    QueueHandle& operator=(const QueueHandle&) = delete;

    mqd_t handle; //!< owned message-queue descriptor, or -1
};
41 
43  }
44 
46 
47  this->m_name = "/QP_";
48  this->m_name += name;
49 #ifndef TGT_OS_TYPE_VXWORKS
50  char pid[40];
51  (void)snprintf(pid,sizeof(pid),".%d",getpid());
52  pid[sizeof(pid)-1] = 0;
53  this->m_name += pid;
54 #endif
55  mq_attr att;
56  mqd_t handle;
57 
58  memset(&att,0,sizeof(att));
59  att.mq_maxmsg = depth;
60  att.mq_msgsize = msgSize;
61  att.mq_flags = 0;
62  att.mq_curmsgs = 0;
63 
64  /* NOTE(mereweth) - O_BLOCK is the default; we use timedsend and
65  * timedreceive below if QUEUE_NONBLOCKING is specified
66  *
67  */
68  handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
69 
70  // If queue already exists, then unlink it and try again.
71  if (-1 == handle) {
72  switch (errno) {
73  case EEXIST:
74  (void)mq_unlink(this->m_name.toChar());
75  break;
76  default:
77  return QUEUE_UNINITIALIZED;
78  }
79 
80  handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
81 
82  if (-1 == handle) {
83  return QUEUE_UNINITIALIZED;
84  }
85  }
86 
87  // Set up queue handle:
88  QueueHandle* queueHandle = new(std::nothrow) QueueHandle(handle);
89  if (nullptr == queueHandle) {
90  return QUEUE_UNINITIALIZED;
91  }
92  this->m_handle = reinterpret_cast<POINTER_CAST>(queueHandle);
93 
95 
96  return QUEUE_OK;
97  }
98 
100  // Clean up the queue handle:
101  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
102  if (nullptr != queueHandle) {
103  delete queueHandle;
104  }
105  this->m_handle = reinterpret_cast<POINTER_CAST>(nullptr); // important so base Queue class doesn't free it
106  (void) mq_unlink(this->m_name.toChar());
107  }
108 
109  Queue::QueueStatus IPCQueue::send(const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority, QueueBlocking block) {
110 
111  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
112  mqd_t handle = queueHandle->handle;
113 
114  if (-1 == handle) {
115  return QUEUE_UNINITIALIZED;
116  }
117 
118  if (nullptr == buffer) {
119  return QUEUE_EMPTY_BUFFER;
120  }
121 
122  bool keepTrying = true;
123  while (keepTrying) {
124  struct timeval now;
125  gettimeofday(&now,nullptr);
126  struct timespec wait;
127  wait.tv_sec = now.tv_sec;
128  wait.tv_nsec = now.tv_usec * 1000;
129 
130  if (block == QUEUE_BLOCKING) {
131  wait.tv_sec += IPC_QUEUE_TIMEOUT_SEC;
132  }
133  NATIVE_INT_TYPE stat = mq_timedsend(handle, reinterpret_cast<const char*>(buffer), size, priority, &wait);
134  if (-1 == stat) {
135  switch (errno) {
136  case EINTR:
137  continue;
138  case EMSGSIZE:
139  return QUEUE_SIZE_MISMATCH;
140  case EINVAL:
141  return QUEUE_INVALID_PRIORITY;
142  case ETIMEDOUT:
143  if (block == QUEUE_NONBLOCKING) {
144  // no more messages. If we are
145  // non-blocking, return
146  return QUEUE_FULL;
147  } else {
148  // TODO(mereweth) - multiprocess signalling necessary?
149  // Go to sleep until we receive a signal that something was taken off the queue
150  continue;
151  }
152  default:
153  return QUEUE_UNKNOWN_ERROR;
154  }
155  } else {
156  keepTrying=false;
157  // TODO(mereweth) - multiprocess signalling necessary?
158  // Wake up a thread that might be waiting on the other end of the queue:
159  }
160  }
161 
162  return QUEUE_OK;
163  }
164 
165  Queue::QueueStatus IPCQueue::receive(U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority, QueueBlocking block) {
166 
167  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
168  mqd_t handle = queueHandle->handle;
169 
170  if (-1 == handle) {
171  return QUEUE_UNINITIALIZED;
172  }
173 
174  ssize_t size;
175  bool notFinished = true;
176  while (notFinished) {
177  struct timeval now;
178  gettimeofday(&now,nullptr);
179  struct timespec wait;
180  wait.tv_sec = now.tv_sec;
181  wait.tv_nsec = now.tv_usec * 1000;
182 
183  if (block == QUEUE_BLOCKING) {
184  wait.tv_sec += IPC_QUEUE_TIMEOUT_SEC;
185  }
186  size = mq_timedreceive(handle, reinterpret_cast<char*>(buffer), static_cast<size_t>(capacity),
187 #ifdef TGT_OS_TYPE_VXWORKS
188  reinterpret_cast<int*>(&priority), &wait);
189 #else
190  reinterpret_cast<unsigned int*>(&priority), &wait);
191 #endif
192 
193  if (-1 == size) { // error
194  switch (errno) {
195  case EINTR:
196  continue;
197  case EMSGSIZE:
198  return QUEUE_SIZE_MISMATCH;
199  case ETIMEDOUT:
200  if (block == QUEUE_NONBLOCKING) {
201  // no more messages. If we are
202  // non-blocking, return
203  return QUEUE_NO_MORE_MSGS;
204  } else {
205  // TODO(mereweth) - multiprocess signalling necessary?
206  // Go to sleep until we receive a signal that something was put on the queue:
207  continue;
208  }
209  break;
210  default:
211  return QUEUE_UNKNOWN_ERROR;
212  }
213  }
214  else {
215  notFinished = false;
216  // TODO(mereweth) - multiprocess signalling necessary?
217  // Wake up a thread that might be waiting on the other end of the queue
218  }
219  }
220 
221  actualSize = static_cast<NATIVE_INT_TYPE>(size);
222  return QUEUE_OK;
223  }
224 
226  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
227  mqd_t handle = queueHandle->handle;
228 
229  struct mq_attr attr;
230  int status = mq_getattr(handle, &attr);
231  FW_ASSERT(status == 0);
232  return static_cast<U32>(attr.mq_curmsgs);
233  }
234 
236  //FW_ASSERT(0);
237  return 0;
238  }
239 
241  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
242  mqd_t handle = queueHandle->handle;
243 
244  struct mq_attr attr;
245  int status = mq_getattr(handle, &attr);
246  FW_ASSERT(status == 0);
247  return static_cast<U32>(attr.mq_maxmsg);
248  }
249 
251  QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
252  mqd_t handle = queueHandle->handle;
253 
254  struct mq_attr attr;
255  int status = mq_getattr(handle, &attr);
256  FW_ASSERT(status == 0);
257  return static_cast<U32>(attr.mq_msgsize);
258  }
259 
260 }
Os
Definition: File.cpp:7
Os::IPCQueue::receive
QueueStatus receive(Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE &priority, QueueBlocking block)
receive a message
Definition: IPCQueueCommon.cpp:15
Os::Queue::QUEUE_FULL
@ QUEUE_FULL
queue was full when attempting to send a message
Definition: Queue.hpp:36
Os::IPCQueue::~IPCQueue
~IPCQueue()
Definition: IPCQueueStub.cpp:85
Os::QueueHandle::~QueueHandle
~QueueHandle()
Definition: IPCQueue.cpp:33
Fw::StringBase
Definition: StringType.hpp:23
U8
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.hpp:76
Os::Queue::QueueStatus
QueueStatus
Definition: Queue.hpp:27
IPCQueue.hpp
Os::Queue::m_handle
POINTER_CAST m_handle
handle for implementation specific queue
Definition: Queue.hpp:75
Os::Queue::QUEUE_EMPTY_BUFFER
@ QUEUE_EMPTY_BUFFER
supplied buffer is empty
Definition: Queue.hpp:35
Assert.hpp
Os::Queue::QUEUE_INVALID_PRIORITY
@ QUEUE_INVALID_PRIORITY
invalid priority requested
Definition: Queue.hpp:34
Os::IPCQueue::getQueueSize
NATIVE_INT_TYPE getQueueSize() const
get the queue depth (maximum number of messages queue can hold)
Definition: IPCQueueStub.cpp:361
Os::Queue::QUEUE_SIZE_MISMATCH
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition: Queue.hpp:31
IPC_QUEUE_TIMEOUT_SEC
#define IPC_QUEUE_TIMEOUT_SEC
Definition: IPCQueue.cpp:24
Os::Queue::QUEUE_UNINITIALIZED
@ QUEUE_UNINITIALIZED
Queue wasn't initialized successfully.
Definition: Queue.hpp:30
Os::Queue::m_name
QueueString m_name
queue name
Definition: Queue.hpp:76
Os::IPCQueue::IPCQueue
IPCQueue()
Definition: IPCQueueStub.cpp:54
Os::Queue::QUEUE_NO_MORE_MSGS
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition: Queue.hpp:29
FW_ASSERT
#define FW_ASSERT(...)
Definition: Assert.hpp:8
Os::Queue::QUEUE_OK
@ QUEUE_OK
message sent/received okay
Definition: Queue.hpp:28
Os::IPCQueue::getMsgSize
NATIVE_INT_TYPE getMsgSize() const
get the message size (maximum message size queue can hold)
Definition: IPCQueueStub.cpp:370
Os::Queue::s_numQueues
static NATIVE_INT_TYPE s_numQueues
tracks number of queues in the system
Definition: Queue.hpp:80
Os::IPCQueue::create
QueueStatus create(const Fw::StringBase &name, NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
create a message queue
Definition: IPCQueueStub.cpp:57
Os::Queue::QUEUE_UNKNOWN_ERROR
@ QUEUE_UNKNOWN_ERROR
Unexpected error; can't match with returns.
Definition: Queue.hpp:37
Os::Queue::QUEUE_NONBLOCKING
@ QUEUE_NONBLOCKING
Queue receive always returns even if there is no message.
Definition: Queue.hpp:42
Os::QueueHandle::handle
mqd_t handle
Definition: IPCQueue.cpp:39
Os::IPCQueue::getMaxMsgs
NATIVE_INT_TYPE getMaxMsgs() const
get the maximum number of messages (high watermark)
Definition: IPCQueueStub.cpp:352
Os::IPCQueue::getNumMsgs
NATIVE_INT_TYPE getNumMsgs() const
get the number of messages in the queue
Definition: IPCQueueStub.cpp:343
NATIVE_INT_TYPE
int NATIVE_INT_TYPE
native integer type declaration
Definition: BasicTypes.hpp:29
Os::Queue
Definition: Queue.hpp:24
Os::IPCQueue::send
QueueStatus send(const Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE priority, QueueBlocking block)
send a message
Definition: IPCQueueCommon.cpp:6
Os::QueueHandle::QueueHandle
QueueHandle(mqd_t m_handle)
Definition: IPCQueue.cpp:30
Queue.hpp
Os::Queue::QUEUE_BLOCKING
@ QUEUE_BLOCKING
Queue receive blocks until a message arrives.
Definition: Queue.hpp:41