F´ Flight Software - C/C++ Documentation NASA-v1.6.0
A framework for building embedded system applications to NASA flight quality standards.
Loading...
Searching...
No Matches
ComQueue.cpp
Go to the documentation of this file.
1// ======================================================================
2// \title ComQueue.cpp
3// \author vbai
4// \brief cpp file for ComQueue component implementation class
5// ======================================================================
6
7#include <Fw/Types/Assert.hpp>
10
11namespace Svc {
12
13// ----------------------------------------------------------------------
14// Construction, initialization, and destruction
15// ----------------------------------------------------------------------
16
17ComQueue ::QueueConfigurationTable ::QueueConfigurationTable() {
18 for (NATIVE_UINT_TYPE i = 0; i < FW_NUM_ARRAY_ELEMENTS(this->entries); i++) {
19 this->entries[i].priority = 0;
20 this->entries[i].depth = 0;
21 }
22}
23
24ComQueue ::ComQueue(const char* const compName)
25 : ComQueueComponentBase(compName),
26 m_state(WAITING),
27 m_allocationId(-1),
28 m_allocator(nullptr),
29 m_allocation(nullptr) {
30 // Initialize throttles to "off"
31 for (NATIVE_UINT_TYPE i = 0; i < TOTAL_PORT_COUNT; i++) {
32 this->m_throttle[i] = false;
33 }
34}
35
36ComQueue ::~ComQueue() {}
37
38void ComQueue ::init(const NATIVE_INT_TYPE queueDepth, const NATIVE_INT_TYPE instance) {
39 ComQueueComponentBase::init(queueDepth, instance);
40}
41
42void ComQueue ::cleanup() {
43 // Deallocate memory ignoring error conditions
44 if ((this->m_allocator != nullptr) && (this->m_allocation != nullptr)) {
45 this->m_allocator->deallocate(this->m_allocationId, this->m_allocation);
46 }
47}
48
50 NATIVE_UINT_TYPE allocationId,
51 Fw::MemAllocator& allocator) {
52 FwIndexType currentPriorityIndex = 0;
53 NATIVE_UINT_TYPE totalAllocation = 0;
54
55 // Store/initialize allocator members
56 this->m_allocator = &allocator;
57 this->m_allocationId = allocationId;
58 this->m_allocation = nullptr;
59
60 // Initializes the sorted queue metadata list in priority (sorted) order. This is accomplished by walking the
61 // priority values in priority order from 0 to TOTAL_PORT_COUNT. At each priory value, the supplied queue
62 // configuration table is walked and any entry matching the current priority values is used to add queue metadata to
63 // the prioritized list. This results in priority-sorted queue metadata objects that index back into the unsorted
64 // queue data structures.
65 //
66 // The total allocation size is tracked for passing to the allocation call and is a summation of
67 // (depth * message size) for each prioritized metadata object of (depth * message size)
68 for (FwIndexType currentPriority = 0; currentPriority < TOTAL_PORT_COUNT; currentPriority++) {
69 // Walk each queue configuration entry and add them into the prioritized metadata list when matching the current
70 // priority value
71 for (NATIVE_UINT_TYPE entryIndex = 0; entryIndex < FW_NUM_ARRAY_ELEMENTS(queueConfig.entries); entryIndex++) {
72 // Check for valid configuration entry
73 FW_ASSERT(queueConfig.entries[entryIndex].priority < TOTAL_PORT_COUNT,
74 queueConfig.entries[entryIndex].priority, TOTAL_PORT_COUNT, entryIndex);
75
76 if (currentPriority == queueConfig.entries[entryIndex].priority) {
77 // Set up the queue metadata object in order to track priority, depth, index into the queue list of the
78 // backing queue object, and message size. Both index and message size are calculated where priority and
79 // depth are copied from the configuration object.
80 QueueMetadata& entry = this->m_prioritizedList[currentPriorityIndex];
81 entry.priority = queueConfig.entries[entryIndex].priority;
82 entry.depth = queueConfig.entries[entryIndex].depth;
83 entry.index = entryIndex;
84 // Message size is determined by the type of object being stored, which in turn is determined by the
85 // index of the entry. Those lower than COM_PORT_COUNT are Fw::ComBuffers and those larger Fw::Buffer.
86 entry.msgSize = (entryIndex < COM_PORT_COUNT) ? sizeof(Fw::ComBuffer) : sizeof(Fw::Buffer);
87 totalAllocation += entry.depth * entry.msgSize;
88 currentPriorityIndex++;
89 }
90 }
91 }
92 // Allocate a single chunk of memory from the memory allocator. Memory recover is neither needed nor used.
93 bool recoverable = false;
94 this->m_allocation = this->m_allocator->allocate(this->m_allocationId, totalAllocation, recoverable);
95
96 // Each of the backing queue objects must be supplied memory to store the queued messages. These data regions are
97 // sub-portions of the total allocated data. This memory is passed out by looping through each queue in prioritized
98 // order and passing out the memory to each queue's setup method.
99 FwSizeType allocationOffset = 0;
100 for (FwIndexType i = 0; i < TOTAL_PORT_COUNT; i++) {
101 // Get current queue's allocation size and safety check the values
102 FwSizeType allocationSize = this->m_prioritizedList[i].depth * this->m_prioritizedList[i].msgSize;
103 FW_ASSERT(this->m_prioritizedList[i].index < static_cast<FwIndexType>(FW_NUM_ARRAY_ELEMENTS(this->m_queues)),
104 this->m_prioritizedList[i].index);
105 FW_ASSERT((allocationSize + allocationOffset) <= totalAllocation, allocationSize, allocationOffset,
106 totalAllocation);
107
108 // Setup queue's memory allocation, depth, and message size. Setup is skipped for a depth 0 queue
109 if (allocationSize > 0) {
110 this->m_queues[this->m_prioritizedList[i].index].setup(
111 reinterpret_cast<U8*>(this->m_allocation) + allocationOffset, allocationSize,
112 this->m_prioritizedList[i].depth, this->m_prioritizedList[i].msgSize);
113 }
114 allocationOffset += allocationSize;
115 }
116 // Safety check that all memory was used as expected
117 FW_ASSERT(allocationOffset == totalAllocation, allocationOffset, totalAllocation);
118}
119// ----------------------------------------------------------------------
120// Handler implementations for user-defined typed input ports
121// ----------------------------------------------------------------------
122
123void ComQueue::comQueueIn_handler(const NATIVE_INT_TYPE portNum, Fw::ComBuffer& data, U32 context) {
124 // Ensure that the port number of comQueueIn is consistent with the expectation
125 FW_ASSERT(portNum >= 0 && portNum < COM_PORT_COUNT, portNum);
126 this->enqueue(portNum, QueueType::COM_QUEUE, reinterpret_cast<const U8*>(&data), sizeof(Fw::ComBuffer));
127}
128
129void ComQueue::buffQueueIn_handler(const NATIVE_INT_TYPE portNum, Fw::Buffer& fwBuffer) {
130 const NATIVE_INT_TYPE queueNum = portNum + COM_PORT_COUNT;
131 // Ensure that the port number of buffQueueIn is consistent with the expectation
132 FW_ASSERT(portNum >= 0 && portNum < BUFFER_PORT_COUNT, portNum);
133 FW_ASSERT(queueNum < TOTAL_PORT_COUNT);
134 this->enqueue(queueNum, QueueType::BUFFER_QUEUE, reinterpret_cast<const U8*>(&fwBuffer), sizeof(Fw::Buffer));
135}
136
137void ComQueue::comStatusIn_handler(const NATIVE_INT_TYPE portNum, Fw::Success& condition) {
138 switch (this->m_state) {
139 // On success, the queue should be processed. On failure, the component should still wait.
140 case WAITING:
141 if (condition.e == Fw::Success::SUCCESS) {
142 this->m_state = READY;
143 this->processQueue();
144 // A message may or may not be sent. Thus, READY or WAITING are acceptable final states.
145 FW_ASSERT((this->m_state == WAITING || this->m_state == READY), this->m_state);
146 } else {
147 this->m_state = WAITING;
148 }
149 break;
150 // Both READY and unknown states should not be possible at this point. To receive a status message we must be
151 // one of the WAITING or RETRY states.
152 default:
153 FW_ASSERT(0, this->m_state);
154 break;
155 }
156}
157
158void ComQueue::run_handler(const NATIVE_INT_TYPE portNum, NATIVE_UINT_TYPE context) {
159 // Downlink the high-water marks for the Fw::ComBuffer array types
160 ComQueueDepth comQueueDepth;
161 for (FwSizeType i = 0; i < comQueueDepth.SIZE; i++) {
162 comQueueDepth[i] = this->m_queues[i].get_high_water_mark();
163 this->m_queues[i].clear_high_water_mark();
164 }
165 this->tlmWrite_comQueueDepth(comQueueDepth);
166
167 // Downlink the high-water marks for the Fw::Buffer array types
168 BuffQueueDepth buffQueueDepth;
169 for (FwSizeType i = 0; i < buffQueueDepth.SIZE; i++) {
170 buffQueueDepth[i] = this->m_queues[i + COM_PORT_COUNT].get_high_water_mark();
171 this->m_queues[i + COM_PORT_COUNT].clear_high_water_mark();
172 }
173 this->tlmWrite_buffQueueDepth(buffQueueDepth);
174}
175
176// ----------------------------------------------------------------------
177// Private helper methods
178// ----------------------------------------------------------------------
179
180void ComQueue::enqueue(const FwIndexType queueNum, QueueType queueType, const U8* data, const FwSizeType size) {
181 // Enqueue the given message onto the matching queue. When no space is available then emit the queue overflow event,
182 // set the appropriate throttle, and move on. Will assert if passed a message for a depth 0 queue.
183 const FwSizeType expectedSize = (queueType == QueueType::COM_QUEUE) ? sizeof(Fw::ComBuffer) : sizeof(Fw::Buffer);
184 const FwIndexType portNum = queueNum - ((queueType == QueueType::COM_QUEUE) ? 0 : COM_PORT_COUNT);
185 FW_ASSERT(expectedSize == size, size, expectedSize);
186 FW_ASSERT(portNum >= 0, portNum);
187 Fw::SerializeStatus status = this->m_queues[queueNum].enqueue(data, size);
188 if (status == Fw::FW_SERIALIZE_NO_ROOM_LEFT && !this->m_throttle[queueNum]) {
189 this->log_WARNING_HI_QueueOverflow(queueType, portNum);
190 this->m_throttle[queueNum] = true;
191 }
192 // When the component is already in READY state process the queue to send out the next available message immediately
193 if (this->m_state == READY) {
194 this->processQueue();
195 }
196}
197
198void ComQueue::sendComBuffer(Fw::ComBuffer& comBuffer) {
199 FW_ASSERT(this->m_state == READY);
200 this->comQueueSend_out(0, comBuffer, 0);
201 this->m_state = WAITING;
202}
203
204void ComQueue::sendBuffer(Fw::Buffer& buffer) {
205 // Retry buffer expected to be cleared as we are either transferring ownership or have already deallocated it.
206 FW_ASSERT(this->m_state == READY);
207 this->buffQueueSend_out(0, buffer);
208 this->m_state = WAITING;
209}
210
211void ComQueue::processQueue() {
212 FwIndexType priorityIndex = 0;
213 FwIndexType sendPriority = 0;
214 // Check that we are in the appropriate state
215 FW_ASSERT(this->m_state == READY);
216
217 // Walk all the queues in priority order. Send the first message that is available in priority order. No balancing
218 // is done within this loop.
219 for (priorityIndex = 0; priorityIndex < TOTAL_PORT_COUNT; priorityIndex++) {
220 QueueMetadata& entry = this->m_prioritizedList[priorityIndex];
221 Types::Queue& queue = this->m_queues[entry.index];
222
223 // Continue onto next prioritized queue if there is no items in the current queue
224 if (queue.getQueueSize() == 0) {
225 continue;
226 }
227
228 // Send out the message based on the type
229 if (entry.index < COM_PORT_COUNT) {
230 Fw::ComBuffer comBuffer;
231 queue.dequeue(reinterpret_cast<U8*>(&comBuffer), sizeof(comBuffer));
232 this->sendComBuffer(comBuffer);
233 } else {
234 Fw::Buffer buffer;
235 queue.dequeue(reinterpret_cast<U8*>(&buffer), sizeof(buffer));
236 this->sendBuffer(buffer);
237 }
238
239 // Update the throttle and the index that was just sent
240 this->m_throttle[entry.index] = false;
241
242 // Priority used in the next loop
243 sendPriority = entry.priority;
244 break;
245 }
246
247 // Starting on the priority entry after the one dispatched and continuing through the end of the set of entries that
248 // share the same priority, rotate those entries such that the currently dispatched queue is last and the rest are
249 // shifted up by one. This effectively round-robins the queues of the same priority.
250 for (priorityIndex++;
251 priorityIndex < TOTAL_PORT_COUNT && (this->m_prioritizedList[priorityIndex].priority == sendPriority);
252 priorityIndex++) {
253 // Swap the previous entry with this one.
254 QueueMetadata temp = this->m_prioritizedList[priorityIndex];
255 this->m_prioritizedList[priorityIndex] = this->m_prioritizedList[priorityIndex - 1];
256 this->m_prioritizedList[priorityIndex - 1] = temp;
257 }
258}
259} // end namespace Svc
#define FW_ASSERT(...)
Definition Assert.hpp:7
PlatformIntType NATIVE_INT_TYPE
Definition BasicTypes.h:51
uint8_t U8
8-bit unsigned integer
Definition BasicTypes.h:26
#define FW_NUM_ARRAY_ELEMENTS(a)
number of elements in an array
Definition BasicTypes.h:66
PlatformUIntType NATIVE_UINT_TYPE
Definition BasicTypes.h:52
C++ header for working with basic fprime types.
PlatformSizeType FwSizeType
Definition FpConfig.h:18
PlatformIndexType FwIndexType
Definition FpConfig.h:15
virtual void * allocate(const NATIVE_UINT_TYPE identifier, NATIVE_UINT_TYPE &size, bool &recoverable)=0
Allocate memory.
void configure(QueueConfigurationTable queueConfig, NATIVE_UINT_TYPE allocationId, Fw::MemAllocator &allocator)
Definition ComQueue.cpp:49
static const FwIndexType BUFFER_PORT_COUNT
Total count of input buffer ports and thus total queues.
Definition ComQueue.hpp:29
static const FwIndexType COM_PORT_COUNT
< Count of Fw::Com input ports and thus Fw::Com queues
Definition ComQueue.hpp:26
static const FwIndexType TOTAL_PORT_COUNT
Definition ComQueue.hpp:32
NATIVE_UINT_TYPE getQueueSize() const
Definition Queue.cpp:50
void setup(U8 *const storage, const FwSizeType storage_size, const FwSizeType depth, const FwSizeType message_size)
setup the queue object to setup storage
Definition Queue.cpp:17
Fw::SerializeStatus dequeue(U8 *const message, const FwSizeType size)
pops a fixed-size message off the front of the queue
Definition Queue.cpp:31
Fw::SerializeStatus enqueue(const U8 *const message, const FwSizeType size)
pushes a fixed-size message onto the back of the queue
Definition Queue.cpp:25
NATIVE_UINT_TYPE get_high_water_mark() const
Definition Queue.cpp:41
void clear_high_water_mark()
Definition Queue.cpp:46
SerializeStatus
forward declaration for string
@ FW_SERIALIZE_NO_ROOM_LEFT
No room left in the buffer to serialize data.
FwIndexType priority
Priority of the queue [0, TOTAL_PORT_COUNT)
Definition ComQueue.hpp:47
FwSizeType depth
Depth of the queue [0, infinity)
Definition ComQueue.hpp:46
configuration table for each queue
Definition ComQueue.hpp:59
QueueConfigurationEntry entries[TOTAL_PORT_COUNT]
Definition ComQueue.hpp:60