F´ Flight Software - C/C++ Documentation  NASA-v2.0.0
A framework for building embedded system applications to NASA flight quality standards.
IPCQueue.cpp
Go to the documentation of this file.
1 #include <Fw/Types/Assert.hpp>
2 #include <Os/Queue.hpp>
3 #include <Os/IPCQueue.hpp>
4 
5 #ifdef TGT_OS_TYPE_VXWORKS
6  #include <vxWorks.h>
7 #endif
8 
9 #ifdef TGT_OS_TYPE_LINUX
10  #include <sys/types.h>
11  #include <unistd.h>
12 #endif
13 
14 #include <mqueue.h>
15 #include <fcntl.h>
16 #include <errno.h>
17 #include <string.h>
18 #include <stdio.h>
19 #include <time.h>
20 #include <sys/time.h>
21 #include <pthread.h>
22 
23 #define IPC_QUEUE_TIMEOUT_SEC (1)
24 
25 namespace Os {
26 
// Owner of one POSIX message-queue descriptor.
//
// The descriptor handed to the constructor is closed with mq_close() when
// the object is destroyed, unless it is -1 (the "no descriptor" value).
class QueueHandle {
  public:
    // Take ownership of the descriptor m_handle (-1 means "none").
    // explicit: an mqd_t must never convert to a QueueHandle silently.
    explicit QueueHandle(mqd_t m_handle) : handle(m_handle) {
    }

    // Destroy the handle: close the descriptor if one is held.
    ~QueueHandle() {
        if (-1 != this->handle) {
            // A failed close cannot be reported from a destructor.
            (void) mq_close(this->handle);
        }
    }

    // Descriptor owned by this object, or -1.
    mqd_t handle;

  private:
    // Copying would leave two objects closing the same descriptor, so both
    // operations are declared private and intentionally left undefined.
    QueueHandle(const QueueHandle&);
    QueueHandle& operator=(const QueueHandle&);
};
40 
42  }
43 
45 
46  this->m_name = "/QP_";
47  this->m_name += name;
48 #ifndef TGT_OS_TYPE_VXWORKS
49  char pid[40];
50  (void)snprintf(pid,sizeof(pid),".%d",getpid());
51  pid[sizeof(pid)-1] = 0;
52  this->m_name += pid;
53 #endif
54  mq_attr att;
55  mqd_t handle;
56 
57  memset(&att,0,sizeof(att));
58  att.mq_maxmsg = depth;
59  att.mq_msgsize = msgSize;
60  att.mq_flags = 0;
61  att.mq_curmsgs = 0;
62 
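63  /* NOTE(mereweth) - O_NONBLOCK is not set, so the queue blocks by default;
64  * send() and receive() always use mq_timedsend/mq_timedreceive, and for
65  * QUEUE_NONBLOCKING they pass a deadline of "now" so the call returns at once.
66  */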
67  handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
68 
69  // If queue already exists, then unlink it and try again.
70  if (-1 == (NATIVE_INT_TYPE) handle) {
71  switch (errno) {
72  case EEXIST:
73  (void)mq_unlink(this->m_name.toChar());
74  break;
75  default:
76  return QUEUE_UNINITIALIZED;
77  }
78 
79  handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
80 
81  if (-1 == (NATIVE_INT_TYPE) handle) {
82  return QUEUE_UNINITIALIZED;
83  }
84  }
85 
86  // Set up queue handle:
87  QueueHandle* queueHandle = new QueueHandle(handle);
88  if (NULL == queueHandle) {
89  return QUEUE_UNINITIALIZED;
90  }
91  this->m_handle = (POINTER_CAST) queueHandle;
92 
94 
95  return QUEUE_OK;
96  }
97 
99  // Clean up the queue handle:
100  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
101  if (NULL != queueHandle) {
102  delete queueHandle;
103  }
104  this->m_handle = (POINTER_CAST) NULL; // important so base Queue class doesn't free it
105  (void) mq_unlink(this->m_name.toChar());
106  }
107 
108  Queue::QueueStatus IPCQueue::send(const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority, QueueBlocking block) {
109 
110  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
111  mqd_t handle = queueHandle->handle;
112 
113  if (-1 == handle) {
114  return QUEUE_UNINITIALIZED;
115  }
116 
117  if (NULL == buffer) {
118  return QUEUE_EMPTY_BUFFER;
119  }
120 
121  bool keepTrying = true;
122  while (keepTrying) {
123  struct timeval now;
124  gettimeofday(&now,NULL);
125  struct timespec wait;
126  wait.tv_sec = now.tv_sec;
127  wait.tv_nsec = now.tv_usec * 1000;
128 
129  if (block == QUEUE_BLOCKING) {
130  wait.tv_sec += IPC_QUEUE_TIMEOUT_SEC;
131  }
132  NATIVE_INT_TYPE stat = mq_timedsend(handle, (const char*) buffer, size, priority, &wait);
133  if (-1 == stat) {
134  switch (errno) {
135  case EINTR:
136  continue;
137  case EMSGSIZE:
138  return QUEUE_SIZE_MISMATCH;
139  case EINVAL:
140  return QUEUE_INVALID_PRIORITY;
141  case ETIMEDOUT:
142  if (block == QUEUE_NONBLOCKING) {
143  // no more messages. If we are
144  // non-blocking, return
145  return QUEUE_FULL;
146  } else {
147  // TODO(mereweth) - multiprocess signalling necessary?
148  // Go to sleep until we receive a signal that something was taken off the queue
149  continue;
150  }
151  default:
152  return QUEUE_UNKNOWN_ERROR;
153  }
154  } else {
155  keepTrying=false;
156  // TODO(mereweth) - multiprocess signalling necessary?
157  // Wake up a thread that might be waiting on the other end of the queue:
158  }
159  }
160 
161  return QUEUE_OK;
162  }
163 
164  Queue::QueueStatus IPCQueue::receive(U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority, QueueBlocking block) {
165 
166  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
167  mqd_t handle = queueHandle->handle;
168 
169  if (-1 == handle) {
170  return QUEUE_UNINITIALIZED;
171  }
172 
173  ssize_t size;
174  bool notFinished = true;
175  while (notFinished) {
176  struct timeval now;
177  gettimeofday(&now,NULL);
178  struct timespec wait;
179  wait.tv_sec = now.tv_sec;
180  wait.tv_nsec = now.tv_usec * 1000;
181 
182  if (block == QUEUE_BLOCKING) {
183  wait.tv_sec += IPC_QUEUE_TIMEOUT_SEC;
184  }
185  size = mq_timedreceive(handle, (char*) buffer, (size_t) capacity,
186 #ifdef TGT_OS_TYPE_VXWORKS
187  (int*)&priority, &wait);
188 #else
189  (unsigned int*) &priority, &wait);
190 #endif
191 
192  if (-1 == size) { // error
193  switch (errno) {
194  case EINTR:
195  continue;
196  case EMSGSIZE:
197  return QUEUE_SIZE_MISMATCH;
198  case ETIMEDOUT:
199  if (block == QUEUE_NONBLOCKING) {
200  // no more messages. If we are
201  // non-blocking, return
202  return QUEUE_NO_MORE_MSGS;
203  } else {
204  // TODO(mereweth) - multiprocess signalling necessary?
205  // Go to sleep until we receive a signal that something was put on the queue:
206  continue;
207  }
208  break;
209  default:
210  return QUEUE_UNKNOWN_ERROR;
211  }
212  }
213  else {
214  notFinished = false;
215  // TODO(mereweth) - multiprocess signalling necessary?
216  // Wake up a thread that might be waiting on the other end of the queue
217  }
218  }
219 
220  actualSize = (NATIVE_INT_TYPE) size;
221  return QUEUE_OK;
222  }
223 
225  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
226  mqd_t handle = queueHandle->handle;
227 
228  struct mq_attr attr;
229  int status = mq_getattr(handle, &attr);
230  FW_ASSERT(status == 0);
231  return (U32) attr.mq_curmsgs;
232  }
233 
235  //FW_ASSERT(0);
236  return 0;
237  }
238 
240  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
241  mqd_t handle = queueHandle->handle;
242 
243  struct mq_attr attr;
244  int status = mq_getattr(handle, &attr);
245  FW_ASSERT(status == 0);
246  return (U32) attr.mq_maxmsg;
247  }
248 
250  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
251  mqd_t handle = queueHandle->handle;
252 
253  struct mq_attr attr;
254  int status = mq_getattr(handle, &attr);
255  FW_ASSERT(status == 0);
256  return (U32) attr.mq_msgsize;
257  }
258 
259 }
Os
Definition: File.cpp:7
Os::IPCQueue::receive
QueueStatus receive(Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE &priority, QueueBlocking block)
receive a message
Definition: IPCQueueCommon.cpp:15
Os::IPCQueue::getNumMsgs
NATIVE_INT_TYPE getNumMsgs(void) const
get the number of messages in the queue
Definition: IPCQueueStub.cpp:342
Os::Queue::QUEUE_FULL
@ QUEUE_FULL
queue was full when attempting to send a message
Definition: Queue.hpp:36
Os::IPCQueue::~IPCQueue
~IPCQueue()
Definition: IPCQueueStub.cpp:84
Os::QueueHandle::~QueueHandle
~QueueHandle()
Definition: IPCQueue.cpp:32
Fw::StringBase
Definition: StringType.hpp:23
U8
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.hpp:76
Os::Queue::QueueStatus
QueueStatus
Definition: Queue.hpp:27
IPCQueue.hpp
Os::Queue::m_handle
POINTER_CAST m_handle
handle for implementation specific queue
Definition: Queue.hpp:75
Os::Queue::QUEUE_EMPTY_BUFFER
@ QUEUE_EMPTY_BUFFER
supplied buffer is empty
Definition: Queue.hpp:35
Assert.hpp
Os::Queue::QUEUE_INVALID_PRIORITY
@ QUEUE_INVALID_PRIORITY
invalid priority requested
Definition: Queue.hpp:34
Os::Queue::QUEUE_SIZE_MISMATCH
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with a buffer that is too large or too small
Definition: Queue.hpp:31
IPC_QUEUE_TIMEOUT_SEC
#define IPC_QUEUE_TIMEOUT_SEC
Definition: IPCQueue.cpp:23
Os::Queue::QUEUE_UNINITIALIZED
@ QUEUE_UNINITIALIZED
Queue wasn't initialized successfully.
Definition: Queue.hpp:30
Os::Queue::m_name
QueueString m_name
queue name
Definition: Queue.hpp:76
Os::IPCQueue::IPCQueue
IPCQueue()
Definition: IPCQueueStub.cpp:53
Os::IPCQueue::getQueueSize
NATIVE_INT_TYPE getQueueSize(void) const
get the queue depth (maximum number of messages queue can hold)
Definition: IPCQueueStub.cpp:360
Os::Queue::QUEUE_NO_MORE_MSGS
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition: Queue.hpp:29
FW_ASSERT
#define FW_ASSERT(...)
Definition: Assert.hpp:9
Os::Queue::QUEUE_OK
@ QUEUE_OK
message sent/received okay
Definition: Queue.hpp:28
Os::IPCQueue::getMaxMsgs
NATIVE_INT_TYPE getMaxMsgs(void) const
get the maximum number of messages (high watermark)
Definition: IPCQueueStub.cpp:351
Os::Queue::s_numQueues
static NATIVE_INT_TYPE s_numQueues
tracks number of queues in the system
Definition: Queue.hpp:80
Os::IPCQueue::create
QueueStatus create(const Fw::StringBase &name, NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
create a message queue
Definition: IPCQueueStub.cpp:56
Os::Queue::QUEUE_UNKNOWN_ERROR
@ QUEUE_UNKNOWN_ERROR
Unexpected error; can't match with returns.
Definition: Queue.hpp:37
Os::Queue::QUEUE_NONBLOCKING
@ QUEUE_NONBLOCKING
Queue receive always returns even if there is no message.
Definition: Queue.hpp:42
Os::QueueHandle::handle
mqd_t handle
Definition: IPCQueue.cpp:38
NATIVE_INT_TYPE
int NATIVE_INT_TYPE
native integer type declaration
Definition: BasicTypes.hpp:29
NULL
#define NULL
NULL.
Definition: BasicTypes.hpp:100
Os::Queue
Definition: Queue.hpp:24
Os::IPCQueue::send
QueueStatus send(const Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE priority, QueueBlocking block)
send a message
Definition: IPCQueueCommon.cpp:6
Os::QueueHandle::QueueHandle
QueueHandle(mqd_t m_handle)
Definition: IPCQueue.cpp:29
Os::IPCQueue::getMsgSize
NATIVE_INT_TYPE getMsgSize(void) const
get the message size (maximum message size queue can hold)
Definition: IPCQueueStub.cpp:369
Queue.hpp
Os::Queue::QUEUE_BLOCKING
@ QUEUE_BLOCKING
Queue receive blocks until a message arrives.
Definition: Queue.hpp:41