F´ Flight Software - C/C++ Documentation NASA-v1.6.0
A framework for building embedded system applications to NASA flight quality standards.
Loading...
Searching...
No Matches
IPCQueue.cpp
Go to the documentation of this file.
1#include <Fw/Types/Assert.hpp>
2#include <Os/Queue.hpp>
3#include <Os/IPCQueue.hpp>
4
5#ifdef TGT_OS_TYPE_VXWORKS
6 #include <vxWorks.h>
7#endif
8
9#ifdef TGT_OS_TYPE_LINUX
10 #include <sys/types.h>
11 #include <unistd.h>
12#endif
13
14#include <mqueue.h>
15#include <fcntl.h>
16#include <cerrno>
17#include <cstring>
18#include <cstdio>
19#include <ctime>
20#include <sys/time.h>
21#include <pthread.h>
22#include <new>
23
24#define IPC_QUEUE_TIMEOUT_SEC (1)
25
26namespace Os {
27
28 class QueueHandle {
29 public:
30 QueueHandle(mqd_t m_handle) {
31 this->handle = m_handle;
32 }
34 // Destroy the handle:
35 if (-1 != this->handle) {
36 (void) mq_close(this->handle);
37 }
38 }
39 mqd_t handle;
40 };
41
43 }
44
46
47 this->m_name = "/QP_";
48 this->m_name += name;
49#ifndef TGT_OS_TYPE_VXWORKS
50 char pid[40];
51 (void)snprintf(pid,sizeof(pid),".%d",getpid());
52 pid[sizeof(pid)-1] = 0;
53 this->m_name += pid;
54#endif
55 mq_attr att;
56 mqd_t handle;
57
58 memset(&att,0,sizeof(att));
59 att.mq_maxmsg = depth;
60 att.mq_msgsize = msgSize;
61 att.mq_flags = 0;
62 att.mq_curmsgs = 0;
63
64 /* NOTE(mereweth) - O_BLOCK is the default; we use timedsend and
65 * timedreceive below if QUEUE_NONBLOCKING is specified
66 *
67 */
68 handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
69
70 // If queue already exists, then unlink it and try again.
71 if (-1 == handle) {
72 switch (errno) {
73 case EEXIST:
74 (void)mq_unlink(this->m_name.toChar());
75 break;
76 default:
78 }
79
80 handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
81
82 if (-1 == handle) {
84 }
85 }
86
87 // Set up queue handle:
88 QueueHandle* queueHandle = new(std::nothrow) QueueHandle(handle);
89 if (nullptr == queueHandle) {
91 }
92 this->m_handle = reinterpret_cast<POINTER_CAST>(queueHandle);
93
95
96 return QUEUE_OK;
97 }
98
100 // Clean up the queue handle:
101 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
102 if (nullptr != queueHandle) {
103 delete queueHandle;
104 }
105 this->m_handle = reinterpret_cast<POINTER_CAST>(nullptr); // important so base Queue class doesn't free it
106 (void) mq_unlink(this->m_name.toChar());
107 }
108
109 Queue::QueueStatus IPCQueue::send(const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority, QueueBlocking block) {
110
111 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
112 mqd_t handle = queueHandle->handle;
113
114 if (-1 == handle) {
115 return QUEUE_UNINITIALIZED;
116 }
117
118 if (nullptr == buffer) {
119 return QUEUE_EMPTY_BUFFER;
120 }
121
122 bool keepTrying = true;
123 while (keepTrying) {
124 struct timeval now;
125 gettimeofday(&now,nullptr);
126 struct timespec wait;
127 wait.tv_sec = now.tv_sec;
128 wait.tv_nsec = now.tv_usec * 1000;
129
130 if (block == QUEUE_BLOCKING) {
131 wait.tv_sec += IPC_QUEUE_TIMEOUT_SEC;
132 }
133 NATIVE_INT_TYPE stat = mq_timedsend(handle, reinterpret_cast<const char*>(buffer), size, priority, &wait);
134 if (-1 == stat) {
135 switch (errno) {
136 case EINTR:
137 continue;
138 case EMSGSIZE:
139 return QUEUE_SIZE_MISMATCH;
140 case EINVAL:
142 case ETIMEDOUT:
143 if (block == QUEUE_NONBLOCKING) {
144 // no more messages. If we are
145 // non-blocking, return
146 return QUEUE_FULL;
147 } else {
148 // TODO(mereweth) - multiprocess signalling necessary?
149 // Go to sleep until we receive a signal that something was taken off the queue
150 continue;
151 }
152 default:
153 return QUEUE_UNKNOWN_ERROR;
154 }
155 } else {
156 keepTrying=false;
157 // TODO(mereweth) - multiprocess signalling necessary?
158 // Wake up a thread that might be waiting on the other end of the queue:
159 }
160 }
161
162 return QUEUE_OK;
163 }
164
165 Queue::QueueStatus IPCQueue::receive(U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority, QueueBlocking block) {
166
167 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
168 mqd_t handle = queueHandle->handle;
169
170 if (-1 == handle) {
171 return QUEUE_UNINITIALIZED;
172 }
173
174 ssize_t size;
175 bool notFinished = true;
176 while (notFinished) {
177 struct timeval now;
178 gettimeofday(&now,nullptr);
179 struct timespec wait;
180 wait.tv_sec = now.tv_sec;
181 wait.tv_nsec = now.tv_usec * 1000;
182
183 if (block == QUEUE_BLOCKING) {
184 wait.tv_sec += IPC_QUEUE_TIMEOUT_SEC;
185 }
186 size = mq_timedreceive(handle, reinterpret_cast<char*>(buffer), static_cast<size_t>(capacity),
187#ifdef TGT_OS_TYPE_VXWORKS
188 reinterpret_cast<int*>(&priority), &wait);
189#else
190 reinterpret_cast<unsigned int*>(&priority), &wait);
191#endif
192
193 if (-1 == size) { // error
194 switch (errno) {
195 case EINTR:
196 continue;
197 case EMSGSIZE:
198 return QUEUE_SIZE_MISMATCH;
199 case ETIMEDOUT:
200 if (block == QUEUE_NONBLOCKING) {
201 // no more messages. If we are
202 // non-blocking, return
203 return QUEUE_NO_MORE_MSGS;
204 } else {
205 // TODO(mereweth) - multiprocess signalling necessary?
206 // Go to sleep until we receive a signal that something was put on the queue:
207 continue;
208 }
209 break;
210 default:
211 return QUEUE_UNKNOWN_ERROR;
212 }
213 }
214 else {
215 notFinished = false;
216 // TODO(mereweth) - multiprocess signalling necessary?
217 // Wake up a thread that might be waiting on the other end of the queue
218 }
219 }
220
221 actualSize = static_cast<NATIVE_INT_TYPE>(size);
222 return QUEUE_OK;
223 }
224
226 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
227 mqd_t handle = queueHandle->handle;
228
229 struct mq_attr attr;
230 int status = mq_getattr(handle, &attr);
231 FW_ASSERT(status == 0);
232 return static_cast<U32>(attr.mq_curmsgs);
233 }
234
236 //FW_ASSERT(0);
237 return 0;
238 }
239
241 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
242 mqd_t handle = queueHandle->handle;
243
244 struct mq_attr attr;
245 int status = mq_getattr(handle, &attr);
246 FW_ASSERT(status == 0);
247 return static_cast<U32>(attr.mq_maxmsg);
248 }
249
251 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
252 mqd_t handle = queueHandle->handle;
253
254 struct mq_attr attr;
255 int status = mq_getattr(handle, &attr);
256 FW_ASSERT(status == 0);
257 return static_cast<U32>(attr.mq_msgsize);
258 }
259
260}
#define FW_ASSERT(...)
Definition Assert.hpp:7
PlatformPointerCastType POINTER_CAST
Definition BasicTypes.h:53
PlatformIntType NATIVE_INT_TYPE
Definition BasicTypes.h:51
uint8_t U8
8-bit unsigned integer
Definition BasicTypes.h:26
#define IPC_QUEUE_TIMEOUT_SEC
Definition IPCQueue.cpp:24
NATIVE_INT_TYPE getQueueSize() const
get the queue depth (maximum number of messages queue can hold)
NATIVE_INT_TYPE getMsgSize() const
get the message size (maximum message size queue can hold)
QueueStatus create(const Fw::StringBase &name, NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
create a message queue
QueueStatus send(const Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE priority, QueueBlocking block)
send a message
NATIVE_INT_TYPE getNumMsgs() const
get the number of messages in the queue
QueueStatus receive(Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE &priority, QueueBlocking block)
receive a message
NATIVE_INT_TYPE getMaxMsgs() const
get the maximum number of messages (high watermark)
QueueHandle(mqd_t m_handle)
Definition IPCQueue.cpp:30
QueueString m_name
queue name
Definition Queue.hpp:76
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition Queue.hpp:31
@ QUEUE_UNINITIALIZED
Queue wasn't initialized successfully.
Definition Queue.hpp:30
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition Queue.hpp:29
@ QUEUE_INVALID_PRIORITY
invalid priority requested
Definition Queue.hpp:34
@ QUEUE_EMPTY_BUFFER
supplied buffer is empty
Definition Queue.hpp:35
@ QUEUE_UNKNOWN_ERROR
Unexpected error; can't match with returns.
Definition Queue.hpp:37
@ QUEUE_OK
message sent/received okay
Definition Queue.hpp:28
@ QUEUE_FULL
queue was full when attempting to send a message
Definition Queue.hpp:36
@ QUEUE_BLOCKING
Queue receive blocks until a message arrives.
Definition Queue.hpp:41
@ QUEUE_NONBLOCKING
Queue receive always returns even if there is no message.
Definition Queue.hpp:42
static NATIVE_INT_TYPE s_numQueues
tracks number of queues in the system
Definition Queue.hpp:80
POINTER_CAST m_handle
handle for implementation specific queue
Definition Queue.hpp:75
Definition File.cpp:6