F´ Flight Software - C/C++ Documentation NASA-v1.6.0
A framework for building embedded system applications to NASA flight quality standards.
Loading...
Searching...
No Matches
Queue.cpp
Go to the documentation of this file.
1#include <Fw/Types/Assert.hpp>
2#include <Os/Queue.hpp>
3
4#ifdef TGT_OS_TYPE_VXWORKS
5 #include <vxWorks.h>
6#endif
7
8#ifdef TGT_OS_TYPE_LINUX
9 #include <sys/types.h>
10 #include <unistd.h>
11#endif
12
13#include <mqueue.h>
14#include <fcntl.h>
15#include <cerrno>
16#include <cstring>
17#include <cstdio>
18#include <ctime>
19#include <pthread.h>
20#include <new>
21
22namespace Os {
23
24 class QueueHandle {
25 public:
26 QueueHandle(mqd_t m_handle) {
27 // Initialize the handle:
28 int ret;
29 ret = pthread_cond_init(&this->queueNotEmpty, nullptr);
30 FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
31 ret = pthread_cond_init(&this->queueNotFull, nullptr);
32 FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
33 ret = pthread_mutex_init(&this->mp, nullptr);
34 FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
35 this->handle = m_handle;
36 }
38 // Destroy the handle:
39 if (-1 != this->handle) {
40 (void) mq_close(this->handle);
41 }
42 (void) pthread_cond_destroy(&this->queueNotEmpty);
43 (void) pthread_mutex_destroy(&this->mp);
44 }
45 mqd_t handle;
46 pthread_cond_t queueNotEmpty;
47 pthread_cond_t queueNotFull;
48 pthread_mutex_t mp;
49 };
50
51 Queue::Queue() :
52 m_handle(-1) {
53 }
54
56
57 this->m_name = "/QP_";
58 this->m_name += name;
59#ifndef TGT_OS_TYPE_VXWORKS
60 char pid[40];
61 (void)snprintf(pid,sizeof(pid),".%d",getpid());
62 pid[sizeof(pid)-1] = 0;
63 this->m_name += pid;
64#endif
65 mq_attr att;
66 mqd_t handle;
67
68 memset(&att,0,sizeof(att));
69 att.mq_maxmsg = depth;
70 att.mq_msgsize = msgSize;
71 att.mq_flags = 0;
72 att.mq_curmsgs = 0;
73
74 handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, 0666, &att);
75
76 // If queue already exists, then unlink it and try again.
77 if (-1 == handle) {
78 switch (errno) {
79 case EEXIST:
80 (void)mq_unlink(this->m_name.toChar());
81 break;
82 default:
84 }
85
86 handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
87
88 if (-1 == handle) {
90 }
91 }
92
93 // Set up queue handle:
94 QueueHandle* queueHandle = new(std::nothrow) QueueHandle(handle);
95 if (nullptr == queueHandle) {
97 }
98 this->m_handle = reinterpret_cast<POINTER_CAST>(queueHandle);
99
101
102 return QUEUE_OK;
103 }
104
105 Queue::~Queue() {
106 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
107 delete queueHandle;
108 (void) mq_unlink(this->m_name.toChar());
109 }
110
111 Queue::QueueStatus Queue::send(const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority, QueueBlocking block) {
112
113 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
114 mqd_t handle = queueHandle->handle;
115 pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
116 pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
117 pthread_mutex_t* mp = &queueHandle->mp;
118
119 if (-1 == handle) {
120 return QUEUE_UNINITIALIZED;
121 }
122
123 if (nullptr == buffer) {
124 return QUEUE_EMPTY_BUFFER;
125 }
126
127 bool keepTrying = true;
128 int ret;
129 while (keepTrying) {
130 NATIVE_INT_TYPE stat = mq_send(handle, reinterpret_cast<const char*>(buffer), size, priority);
131 if (-1 == stat) {
132 switch (errno) {
133 case EINTR:
134 continue;
135 case EMSGSIZE:
136 return QUEUE_SIZE_MISMATCH;
137 case EINVAL:
139 case EAGAIN:
140 if (block == QUEUE_NONBLOCKING) {
141 // no more messages. If we are
142 // non-blocking, return
143 return QUEUE_FULL;
144 } else {
145 // Go to sleep until we receive a signal that something was taken off the queue:
146 // Note: pthread_cont_wait must be called "with mutex locked by the calling
147 // thread or undefined behavior results." - from the docs
148 ret = pthread_mutex_lock(mp);
149 FW_ASSERT(ret == 0, errno);
150 ret = pthread_cond_wait(queueNotFull, mp);
151 FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
152 ret = pthread_mutex_unlock(mp);
153 FW_ASSERT(ret == 0, errno);
154 continue;
155 }
156 default:
157 return QUEUE_UNKNOWN_ERROR;
158 }
159 } else {
160 keepTrying=false;
161 // Wake up a thread that might be waiting on the other end of the queue:
162 ret = pthread_cond_signal(queueNotEmpty);
163 FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
164 }
165 }
166
167 return QUEUE_OK;
168 }
169
170 Queue::QueueStatus Queue::receive(U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority, QueueBlocking block) {
171
172 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
173 mqd_t handle = queueHandle->handle;
174 pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
175 pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
176 pthread_mutex_t* mp = &queueHandle->mp;
177
178 if (-1 == handle) {
179 return QUEUE_UNINITIALIZED;
180 }
181
182 ssize_t size;
183 int ret;
184 bool notFinished = true;
185 while (notFinished) {
186 size = mq_receive(handle, static_cast<char*>(buffer), static_cast<size_t>(capacity),
187#ifdef TGT_OS_TYPE_VXWORKS
188 reinterpret_cast<int*>(&priority));
189#else
190 reinterpret_cast<unsigned int*>(&priority));
191#endif
192
193 if (-1 == size) { // error
194 switch (errno) {
195 case EINTR:
196 continue;
197 case EMSGSIZE:
198 return QUEUE_SIZE_MISMATCH;
199 case EAGAIN:
200 if (block == QUEUE_NONBLOCKING) {
201 // no more messages. If we are
202 // non-blocking, return
203 return QUEUE_NO_MORE_MSGS;
204 } else {
205 // Go to sleep until we receive a signal that something was put on the queue:
206 // Note: pthread_cont_wait must be called "with mutex locked by the calling
207 // thread or undefined behavior results." - from the docs
208 ret = pthread_mutex_lock(mp);
209 FW_ASSERT(ret == 0, errno);
210 ret = pthread_cond_wait(queueNotEmpty, mp);
211 FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
212 ret = pthread_mutex_unlock(mp);
213 FW_ASSERT(ret == 0, errno);
214 continue;
215 }
216 break;
217 default:
218 return QUEUE_UNKNOWN_ERROR;
219 }
220 }
221 else {
222 notFinished = false;
223 // Wake up a thread that might be waiting on the other end of the queue:
224 ret = pthread_cond_signal(queueNotFull);
225 FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
226 }
227 }
228
229 actualSize = static_cast<NATIVE_INT_TYPE>(size);
230 return QUEUE_OK;
231 }
232
234 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
235 mqd_t handle = queueHandle->handle;
236
237 struct mq_attr attr;
238 int status = mq_getattr(handle, &attr);
239 FW_ASSERT(status == 0);
240 return static_cast<U32>(attr.mq_curmsgs);
241 }
242
244 //FW_ASSERT(0);
245 return 0;
246 }
247
249 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
250 mqd_t handle = queueHandle->handle;
251
252 struct mq_attr attr;
253 int status = mq_getattr(handle, &attr);
254 FW_ASSERT(status == 0);
255 return static_cast<U32>(attr.mq_maxmsg);
256 }
257
259 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
260 mqd_t handle = queueHandle->handle;
261
262 struct mq_attr attr;
263 int status = mq_getattr(handle, &attr);
264 FW_ASSERT(status == 0);
265 return static_cast<U32>(attr.mq_msgsize);
266 }
267
268}
269
#define FW_ASSERT(...)
Definition Assert.hpp:7
PlatformPointerCastType POINTER_CAST
Definition BasicTypes.h:53
PlatformIntType NATIVE_INT_TYPE
Definition BasicTypes.h:51
uint8_t U8
8-bit unsigned integer
Definition BasicTypes.h:26
pthread_cond_t queueNotEmpty
pthread_mutex_t mp
Definition Queue.cpp:48
QueueHandle(mqd_t m_handle)
Definition Queue.cpp:26
pthread_cond_t queueNotFull
QueueString m_name
queue name
Definition Queue.hpp:76
NATIVE_INT_TYPE getMsgSize() const
get the message size (maximum message size queue can hold)
Definition Queue.cpp:244
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition Queue.hpp:31
@ QUEUE_UNINITIALIZED
Queue wasn't initialized successfully.
Definition Queue.hpp:30
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition Queue.hpp:29
@ QUEUE_INVALID_PRIORITY
invalid priority requested
Definition Queue.hpp:34
@ QUEUE_EMPTY_BUFFER
supplied buffer is empty
Definition Queue.hpp:35
@ QUEUE_UNKNOWN_ERROR
Unexpected error; can't match with returns.
Definition Queue.hpp:37
@ QUEUE_OK
message sent/received okay
Definition Queue.hpp:28
@ QUEUE_FULL
queue was full when attempting to send a message
Definition Queue.hpp:36
QueueStatus send(const Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE priority, QueueBlocking block)
send a message
QueueStatus createInternal(const Fw::StringBase &name, NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
create a message queue
Definition Queue.cpp:42
QueueStatus receive(Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE &priority, QueueBlocking block)
receive a message
NATIVE_INT_TYPE getQueueSize() const
get the queue depth (maximum number of messages queue can hold)
Definition Queue.cpp:233
NATIVE_INT_TYPE getMaxMsgs() const
get the maximum number of messages (high watermark)
Definition Queue.cpp:222
@ QUEUE_NONBLOCKING
Queue receive always returns even if there is no message.
Definition Queue.hpp:42
NATIVE_INT_TYPE getNumMsgs() const
get the number of messages in the queue
Definition Queue.cpp:211
virtual ~Queue()
Definition Queue.cpp:65
static NATIVE_INT_TYPE s_numQueues
tracks number of queues in the system
Definition Queue.hpp:80
POINTER_CAST m_handle
handle for implementation specific queue
Definition Queue.hpp:75
Definition File.cpp:6