F´ Flight Software - C/C++ Documentation NASA-v1.6.0
A framework for building embedded system applications to NASA flight quality standards.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
IPCQueue.cpp
Go to the documentation of this file.
1#include <Fw/Types/Assert.hpp>
2#include <Os/Queue.hpp>
3#include <Os/IPCQueue.hpp>
4
5#ifdef TGT_OS_TYPE_VXWORKS
6 #include <vxWorks.h>
7#endif
8
9#ifdef TGT_OS_TYPE_LINUX
10 #include <sys/types.h>
11 #include <unistd.h>
12#endif
13
14#include <mqueue.h>
15#include <fcntl.h>
16#include <cerrno>
17#include <cstring>
18#include <cstdio>
19#include <ctime>
20#include <sys/time.h>
21#include <pthread.h>
22#include <new>
23
24#define IPC_QUEUE_TIMEOUT_SEC (1)
25
26namespace Os {
27
/**
 * Thin owner of a POSIX message queue descriptor.
 *
 * The descriptor is closed with mq_close() when the object is destroyed,
 * unless it holds the invalid value -1. Copying is disabled because two
 * copies would both try to close the same descriptor.
 */
class QueueHandle {
  public:
    /**
     * Take ownership of an already opened descriptor.
     * \param m_handle descriptor returned by mq_open(), or -1 for "no queue"
     */
    explicit QueueHandle(mqd_t m_handle) : handle(m_handle) {
    }

    // Destroy the handle:
    ~QueueHandle() {
        if (-1 != this->handle) {
            // Nothing useful can be done if the close fails during destruction
            (void) mq_close(this->handle);
        }
    }

    // One owner per descriptor: forbid copies so it is never closed twice
    QueueHandle(const QueueHandle&) = delete;
    QueueHandle& operator=(const QueueHandle&) = delete;

    mqd_t handle; //!< underlying POSIX message queue descriptor (-1 when invalid)
};
41
43 }
44
46
// Body of the queue-creation routine.
// NOTE(review): the signature line and a few statements of this function are
// not present in this listing; the comments here describe only what is visible.
//
// Build the POSIX queue name: "/QP_" followed by the caller-supplied name.
47 this->m_name = "/QP_";
48 this->m_name += name;
// On non-VxWorks targets, append ".<pid>" of the calling process to the name.
49#ifndef TGT_OS_TYPE_VXWORKS
50 char pid[40];
51 (void)snprintf(pid,sizeof(pid),".%d",getpid());
// Force termination of the pid string regardless of what snprintf wrote.
52 pid[sizeof(pid)-1] = 0;
53 this->m_name += pid;
54#endif
55 mq_attr att;
56 mqd_t handle;
57
// Zero the attributes, then request `depth` messages of at most `msgSize` bytes each.
58 memset(&att,0,sizeof(att));
59 att.mq_maxmsg = depth;
60 att.mq_msgsize = msgSize;
61 att.mq_flags = 0;
62 att.mq_curmsgs = 0;
63
64 /* NOTE(mereweth) - O_BLOCK is the default; we use timedsend and
65 * timedreceive below if QUEUE_NONBLOCKING is specified
66 *
67 */
// O_CREAT | O_EXCL makes the open fail if a queue with this name already exists.
68 handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
69
70 // If queue already exists, then unlink it and try again.
71 if (-1 == handle) {
72 switch (errno) {
73 case EEXIST:
// Remove the stale name; the result of the unlink is deliberately ignored.
74 (void)mq_unlink(this->m_name.toChar());
75 break;
// NOTE(review): the statement for the default branch is not visible in this
// listing - confirm how other mq_open errors are reported.
76 default:
78 }
79
// Second attempt, with the same flags and attributes as the first.
80 handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
81
// NOTE(review): the handling of a failed second attempt is not visible here.
82 if (-1 == handle) {
84 }
85 }
86
87 // Set up queue handle:
// Non-throwing allocation: a failure yields nullptr instead of an exception.
88 QueueHandle* queueHandle = new(std::nothrow) QueueHandle(handle);
// NOTE(review): the handling of an allocation failure is not visible here.
89 if (nullptr == queueHandle) {
91 }
// Store the wrapper pointer in the integer-typed m_handle member.
92 this->m_handle = reinterpret_cast<POINTER_CAST>(queueHandle);
93
95
96 return QUEUE_OK;
97 }
98
// Cleanup body that releases the queue wrapper and removes the queue name.
// NOTE(review): the signature line is not present in this listing; this reads
// like the IPCQueue destructor - confirm.
100 // Clean up the queue handle:
101 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
// Only delete when a wrapper is attached; a null m_handle means nothing to free.
102 if (nullptr != queueHandle) {
103 delete queueHandle;
104 }
105 this->m_handle = reinterpret_cast<POINTER_CAST>(nullptr); // important so base Queue class doesn't free it
// Remove the queue name from the system; the result is deliberately ignored.
106 (void) mq_unlink(this->m_name.toChar());
107 }
108
109 Queue::QueueStatus IPCQueue::send(const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority, QueueBlocking block) {
110
111 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
112 mqd_t handle = queueHandle->handle;
113
114 if (-1 == handle) {
115 return QUEUE_UNINITIALIZED;
116 }
117
118 if (nullptr == buffer) {
119 return QUEUE_EMPTY_BUFFER;
120 }
121
122 bool keepTrying = true;
123 while (keepTrying) {
124 struct timeval now;
125 gettimeofday(&now,nullptr);
126 struct timespec wait;
127 wait.tv_sec = now.tv_sec;
128 wait.tv_nsec = now.tv_usec * 1000;
129
130 if (block == QUEUE_BLOCKING) {
131 wait.tv_sec += IPC_QUEUE_TIMEOUT_SEC;
132 }
133 NATIVE_INT_TYPE stat = mq_timedsend(handle, reinterpret_cast<const char*>(buffer), size, priority, &wait);
134 if (-1 == stat) {
135 switch (errno) {
136 case EINTR:
137 continue;
138 case EMSGSIZE:
139 return QUEUE_SIZE_MISMATCH;
140 case EINVAL:
142 case ETIMEDOUT:
143 if (block == QUEUE_NONBLOCKING) {
144 // no more messages. If we are
145 // non-blocking, return
146 return QUEUE_FULL;
147 } else {
148 // TODO(mereweth) - multiprocess signalling necessary?
149 // Go to sleep until we receive a signal that something was taken off the queue
150 continue;
151 }
152 default:
153 return QUEUE_UNKNOWN_ERROR;
154 }
155 } else {
156 keepTrying=false;
157 // TODO(mereweth) - multiprocess signalling necessary?
158 // Wake up a thread that might be waiting on the other end of the queue:
159 }
160 }
161
162 return QUEUE_OK;
163 }
164
165 Queue::QueueStatus IPCQueue::receive(U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority, QueueBlocking block) {
166
167 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
168 mqd_t handle = queueHandle->handle;
169
170 if (-1 == handle) {
171 return QUEUE_UNINITIALIZED;
172 }
173
174 ssize_t size;
175 bool notFinished = true;
176 while (notFinished) {
177 struct timeval now;
178 gettimeofday(&now,nullptr);
179 struct timespec wait;
180 wait.tv_sec = now.tv_sec;
181 wait.tv_nsec = now.tv_usec * 1000;
182
183 if (block == QUEUE_BLOCKING) {
184 wait.tv_sec += IPC_QUEUE_TIMEOUT_SEC;
185 }
186 size = mq_timedreceive(handle, reinterpret_cast<char*>(buffer), static_cast<size_t>(capacity),
187#ifdef TGT_OS_TYPE_VXWORKS
188 reinterpret_cast<int*>(&priority), &wait);
189#else
190 reinterpret_cast<unsigned int*>(&priority), &wait);
191#endif
192
193 if (-1 == size) { // error
194 switch (errno) {
195 case EINTR:
196 continue;
197 case EMSGSIZE:
198 return QUEUE_SIZE_MISMATCH;
199 case ETIMEDOUT:
200 if (block == QUEUE_NONBLOCKING) {
201 // no more messages. If we are
202 // non-blocking, return
203 return QUEUE_NO_MORE_MSGS;
204 } else {
205 // TODO(mereweth) - multiprocess signalling necessary?
206 // Go to sleep until we receive a signal that something was put on the queue:
207 continue;
208 }
209 break;
210 default:
211 return QUEUE_UNKNOWN_ERROR;
212 }
213 }
214 else {
215 notFinished = false;
216 // TODO(mereweth) - multiprocess signalling necessary?
217 // Wake up a thread that might be waiting on the other end of the queue
218 }
219 }
220
221 actualSize = static_cast<NATIVE_INT_TYPE>(size);
222 return QUEUE_OK;
223 }
224
// Reports how many messages are currently sitting in the queue.
// NOTE(review): the signature line is not present in this listing; presumably
// this is the getNumMsgs() accessor - confirm.
// NOTE(review): queueHandle is dereferenced without a null check.
226 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
227 mqd_t handle = queueHandle->handle;
228
// Query the queue's attributes; a failing query trips the assertion.
229 struct mq_attr attr;
230 int status = mq_getattr(handle, &attr);
231 FW_ASSERT(status == 0);
// mq_curmsgs is the number of messages currently queued.
232 return static_cast<U32>(attr.mq_curmsgs);
233 }
234
// Always reports 0; no queue attribute is consulted here.
// NOTE(review): the signature line is not present in this listing; presumably
// this is the getMaxMsgs() (high watermark) accessor - confirm.
236 //FW_ASSERT(0);
237 return 0;
238 }
239
// Reports the queue depth, i.e. the maximum number of messages it can hold.
// NOTE(review): the signature line is not present in this listing; presumably
// this is the getQueueSize() accessor - confirm.
// NOTE(review): queueHandle is dereferenced without a null check.
241 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
242 mqd_t handle = queueHandle->handle;
243
// Query the queue's attributes; a failing query trips the assertion.
244 struct mq_attr attr;
245 int status = mq_getattr(handle, &attr);
246 FW_ASSERT(status == 0);
// mq_maxmsg is the maximum number of messages the queue can hold.
247 return static_cast<U32>(attr.mq_maxmsg);
248 }
249
// Reports the largest message, in bytes, that the queue accepts.
// NOTE(review): the signature line is not present in this listing; presumably
// this is the getMsgSize() accessor - confirm.
// NOTE(review): queueHandle is dereferenced without a null check.
251 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
252 mqd_t handle = queueHandle->handle;
253
// Query the queue's attributes; a failing query trips the assertion.
254 struct mq_attr attr;
255 int status = mq_getattr(handle, &attr);
256 FW_ASSERT(status == 0);
// mq_msgsize is the maximum size of a single message.
257 return static_cast<U32>(attr.mq_msgsize);
258 }
259
260}
#define FW_ASSERT(...)
Definition Assert.hpp:7
PlatformPointerCastType POINTER_CAST
Definition BasicTypes.h:53
PlatformIntType NATIVE_INT_TYPE
Definition BasicTypes.h:51
uint8_t U8
8-bit unsigned integer
Definition BasicTypes.h:26
#define IPC_QUEUE_TIMEOUT_SEC
Definition IPCQueue.cpp:24
NATIVE_INT_TYPE getQueueSize() const
get the queue depth (maximum number of messages queue can hold)
NATIVE_INT_TYPE getMsgSize() const
get the message size (maximum message size queue can hold)
QueueStatus create(const Fw::StringBase &name, NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
create a message queue
QueueStatus send(const Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE priority, QueueBlocking block)
send a message
NATIVE_INT_TYPE getNumMsgs() const
get the number of messages in the queue
QueueStatus receive(Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE &priority, QueueBlocking block)
receive a message
NATIVE_INT_TYPE getMaxMsgs() const
get the maximum number of messages (high watermark)
QueueHandle(mqd_t m_handle)
Definition IPCQueue.cpp:30
QueueString m_name
queue name
Definition Queue.hpp:76
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition Queue.hpp:31
@ QUEUE_UNINITIALIZED
Queue wasn't initialized successfully.
Definition Queue.hpp:30
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition Queue.hpp:29
@ QUEUE_INVALID_PRIORITY
invalid priority requested
Definition Queue.hpp:34
@ QUEUE_EMPTY_BUFFER
supplied buffer is empty
Definition Queue.hpp:35
@ QUEUE_UNKNOWN_ERROR
Unexpected error; can't match with returns.
Definition Queue.hpp:37
@ QUEUE_OK
message sent/received okay
Definition Queue.hpp:28
@ QUEUE_FULL
queue was full when attempting to send a message
Definition Queue.hpp:36
@ QUEUE_BLOCKING
Queue receive blocks until a message arrives.
Definition Queue.hpp:41
@ QUEUE_NONBLOCKING
Queue receive always returns even if there is no message.
Definition Queue.hpp:42
static NATIVE_INT_TYPE s_numQueues
tracks number of queues in the system
Definition Queue.hpp:80
POINTER_CAST m_handle
handle for implementation specific queue
Definition Queue.hpp:75
Definition File.cpp:6