F´ Flight Software - C/C++ Documentation  NASA-v1.5.0
A framework for building embedded system applications to NASA flight quality standards.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
IPCQueue.cpp
Go to the documentation of this file.
1 #include <Fw/Types/Assert.hpp>
2 #include <Os/Queue.hpp>
3 #include <Os/IPCQueue.hpp>
4 
5 #ifdef TGT_OS_TYPE_VXWORKS
6  #include <vxWorks.h>
7 #endif
8 
9 #ifdef TGT_OS_TYPE_LINUX
10  #include <sys/types.h>
11  #include <unistd.h>
12 #endif
13 
14 #include <mqueue.h>
15 #include <fcntl.h>
16 #include <errno.h>
17 #include <string.h>
18 #include <stdio.h>
19 #include <time.h>
20 #include <sys/time.h>
21 #include <pthread.h>
22 
23 #define IPC_QUEUE_TIMEOUT_SEC (1)
24 
25 namespace Os {
26 
// Owner of a POSIX message-queue descriptor: the descriptor handed to the
// constructor is closed when the object is destroyed.
class QueueHandle {
    public:
        // Take ownership of an already opened descriptor. A value of -1 means
        // "no descriptor" and is left alone by the destructor.
        explicit QueueHandle(mqd_t m_handle) : handle(m_handle) {
        }
        ~QueueHandle() {
            // Destroy the handle:
            if (-1 != this->handle) {
                (void) mq_close(this->handle);
            }
        }
        mqd_t handle;
    private:
        // Copying is disabled (declared, never defined): two objects holding
        // the same descriptor would each call mq_close() on it.
        QueueHandle(const QueueHandle&);
        QueueHandle& operator=(const QueueHandle&);
};
40 
42  }
43 
45 
46  this->m_name = "/QP_";
47  this->m_name += name;
48 #ifndef TGT_OS_TYPE_VXWORKS
49  char pid[40];
50  (void)snprintf(pid,sizeof(pid),".%d",getpid());
51  pid[sizeof(pid)-1] = 0;
52  this->m_name += pid;
53 #endif
54  mq_attr att;
55  mqd_t handle;
56 
57  memset(&att,0,sizeof(att));
58  att.mq_maxmsg = depth;
59  att.mq_msgsize = msgSize;
60  att.mq_flags = 0;
61  att.mq_curmsgs = 0;
62 
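63  /* NOTE(mereweth) - the queue is opened in blocking mode (O_NONBLOCK is
64  * not set); QUEUE_NONBLOCKING is emulated in send/receive below by calling
65  * mq_timedsend/mq_timedreceive with a deadline that has already been reached
66  */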
67  handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
68 
69  // If queue already exists, then unlink it and try again.
70  if (-1 == (NATIVE_INT_TYPE) handle) {
71  switch (errno) {
72  case EEXIST:
73  (void)mq_unlink(this->m_name.toChar());
74  break;
75  default:
76  return QUEUE_UNINITIALIZED;
77  }
78 
79  handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
80 
81  if (-1 == (NATIVE_INT_TYPE) handle) {
82  return QUEUE_UNINITIALIZED;
83  }
84  }
85 
86  // Set up queue handle:
87  QueueHandle* queueHandle = new QueueHandle(handle);
88  if (NULL == queueHandle) {
89  return QUEUE_UNINITIALIZED;
90  }
91  this->m_handle = (POINTER_CAST) queueHandle;
92 
94 
95  return QUEUE_OK;
96  }
97 
99  // Clean up the queue handle:
100  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
101  if (NULL != queueHandle) {
102  delete queueHandle;
103  }
104  this->m_handle = (POINTER_CAST) NULL; // important so base Queue class doesn't free it
105  (void) mq_unlink(this->m_name.toChar());
106  }
107 
108  Queue::QueueStatus IPCQueue::send(const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority, QueueBlocking block) {
109 
110  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
111  mqd_t handle = queueHandle->handle;
112 
113  if (-1 == handle) {
114  return QUEUE_UNINITIALIZED;
115  }
116 
117  if (NULL == buffer) {
118  return QUEUE_EMPTY_BUFFER;
119  }
120 
121  bool keepTrying = true;
122  while (keepTrying) {
123  struct timeval now;
124  gettimeofday(&now,NULL);
125  struct timespec wait;
126  wait.tv_sec = now.tv_sec;
127  wait.tv_nsec = now.tv_usec * 1000;
128 
129  if (block == QUEUE_BLOCKING) {
130  wait.tv_sec += IPC_QUEUE_TIMEOUT_SEC;
131  }
132  NATIVE_INT_TYPE stat = mq_timedsend(handle, (const char*) buffer, size, priority, &wait);
133  if (-1 == stat) {
134  switch (errno) {
135  case EINTR:
136  continue;
137  case EMSGSIZE:
138  return QUEUE_SIZE_MISMATCH;
139  case EINVAL:
140  return QUEUE_INVALID_PRIORITY;
141  case ETIMEDOUT:
142  if (block == QUEUE_NONBLOCKING) {
143  // no more messages. If we are
144  // non-blocking, return
145  return QUEUE_FULL;
146  } else {
147  // TODO(mereweth) - multiprocess signalling necessary?
148  // Go to sleep until we receive a signal that something was takeng off the queue
149  continue;
150  }
151  default:
152  return QUEUE_UNKNOWN_ERROR;
153  }
154  } else {
155  keepTrying=false;
156  // TODO(mereweth) - multiprocess signalling necessary?
157  // Wake up a thread that might be waiting on the other end of the queue:
158  }
159  }
160 
161  return QUEUE_OK;
162  }
163 
164  Queue::QueueStatus IPCQueue::receive(U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority, QueueBlocking block) {
165 
166  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
167  mqd_t handle = queueHandle->handle;
168 
169  if (-1 == handle) {
170  return QUEUE_UNINITIALIZED;
171  }
172 
173  ssize_t size;
174  bool notFinished = true;
175  while (notFinished) {
176  struct timeval now;
177  gettimeofday(&now,NULL);
178  struct timespec wait;
179  wait.tv_sec = now.tv_sec;
180  wait.tv_nsec = now.tv_usec * 1000;
181 
182  if (block == QUEUE_BLOCKING) {
183  wait.tv_sec += IPC_QUEUE_TIMEOUT_SEC;
184  }
185  size = mq_timedreceive(handle, (char*) buffer, (size_t) capacity,
186 #ifdef TGT_OS_TYPE_VXWORKS
187  (int*)&priority, &wait);
188 #else
189  (unsigned int*) &priority, &wait);
190 #endif
191 
192  if (-1 == size) { // error
193  switch (errno) {
194  case EINTR:
195  continue;
196  case EMSGSIZE:
197  return QUEUE_SIZE_MISMATCH;
198  case ETIMEDOUT:
199  if (block == QUEUE_NONBLOCKING) {
200  // no more messages. If we are
201  // non-blocking, return
202  return QUEUE_NO_MORE_MSGS;
203  } else {
204  // TODO(mereweth) - multiprocess signalling necessary?
205  // Go to sleep until we receive a signal that something was put on the queue:
206  continue;
207  }
208  break;
209  default:
210  return QUEUE_UNKNOWN_ERROR;
211  }
212  }
213  else {
214  notFinished = false;
215  // TODO(mereweth) - multiprocess signalling necessary?
216  // Wake up a thread that might be waiting on the other end of the queue
217  }
218  }
219 
220  actualSize = (NATIVE_INT_TYPE) size;
221  return QUEUE_OK;
222  }
223 
225  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
226  mqd_t handle = queueHandle->handle;
227 
228  struct mq_attr attr;
229  int status = mq_getattr(handle, &attr);
230  FW_ASSERT(status == 0);
231  return (U32) attr.mq_curmsgs;
232  }
233 
235  //FW_ASSERT(0);
236  return 0;
237  }
238 
240  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
241  mqd_t handle = queueHandle->handle;
242 
243  struct mq_attr attr;
244  int status = mq_getattr(handle, &attr);
245  FW_ASSERT(status == 0);
246  return (U32) attr.mq_maxmsg;
247  }
248 
250  QueueHandle* queueHandle = (QueueHandle*) this->m_handle;
251  mqd_t handle = queueHandle->handle;
252 
253  struct mq_attr attr;
254  int status = mq_getattr(handle, &attr);
255  FW_ASSERT(status == 0);
256  return (U32) attr.mq_msgsize;
257  }
258 
259 }
Os
Definition: File.cpp:7
Os::IPCQueue::receive
QueueStatus receive(Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE &priority, QueueBlocking block)
receive a message
Definition: IPCQueueCommon.cpp:15
Os::IPCQueue::getNumMsgs
NATIVE_INT_TYPE getNumMsgs(void) const
get the number of messages in the queue
Definition: IPCQueueStub.cpp:342
Os::Queue::QUEUE_FULL
@ QUEUE_FULL
queue was full when attempting to send a message
Definition: Queue.hpp:36
Os::IPCQueue::~IPCQueue
~IPCQueue()
Definition: IPCQueueStub.cpp:84
Os::QueueHandle::~QueueHandle
~QueueHandle()
Definition: IPCQueue.cpp:32
Fw::StringBase
Definition: StringType.hpp:23
U8
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.hpp:76
Os::Queue::QueueStatus
QueueStatus
Definition: Queue.hpp:27
IPCQueue.hpp
Os::Queue::m_handle
POINTER_CAST m_handle
handle for implementation specific queue
Definition: Queue.hpp:69
Os::Queue::QUEUE_EMPTY_BUFFER
@ QUEUE_EMPTY_BUFFER
supplied buffer is empty
Definition: Queue.hpp:35
Assert.hpp
Os::Queue::QUEUE_INVALID_PRIORITY
@ QUEUE_INVALID_PRIORITY
invalid priority requested
Definition: Queue.hpp:34
Os::Queue::QUEUE_SIZE_MISMATCH
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition: Queue.hpp:31
IPC_QUEUE_TIMEOUT_SEC
#define IPC_QUEUE_TIMEOUT_SEC
Definition: IPCQueue.cpp:23
Os::Queue::QUEUE_UNINITIALIZED
@ QUEUE_UNINITIALIZED
Queue wasn't initialized successfully.
Definition: Queue.hpp:30
Os::Queue::m_name
QueueString m_name
queue name
Definition: Queue.hpp:70
Os::IPCQueue::IPCQueue
IPCQueue()
Definition: IPCQueueStub.cpp:53
Os::IPCQueue::getQueueSize
NATIVE_INT_TYPE getQueueSize(void) const
get the queue depth (maximum number of messages queue can hold)
Definition: IPCQueueStub.cpp:360
Os::Queue::QUEUE_NO_MORE_MSGS
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition: Queue.hpp:29
FW_ASSERT
#define FW_ASSERT(...)
Definition: Assert.hpp:9
Os::Queue::QUEUE_OK
@ QUEUE_OK
message sent/received okay
Definition: Queue.hpp:28
Os::IPCQueue::getMaxMsgs
NATIVE_INT_TYPE getMaxMsgs(void) const
get the maximum number of messages (high watermark)
Definition: IPCQueueStub.cpp:351
Os::Queue::s_numQueues
static NATIVE_INT_TYPE s_numQueues
tracks number of queues in the system
Definition: Queue.hpp:74
Os::IPCQueue::create
QueueStatus create(const Fw::StringBase &name, NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
create a message queue
Definition: IPCQueueStub.cpp:56
Os::Queue::QUEUE_UNKNOWN_ERROR
@ QUEUE_UNKNOWN_ERROR
Unexpected error; can't match with returns.
Definition: Queue.hpp:37
Os::Queue::QUEUE_NONBLOCKING
@ QUEUE_NONBLOCKING
Queue receive always returns even if there is no message.
Definition: Queue.hpp:42
Os::QueueHandle::handle
mqd_t handle
Definition: IPCQueue.cpp:38
NATIVE_INT_TYPE
int NATIVE_INT_TYPE
native integer type declaration
Definition: BasicTypes.hpp:29
NULL
#define NULL
NULL.
Definition: BasicTypes.hpp:100
Os::Queue
Definition: Queue.hpp:24
Os::IPCQueue::send
QueueStatus send(const Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE priority, QueueBlocking block)
send a message
Definition: IPCQueueCommon.cpp:6
Os::QueueHandle::QueueHandle
QueueHandle(mqd_t m_handle)
Definition: IPCQueue.cpp:29
Os::IPCQueue::getMsgSize
NATIVE_INT_TYPE getMsgSize(void) const
get the message size (maximum message size queue can hold)
Definition: IPCQueueStub.cpp:369
Queue.hpp
Os::Queue::QUEUE_BLOCKING
@ QUEUE_BLOCKING
Queue receive blocks until a message arrives.
Definition: Queue.hpp:41