F´ Flight Software - C/C++ Documentation NASA-v1.6.0
A framework for building embedded system applications to NASA flight quality standards.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Queue.cpp
Go to the documentation of this file.
1#include <Fw/Types/Assert.hpp>
2#include <Os/Queue.hpp>
3
4#ifdef TGT_OS_TYPE_VXWORKS
5 #include <vxWorks.h>
6#endif
7
8#ifdef TGT_OS_TYPE_LINUX
9 #include <sys/types.h>
10 #include <unistd.h>
11#endif
12
13#include <mqueue.h>
14#include <fcntl.h>
15#include <cerrno>
16#include <cstring>
17#include <cstdio>
18#include <ctime>
19#include <pthread.h>
20#include <new>
21
22namespace Os {
23
24 class QueueHandle {
25 public:
26 QueueHandle(mqd_t m_handle) {
27 // Initialize the handle:
28 int ret;
29 ret = pthread_cond_init(&this->queueNotEmpty, nullptr);
30 FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
31 ret = pthread_cond_init(&this->queueNotFull, nullptr);
32 FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
33 ret = pthread_mutex_init(&this->mp, nullptr);
34 FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
35 this->handle = m_handle;
36 }
38 // Destroy the handle:
39 if (-1 != this->handle) {
40 (void) mq_close(this->handle);
41 }
42 (void) pthread_cond_destroy(&this->queueNotEmpty);
43 (void) pthread_mutex_destroy(&this->mp);
44 }
45 mqd_t handle;
46 pthread_cond_t queueNotEmpty;
47 pthread_cond_t queueNotFull;
48 pthread_mutex_t mp;
49 };
50
51 Queue::Queue() :
52 m_handle(-1) {
53 }
54
56
57 this->m_name = "/QP_";
58 this->m_name += name;
59#ifndef TGT_OS_TYPE_VXWORKS
60 char pid[40];
61 (void)snprintf(pid,sizeof(pid),".%d",getpid());
62 pid[sizeof(pid)-1] = 0;
63 this->m_name += pid;
64#endif
65 mq_attr att;
66 mqd_t handle;
67
68 memset(&att,0,sizeof(att));
69 att.mq_maxmsg = depth;
70 att.mq_msgsize = msgSize;
71 att.mq_flags = 0;
72 att.mq_curmsgs = 0;
73
74 handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, 0666, &att);
75
76 // If queue already exists, then unlink it and try again.
77 if (-1 == handle) {
78 switch (errno) {
79 case EEXIST:
80 (void)mq_unlink(this->m_name.toChar());
81 break;
82 default:
84 }
85
86 handle = mq_open(this->m_name.toChar(), O_RDWR | O_CREAT | O_EXCL, 0666, &att);
87
88 if (-1 == handle) {
90 }
91 }
92
93 // Set up queue handle:
94 QueueHandle* queueHandle = new(std::nothrow) QueueHandle(handle);
95 if (nullptr == queueHandle) {
97 }
98 this->m_handle = reinterpret_cast<POINTER_CAST>(queueHandle);
99
101
102 return QUEUE_OK;
103 }
104
105 Queue::~Queue() {
106 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
107 delete queueHandle;
108 (void) mq_unlink(this->m_name.toChar());
109 }
110
111 Queue::QueueStatus Queue::send(const U8* buffer, NATIVE_INT_TYPE size, NATIVE_INT_TYPE priority, QueueBlocking block) {
112
113 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
114 mqd_t handle = queueHandle->handle;
115 pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
116 pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
117 pthread_mutex_t* mp = &queueHandle->mp;
118
119 if (-1 == handle) {
120 return QUEUE_UNINITIALIZED;
121 }
122
123 if (nullptr == buffer) {
124 return QUEUE_EMPTY_BUFFER;
125 }
126
127 bool keepTrying = true;
128 int ret;
129 while (keepTrying) {
130 NATIVE_INT_TYPE stat = mq_send(handle, reinterpret_cast<const char*>(buffer), size, priority);
131 if (-1 == stat) {
132 switch (errno) {
133 case EINTR:
134 continue;
135 case EMSGSIZE:
136 return QUEUE_SIZE_MISMATCH;
137 case EINVAL:
139 case EAGAIN:
140 if (block == QUEUE_NONBLOCKING) {
141 // no more messages. If we are
142 // non-blocking, return
143 return QUEUE_FULL;
144 } else {
145 // Go to sleep until we receive a signal that something was taken off the queue:
146 // Note: pthread_cont_wait must be called "with mutex locked by the calling
147 // thread or undefined behavior results." - from the docs
148 ret = pthread_mutex_lock(mp);
149 FW_ASSERT(ret == 0, errno);
150 ret = pthread_cond_wait(queueNotFull, mp);
151 FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
152 ret = pthread_mutex_unlock(mp);
153 FW_ASSERT(ret == 0, errno);
154 continue;
155 }
156 default:
157 return QUEUE_UNKNOWN_ERROR;
158 }
159 } else {
160 keepTrying=false;
161 // Wake up a thread that might be waiting on the other end of the queue:
162 ret = pthread_cond_signal(queueNotEmpty);
163 FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
164 }
165 }
166
167 return QUEUE_OK;
168 }
169
170 Queue::QueueStatus Queue::receive(U8* buffer, NATIVE_INT_TYPE capacity, NATIVE_INT_TYPE &actualSize, NATIVE_INT_TYPE &priority, QueueBlocking block) {
171
172 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
173 mqd_t handle = queueHandle->handle;
174 pthread_cond_t* queueNotEmpty = &queueHandle->queueNotEmpty;
175 pthread_cond_t* queueNotFull = &queueHandle->queueNotFull;
176 pthread_mutex_t* mp = &queueHandle->mp;
177
178 if (-1 == handle) {
179 return QUEUE_UNINITIALIZED;
180 }
181
182 ssize_t size;
183 int ret;
184 bool notFinished = true;
185 while (notFinished) {
186 size = mq_receive(handle, static_cast<char*>(buffer), static_cast<size_t>(capacity),
187#ifdef TGT_OS_TYPE_VXWORKS
188 reinterpret_cast<int*>(&priority));
189#else
190 reinterpret_cast<unsigned int*>(&priority));
191#endif
192
193 if (-1 == size) { // error
194 switch (errno) {
195 case EINTR:
196 continue;
197 case EMSGSIZE:
198 return QUEUE_SIZE_MISMATCH;
199 case EAGAIN:
200 if (block == QUEUE_NONBLOCKING) {
201 // no more messages. If we are
202 // non-blocking, return
203 return QUEUE_NO_MORE_MSGS;
204 } else {
205 // Go to sleep until we receive a signal that something was put on the queue:
206 // Note: pthread_cont_wait must be called "with mutex locked by the calling
207 // thread or undefined behavior results." - from the docs
208 ret = pthread_mutex_lock(mp);
209 FW_ASSERT(ret == 0, errno);
210 ret = pthread_cond_wait(queueNotEmpty, mp);
211 FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
212 ret = pthread_mutex_unlock(mp);
213 FW_ASSERT(ret == 0, errno);
214 continue;
215 }
216 break;
217 default:
218 return QUEUE_UNKNOWN_ERROR;
219 }
220 }
221 else {
222 notFinished = false;
223 // Wake up a thread that might be waiting on the other end of the queue:
224 ret = pthread_cond_signal(queueNotFull);
225 FW_ASSERT(ret == 0, ret); // If this fails, something horrible happened.
226 }
227 }
228
229 actualSize = static_cast<NATIVE_INT_TYPE>(size);
230 return QUEUE_OK;
231 }
232
234 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
235 mqd_t handle = queueHandle->handle;
236
237 struct mq_attr attr;
238 int status = mq_getattr(handle, &attr);
239 FW_ASSERT(status == 0);
240 return static_cast<U32>(attr.mq_curmsgs);
241 }
242
244 //FW_ASSERT(0);
245 return 0;
246 }
247
249 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
250 mqd_t handle = queueHandle->handle;
251
252 struct mq_attr attr;
253 int status = mq_getattr(handle, &attr);
254 FW_ASSERT(status == 0);
255 return static_cast<U32>(attr.mq_maxmsg);
256 }
257
259 QueueHandle* queueHandle = reinterpret_cast<QueueHandle*>(this->m_handle);
260 mqd_t handle = queueHandle->handle;
261
262 struct mq_attr attr;
263 int status = mq_getattr(handle, &attr);
264 FW_ASSERT(status == 0);
265 return static_cast<U32>(attr.mq_msgsize);
266 }
267
268}
269
#define FW_ASSERT(...)
Definition Assert.hpp:7
PlatformPointerCastType POINTER_CAST
Definition BasicTypes.h:53
PlatformIntType NATIVE_INT_TYPE
Definition BasicTypes.h:51
uint8_t U8
8-bit unsigned integer
Definition BasicTypes.h:26
pthread_cond_t queueNotEmpty
pthread_mutex_t mp
Definition Queue.cpp:48
QueueHandle(mqd_t m_handle)
Definition Queue.cpp:26
pthread_cond_t queueNotFull
QueueString m_name
queue name
Definition Queue.hpp:76
NATIVE_INT_TYPE getMsgSize() const
get the message size (maximum message size queue can hold)
Definition Queue.cpp:244
@ QUEUE_SIZE_MISMATCH
attempted to send or receive with buffer too large, too small
Definition Queue.hpp:31
@ QUEUE_UNINITIALIZED
Queue wasn't initialized successfully.
Definition Queue.hpp:30
@ QUEUE_NO_MORE_MSGS
If non-blocking, all the messages have been drained.
Definition Queue.hpp:29
@ QUEUE_INVALID_PRIORITY
invalid priority requested
Definition Queue.hpp:34
@ QUEUE_EMPTY_BUFFER
supplied buffer is empty
Definition Queue.hpp:35
@ QUEUE_UNKNOWN_ERROR
Unexpected error; can't match with returns.
Definition Queue.hpp:37
@ QUEUE_OK
message sent/received okay
Definition Queue.hpp:28
@ QUEUE_FULL
queue was full when attempting to send a message
Definition Queue.hpp:36
QueueStatus send(const Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE priority, QueueBlocking block)
send a message
QueueStatus createInternal(const Fw::StringBase &name, NATIVE_INT_TYPE depth, NATIVE_INT_TYPE msgSize)
create a message queue
Definition Queue.cpp:42
QueueStatus receive(Fw::SerializeBufferBase &buffer, NATIVE_INT_TYPE &priority, QueueBlocking block)
receive a message
NATIVE_INT_TYPE getQueueSize() const
get the queue depth (maximum number of messages queue can hold)
Definition Queue.cpp:233
NATIVE_INT_TYPE getMaxMsgs() const
get the maximum number of messages (high watermark)
Definition Queue.cpp:222
@ QUEUE_NONBLOCKING
Queue receive always returns even if there is no message.
Definition Queue.hpp:42
NATIVE_INT_TYPE getNumMsgs() const
get the number of messages in the queue
Definition Queue.cpp:211
virtual ~Queue()
Definition Queue.cpp:65
static NATIVE_INT_TYPE s_numQueues
tracks number of queues in the system
Definition Queue.hpp:80
POINTER_CAST m_handle
handle for implementation specific queue
Definition Queue.hpp:75
Definition File.cpp:6