F´ Flight Software - C/C++ Documentation NASA-v1.6.0
A framework for building embedded system applications to NASA flight quality standards.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ComQueue.cpp
Go to the documentation of this file.
1// ======================================================================
2// \title ComQueue.cpp
3// \author vbai
4// \brief cpp file for ComQueue component implementation class
5// ======================================================================
6
7#include <Fw/Types/Assert.hpp>
10
11namespace Svc {
12
13// ----------------------------------------------------------------------
14// Construction, initialization, and destruction
15// ----------------------------------------------------------------------
16
17ComQueue ::QueueConfigurationTable ::QueueConfigurationTable() {
18 for (NATIVE_UINT_TYPE i = 0; i < FW_NUM_ARRAY_ELEMENTS(this->entries); i++) {
19 this->entries[i].priority = 0;
20 this->entries[i].depth = 0;
21 }
22}
23
24ComQueue ::ComQueue(const char* const compName)
25 : ComQueueComponentBase(compName),
26 m_state(WAITING),
27 m_allocationId(-1),
28 m_allocator(nullptr),
29 m_allocation(nullptr) {
30 // Initialize throttles to "off"
31 for (NATIVE_UINT_TYPE i = 0; i < TOTAL_PORT_COUNT; i++) {
32 this->m_throttle[i] = false;
33 }
34}
35
36ComQueue ::~ComQueue() {}
37
38void ComQueue ::init(const NATIVE_INT_TYPE queueDepth, const NATIVE_INT_TYPE instance) {
39 ComQueueComponentBase::init(queueDepth, instance);
40}
41
42void ComQueue ::cleanup() {
43 // Deallocate memory ignoring error conditions
44 if ((this->m_allocator != nullptr) && (this->m_allocation != nullptr)) {
45 this->m_allocator->deallocate(this->m_allocationId, this->m_allocation);
46 }
47}
48
50 NATIVE_UINT_TYPE allocationId,
51 Fw::MemAllocator& allocator) {
52 FwIndexType currentPriorityIndex = 0;
53 NATIVE_UINT_TYPE totalAllocation = 0;
54
55 // Store/initialize allocator members
56 this->m_allocator = &allocator;
57 this->m_allocationId = allocationId;
58 this->m_allocation = nullptr;
59
60 // Initializes the sorted queue metadata list in priority (sorted) order. This is accomplished by walking the
61 // priority values in priority order from 0 to TOTAL_PORT_COUNT. At each priory value, the supplied queue
62 // configuration table is walked and any entry matching the current priority values is used to add queue metadata to
63 // the prioritized list. This results in priority-sorted queue metadata objects that index back into the unsorted
64 // queue data structures.
65 //
66 // The total allocation size is tracked for passing to the allocation call and is a summation of
67 // (depth * message size) for each prioritized metadata object of (depth * message size)
68 for (FwIndexType currentPriority = 0; currentPriority < TOTAL_PORT_COUNT; currentPriority++) {
69 // Walk each queue configuration entry and add them into the prioritized metadata list when matching the current
70 // priority value
71 for (NATIVE_UINT_TYPE entryIndex = 0; entryIndex < FW_NUM_ARRAY_ELEMENTS(queueConfig.entries); entryIndex++) {
72 // Check for valid configuration entry
73 FW_ASSERT(queueConfig.entries[entryIndex].priority < TOTAL_PORT_COUNT,
74 queueConfig.entries[entryIndex].priority, TOTAL_PORT_COUNT, entryIndex);
75
76 if (currentPriority == queueConfig.entries[entryIndex].priority) {
77 // Set up the queue metadata object in order to track priority, depth, index into the queue list of the
78 // backing queue object, and message size. Both index and message size are calculated where priority and
79 // depth are copied from the configuration object.
80 QueueMetadata& entry = this->m_prioritizedList[currentPriorityIndex];
81 entry.priority = queueConfig.entries[entryIndex].priority;
82 entry.depth = queueConfig.entries[entryIndex].depth;
83 entry.index = entryIndex;
84 // Message size is determined by the type of object being stored, which in turn is determined by the
85 // index of the entry. Those lower than COM_PORT_COUNT are Fw::ComBuffers and those larger Fw::Buffer.
86 entry.msgSize = (entryIndex < COM_PORT_COUNT) ? sizeof(Fw::ComBuffer) : sizeof(Fw::Buffer);
87 totalAllocation += entry.depth * entry.msgSize;
88 currentPriorityIndex++;
89 }
90 }
91 }
92 // Allocate a single chunk of memory from the memory allocator. Memory recover is neither needed nor used.
93 bool recoverable = false;
94 this->m_allocation = this->m_allocator->allocate(this->m_allocationId, totalAllocation, recoverable);
95
96 // Each of the backing queue objects must be supplied memory to store the queued messages. These data regions are
97 // sub-portions of the total allocated data. This memory is passed out by looping through each queue in prioritized
98 // order and passing out the memory to each queue's setup method.
99 FwSizeType allocationOffset = 0;
100 for (FwIndexType i = 0; i < TOTAL_PORT_COUNT; i++) {
101 // Get current queue's allocation size and safety check the values
102 FwSizeType allocationSize = this->m_prioritizedList[i].depth * this->m_prioritizedList[i].msgSize;
103 FW_ASSERT(this->m_prioritizedList[i].index < static_cast<FwIndexType>(FW_NUM_ARRAY_ELEMENTS(this->m_queues)),
104 this->m_prioritizedList[i].index);
105 FW_ASSERT((allocationSize + allocationOffset) <= totalAllocation, allocationSize, allocationOffset,
106 totalAllocation);
107
108 // Setup queue's memory allocation, depth, and message size. Setup is skipped for a depth 0 queue
109 if (allocationSize > 0) {
110 this->m_queues[this->m_prioritizedList[i].index].setup(
111 reinterpret_cast<U8*>(this->m_allocation) + allocationOffset, allocationSize,
112 this->m_prioritizedList[i].depth, this->m_prioritizedList[i].msgSize);
113 }
114 allocationOffset += allocationSize;
115 }
116 // Safety check that all memory was used as expected
117 FW_ASSERT(allocationOffset == totalAllocation, allocationOffset, totalAllocation);
118}
119// ----------------------------------------------------------------------
120// Handler implementations for user-defined typed input ports
121// ----------------------------------------------------------------------
122
123void ComQueue::comQueueIn_handler(const NATIVE_INT_TYPE portNum, Fw::ComBuffer& data, U32 context) {
124 // Ensure that the port number of comQueueIn is consistent with the expectation
125 FW_ASSERT(portNum >= 0 && portNum < COM_PORT_COUNT, portNum);
126 this->enqueue(portNum, QueueType::COM_QUEUE, reinterpret_cast<const U8*>(&data), sizeof(Fw::ComBuffer));
127}
128
129void ComQueue::buffQueueIn_handler(const NATIVE_INT_TYPE portNum, Fw::Buffer& fwBuffer) {
130 const NATIVE_INT_TYPE queueNum = portNum + COM_PORT_COUNT;
131 // Ensure that the port number of buffQueueIn is consistent with the expectation
132 FW_ASSERT(portNum >= 0 && portNum < BUFFER_PORT_COUNT, portNum);
133 FW_ASSERT(queueNum < TOTAL_PORT_COUNT);
134 this->enqueue(queueNum, QueueType::BUFFER_QUEUE, reinterpret_cast<const U8*>(&fwBuffer), sizeof(Fw::Buffer));
135}
136
137void ComQueue::comStatusIn_handler(const NATIVE_INT_TYPE portNum, Fw::Success& condition) {
138 switch (this->m_state) {
139 // On success, the queue should be processed. On failure, the component should still wait.
140 case WAITING:
141 if (condition.e == Fw::Success::SUCCESS) {
142 this->m_state = READY;
143 this->processQueue();
144 // A message may or may not be sent. Thus, READY or WAITING are acceptable final states.
145 FW_ASSERT((this->m_state == WAITING || this->m_state == READY), this->m_state);
146 } else {
147 this->m_state = WAITING;
148 }
149 break;
150 // Both READY and unknown states should not be possible at this point. To receive a status message we must be
151 // one of the WAITING or RETRY states.
152 default:
153 FW_ASSERT(0, this->m_state);
154 break;
155 }
156}
157
158void ComQueue::run_handler(const NATIVE_INT_TYPE portNum, NATIVE_UINT_TYPE context) {
159 // Downlink the high-water marks for the Fw::ComBuffer array types
160 ComQueueDepth comQueueDepth;
161 for (FwSizeType i = 0; i < comQueueDepth.SIZE; i++) {
162 comQueueDepth[i] = this->m_queues[i].get_high_water_mark();
163 this->m_queues[i].clear_high_water_mark();
164 }
165 this->tlmWrite_comQueueDepth(comQueueDepth);
166
167 // Downlink the high-water marks for the Fw::Buffer array types
168 BuffQueueDepth buffQueueDepth;
169 for (FwSizeType i = 0; i < buffQueueDepth.SIZE; i++) {
170 buffQueueDepth[i] = this->m_queues[i + COM_PORT_COUNT].get_high_water_mark();
171 this->m_queues[i + COM_PORT_COUNT].clear_high_water_mark();
172 }
173 this->tlmWrite_buffQueueDepth(buffQueueDepth);
174}
175
176// ----------------------------------------------------------------------
177// Private helper methods
178// ----------------------------------------------------------------------
179
180void ComQueue::enqueue(const FwIndexType queueNum, QueueType queueType, const U8* data, const FwSizeType size) {
181 // Enqueue the given message onto the matching queue. When no space is available then emit the queue overflow event,
182 // set the appropriate throttle, and move on. Will assert if passed a message for a depth 0 queue.
183 const FwSizeType expectedSize = (queueType == QueueType::COM_QUEUE) ? sizeof(Fw::ComBuffer) : sizeof(Fw::Buffer);
184 const FwIndexType portNum = queueNum - ((queueType == QueueType::COM_QUEUE) ? 0 : COM_PORT_COUNT);
185 FW_ASSERT(expectedSize == size, size, expectedSize);
186 FW_ASSERT(portNum >= 0, portNum);
187 Fw::SerializeStatus status = this->m_queues[queueNum].enqueue(data, size);
188 if (status == Fw::FW_SERIALIZE_NO_ROOM_LEFT && !this->m_throttle[queueNum]) {
189 this->log_WARNING_HI_QueueOverflow(queueType, portNum);
190 this->m_throttle[queueNum] = true;
191 }
192 // When the component is already in READY state process the queue to send out the next available message immediately
193 if (this->m_state == READY) {
194 this->processQueue();
195 }
196}
197
198void ComQueue::sendComBuffer(Fw::ComBuffer& comBuffer) {
199 FW_ASSERT(this->m_state == READY);
200 this->comQueueSend_out(0, comBuffer, 0);
201 this->m_state = WAITING;
202}
203
204void ComQueue::sendBuffer(Fw::Buffer& buffer) {
205 // Retry buffer expected to be cleared as we are either transferring ownership or have already deallocated it.
206 FW_ASSERT(this->m_state == READY);
207 this->buffQueueSend_out(0, buffer);
208 this->m_state = WAITING;
209}
210
211void ComQueue::processQueue() {
212 FwIndexType priorityIndex = 0;
213 FwIndexType sendPriority = 0;
214 // Check that we are in the appropriate state
215 FW_ASSERT(this->m_state == READY);
216
217 // Walk all the queues in priority order. Send the first message that is available in priority order. No balancing
218 // is done within this loop.
219 for (priorityIndex = 0; priorityIndex < TOTAL_PORT_COUNT; priorityIndex++) {
220 QueueMetadata& entry = this->m_prioritizedList[priorityIndex];
221 Types::Queue& queue = this->m_queues[entry.index];
222
223 // Continue onto next prioritized queue if there is no items in the current queue
224 if (queue.getQueueSize() == 0) {
225 continue;
226 }
227
228 // Send out the message based on the type
229 if (entry.index < COM_PORT_COUNT) {
230 Fw::ComBuffer comBuffer;
231 queue.dequeue(reinterpret_cast<U8*>(&comBuffer), sizeof(comBuffer));
232 this->sendComBuffer(comBuffer);
233 } else {
234 Fw::Buffer buffer;
235 queue.dequeue(reinterpret_cast<U8*>(&buffer), sizeof(buffer));
236 this->sendBuffer(buffer);
237 }
238
239 // Update the throttle and the index that was just sent
240 this->m_throttle[entry.index] = false;
241
242 // Priority used in the next loop
243 sendPriority = entry.priority;
244 break;
245 }
246
247 // Starting on the priority entry after the one dispatched and continuing through the end of the set of entries that
248 // share the same priority, rotate those entries such that the currently dispatched queue is last and the rest are
249 // shifted up by one. This effectively round-robins the queues of the same priority.
250 for (priorityIndex++;
251 priorityIndex < TOTAL_PORT_COUNT && (this->m_prioritizedList[priorityIndex].priority == sendPriority);
252 priorityIndex++) {
253 // Swap the previous entry with this one.
254 QueueMetadata temp = this->m_prioritizedList[priorityIndex];
255 this->m_prioritizedList[priorityIndex] = this->m_prioritizedList[priorityIndex - 1];
256 this->m_prioritizedList[priorityIndex - 1] = temp;
257 }
258}
259} // end namespace Svc
#define FW_ASSERT(...)
Definition Assert.hpp:7
PlatformIntType NATIVE_INT_TYPE
Definition BasicTypes.h:51
uint8_t U8
8-bit unsigned integer
Definition BasicTypes.h:26
#define FW_NUM_ARRAY_ELEMENTS(a)
number of elements in an array
Definition BasicTypes.h:66
PlatformUIntType NATIVE_UINT_TYPE
Definition BasicTypes.h:52
C++ header for working with basic fprime types.
PlatformSizeType FwSizeType
Definition FpConfig.h:18
PlatformIndexType FwIndexType
Definition FpConfig.h:15
virtual void * allocate(const NATIVE_UINT_TYPE identifier, NATIVE_UINT_TYPE &size, bool &recoverable)=0
Allocate memory.
void init()
Object initializer.
Definition ObjBase.cpp:27
Success/Failure.
T e
The raw enum value.
@ SUCCESS
Representing success.
Auto-generated base for ComQueue component.
void buffQueueSend_out(NATIVE_INT_TYPE portNum, Fw::Buffer &fwBuffer)
Invoke output port buffQueueSend.
void comQueueSend_out(NATIVE_INT_TYPE portNum, Fw::ComBuffer &data, U32 context)
Invoke output port comQueueSend.
void tlmWrite_comQueueDepth(const Svc::ComQueueDepth &arg, Fw::Time _tlmTime=Fw::Time())
void tlmWrite_buffQueueDepth(const Svc::BuffQueueDepth &arg, Fw::Time _tlmTime=Fw::Time())
void log_WARNING_HI_QueueOverflow(Svc::QueueType queueType, U32 index)
void configure(QueueConfigurationTable queueConfig, NATIVE_UINT_TYPE allocationId, Fw::MemAllocator &allocator)
Definition ComQueue.cpp:49
static const FwIndexType BUFFER_PORT_COUNT
Total count of input buffer ports and thus total queues.
Definition ComQueue.hpp:29
static const FwIndexType COM_PORT_COUNT
< Count of Fw::Com input ports and thus Fw::Com queues
Definition ComQueue.hpp:26
static const FwIndexType TOTAL_PORT_COUNT
Definition ComQueue.hpp:32
NATIVE_UINT_TYPE get_high_water_mark() const
Definition Queue.cpp:41
Fw::SerializeStatus enqueue(const U8 *const message, const FwSizeType size)
pushes a fixed-size message onto the back of the queue
Definition Queue.cpp:25
void setup(U8 *const storage, const FwSizeType storage_size, const FwSizeType depth, const FwSizeType message_size)
setup the queue object to setup storage
Definition Queue.cpp:17
void clear_high_water_mark()
Definition Queue.cpp:46
NATIVE_UINT_TYPE getQueueSize() const
Definition Queue.cpp:50
Fw::SerializeStatus dequeue(U8 *const message, const FwSizeType size)
pops a fixed-size message off the front of the queue
Definition Queue.cpp:31
SerializeStatus
forward declaration for string
@ FW_SERIALIZE_NO_ROOM_LEFT
No room left in the buffer to serialize data.
FwIndexType priority
Priority of the queue [0, TOTAL_PORT_COUNT)
Definition ComQueue.hpp:47
FwSizeType depth
Depth of the queue [0, infinity)
Definition ComQueue.hpp:46
configuration table for each queue
Definition ComQueue.hpp:59
QueueConfigurationEntry entries[TOTAL_PORT_COUNT]
Definition ComQueue.hpp:60