F´ Flight Software - C/C++ Documentation  devel
A framework for building embedded system applications to NASA flight quality standards.
ComQueue.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title ComQueue.cpp
3 // \author vbai
4 // \brief cpp file for ComQueue component implementation class
5 // ======================================================================
6 
7 #include <Fw/Types/Assert.hpp>
10 
11 namespace Svc {
12 
13 // ----------------------------------------------------------------------
14 // Construction, initialization, and destruction
15 // ----------------------------------------------------------------------
16 
17 ComQueue ::QueueConfigurationTable ::QueueConfigurationTable() {
18  for (NATIVE_UINT_TYPE i = 0; i < FW_NUM_ARRAY_ELEMENTS(this->entries); i++) {
19  this->entries[i].priority = 0;
20  this->entries[i].depth = 0;
21  }
22 }
23 
24 ComQueue ::ComQueue(const char* const compName)
25  : ComQueueComponentBase(compName),
26  m_state(WAITING),
27  m_allocationId(static_cast<NATIVE_UINT_TYPE>(-1)),
28  m_allocator(nullptr),
29  m_allocation(nullptr) {
30  // Initialize throttles to "off"
31  for (NATIVE_UINT_TYPE i = 0; i < TOTAL_PORT_COUNT; i++) {
32  this->m_throttle[i] = false;
33  }
34 }
35 
37 
39  // Hand the backing memory back to the allocator, ignoring error conditions. Nothing happens unless both an
    // allocator and an allocation have been recorded.
    // NOTE(review): m_allocation is not reset to nullptr afterwards, so a second call would pass the same pointer to
    // deallocate again - confirm callers invoke this only once.
40  if ((this->m_allocator != nullptr) && (this->m_allocation != nullptr)) {
41  this->m_allocator->deallocate(this->m_allocationId, this->m_allocation);
42  }
43 }
44 
46  NATIVE_UINT_TYPE allocationId,
47  Fw::MemAllocator& allocator) {
    // Records the allocator, builds the priority-sorted queue metadata list from queueConfig, requests one block of
    // memory sized for all queues, and hands each queue its own slice of that block.
48  FwIndexType currentPriorityIndex = 0;
49  NATIVE_UINT_TYPE totalAllocation = 0;
50 
51  // Store the allocator and its identifier; the allocation pointer stays null until memory is requested below
52  this->m_allocator = &allocator;
53  this->m_allocationId = allocationId;
54  this->m_allocation = nullptr;
55 
56  // Initializes the sorted queue metadata list in priority (sorted) order. This is accomplished by walking the
57  // priority values in priority order from 0 to TOTAL_PORT_COUNT. At each priority value, the supplied queue
58  // configuration table is walked and any entry matching the current priority value is used to add queue metadata to
59  // the prioritized list. This results in priority-sorted queue metadata objects that index back into the unsorted
60  // queue data structures.
61  //
62  // The total allocation size is tracked for passing to the allocation call and is the summation of
63  // (depth * message size) over every prioritized metadata object.
64  for (FwIndexType currentPriority = 0; currentPriority < TOTAL_PORT_COUNT; currentPriority++) {
65  // Walk each queue configuration entry and add it to the prioritized metadata list when it matches the current
66  // priority value
67  for (FwIndexType entryIndex = 0; entryIndex < static_cast<FwIndexType>(FW_NUM_ARRAY_ELEMENTS(queueConfig.entries)); entryIndex++) {
68  // Check for valid configuration entry: the priority must be below TOTAL_PORT_COUNT
69  FW_ASSERT(queueConfig.entries[entryIndex].priority < TOTAL_PORT_COUNT,
70  static_cast<FwAssertArgType>(queueConfig.entries[entryIndex].priority),
71  static_cast<FwAssertArgType>(TOTAL_PORT_COUNT),
72  static_cast<FwAssertArgType>(entryIndex));
73 
74  if (currentPriority == queueConfig.entries[entryIndex].priority) {
75  // Set up the queue metadata object in order to track priority, depth, index into the queue list of the
76  // backing queue object, and message size. Index and message size are calculated here, whereas priority and
77  // depth are copied from the configuration object.
78  QueueMetadata& entry = this->m_prioritizedList[currentPriorityIndex];
79  entry.priority = queueConfig.entries[entryIndex].priority;
80  entry.depth = queueConfig.entries[entryIndex].depth;
81  entry.index = entryIndex;
82  // Message size is determined by the type of object being stored, which in turn is determined by the
83  // index of the entry. Indices below COM_PORT_COUNT hold Fw::ComBuffers; all other indices hold Fw::Buffers.
84  entry.msgSize = (entryIndex < COM_PORT_COUNT) ? sizeof(Fw::ComBuffer) : sizeof(Fw::Buffer);
85  totalAllocation += static_cast<NATIVE_UINT_TYPE>(entry.depth * entry.msgSize);
86  currentPriorityIndex++;
87  }
88  }
89  }
90  // Allocate a single chunk of memory from the memory allocator. Memory recovery is neither needed nor used.
    // NOTE(review): allocate() receives totalAllocation by reference and its returned pointer is not null-checked
    // here - confirm how the allocator reports a failed or reduced allocation.
91  bool recoverable = false;
92  this->m_allocation = this->m_allocator->allocate(this->m_allocationId, totalAllocation, recoverable);
93 
94  // Each of the backing queue objects must be supplied memory to store the queued messages. These data regions are
95  // sub-portions of the total allocated data. This memory is passed out by looping through each queue in prioritized
96  // order and passing out the memory to each queue's setup method.
97  FwSizeType allocationOffset = 0;
98  for (FwIndexType i = 0; i < TOTAL_PORT_COUNT; i++) {
99  // Get the current queue's allocation size and safety check the values: the queue index must fall inside
    // m_queues and the slice must not run past the end of the total allocation
100  FwSizeType allocationSize = this->m_prioritizedList[i].depth * this->m_prioritizedList[i].msgSize;
101  FW_ASSERT(this->m_prioritizedList[i].index < static_cast<FwIndexType>(FW_NUM_ARRAY_ELEMENTS(this->m_queues)),
102  this->m_prioritizedList[i].index);
103  FW_ASSERT(
104  (allocationSize + allocationOffset) <= totalAllocation,
105  static_cast<FwAssertArgType>(allocationSize),
106  static_cast<FwAssertArgType>(allocationOffset),
107  static_cast<FwAssertArgType>(totalAllocation));
108 
109  // Set up the queue's memory allocation, depth, and message size. Setup is skipped for a depth 0 queue
110  if (allocationSize > 0) {
111  this->m_queues[this->m_prioritizedList[i].index].setup(
112  reinterpret_cast<U8*>(this->m_allocation) + allocationOffset, allocationSize,
113  this->m_prioritizedList[i].depth, this->m_prioritizedList[i].msgSize);
114  }
115  allocationOffset += allocationSize;
116  }
117  // Safety check that all memory was used as expected: the slices must add up to exactly the total allocation
118  FW_ASSERT(
119  allocationOffset == totalAllocation,
120  static_cast<FwAssertArgType>(allocationOffset),
121  static_cast<FwAssertArgType>(totalAllocation));
122 }
123 // ----------------------------------------------------------------------
124 // Handler implementations for user-defined typed input ports
125 // ----------------------------------------------------------------------
126 
127 void ComQueue::comQueueIn_handler(const NATIVE_INT_TYPE portNum, Fw::ComBuffer& data, U32 context) {
128  // Ensure that the port number of comQueueIn is consistent with the expectation
129  FW_ASSERT(portNum >= 0 && portNum < COM_PORT_COUNT, portNum);
130  (void)this->enqueue(portNum, QueueType::COM_QUEUE, reinterpret_cast<const U8*>(&data), sizeof(Fw::ComBuffer));
131 }
132 
133 void ComQueue::buffQueueIn_handler(const NATIVE_INT_TYPE portNum, Fw::Buffer& fwBuffer) {
134  const NATIVE_INT_TYPE queueNum = portNum + COM_PORT_COUNT;
135  // Ensure that the port number of buffQueueIn is consistent with the expectation
136  FW_ASSERT(portNum >= 0 && portNum < BUFFER_PORT_COUNT, portNum);
137  FW_ASSERT(queueNum < TOTAL_PORT_COUNT);
138  bool status =
139  this->enqueue(queueNum, QueueType::BUFFER_QUEUE, reinterpret_cast<const U8*>(&fwBuffer), sizeof(Fw::Buffer));
140  if (!status) {
141  this->deallocate_out(portNum, fwBuffer);
142  }
143 }
144 
145 void ComQueue::comStatusIn_handler(const NATIVE_INT_TYPE portNum, Fw::Success& condition) {
146  switch (this->m_state) {
147  // On success, the queue should be processed. On failure, the component should still wait.
148  case WAITING:
149  if (condition.e == Fw::Success::SUCCESS) {
150  this->m_state = READY;
151  this->processQueue();
152  // A message may or may not be sent. Thus, READY or WAITING are acceptable final states.
153  FW_ASSERT((this->m_state == WAITING || this->m_state == READY), this->m_state);
154  } else {
155  this->m_state = WAITING;
156  }
157  break;
158  // Both READY and unknown states should not be possible at this point. To receive a status message we must be
159  // one of the WAITING or RETRY states.
160  default:
161  FW_ASSERT(0, this->m_state);
162  break;
163  }
164 }
165 
166 void ComQueue::run_handler(const NATIVE_INT_TYPE portNum, U32 context) {
167  // Downlink the high-water marks for the Fw::ComBuffer array types
168  ComQueueDepth comQueueDepth;
169  for (U32 i = 0; i < comQueueDepth.SIZE; i++) {
170  comQueueDepth[i] = this->m_queues[i].get_high_water_mark();
171  this->m_queues[i].clear_high_water_mark();
172  }
173  this->tlmWrite_comQueueDepth(comQueueDepth);
174 
175  // Downlink the high-water marks for the Fw::Buffer array types
176  BuffQueueDepth buffQueueDepth;
177  for (U32 i = 0; i < buffQueueDepth.SIZE; i++) {
178  buffQueueDepth[i] = this->m_queues[i + COM_PORT_COUNT].get_high_water_mark();
179  this->m_queues[i + COM_PORT_COUNT].clear_high_water_mark();
180  }
181  this->tlmWrite_buffQueueDepth(buffQueueDepth);
182 }
183 
184 // ----------------------------------------------------------------------
185 // Hook implementations for typed async input ports
186 // ----------------------------------------------------------------------
187 
188 void ComQueue::buffQueueIn_overflowHook(FwIndexType portNum, Fw::Buffer& fwBuffer) {
189  FW_ASSERT(portNum >= 0 && portNum < BUFFER_PORT_COUNT, portNum);
190  this->deallocate_out(portNum, fwBuffer);
191 }
192 
193 // ----------------------------------------------------------------------
194 // Private helper methods
195 // ----------------------------------------------------------------------
196 
197 bool ComQueue::enqueue(const FwIndexType queueNum, QueueType queueType, const U8* data, const FwSizeType size) {
198  // Enqueue the given message onto the matching queue. When no space is available then emit the queue overflow event,
199  // set the appropriate throttle, and move on. Will assert if passed a message for a depth 0 queue.
200  const FwSizeType expectedSize = (queueType == QueueType::COM_QUEUE) ? sizeof(Fw::ComBuffer) : sizeof(Fw::Buffer);
201  const FwIndexType portNum = queueNum - ((queueType == QueueType::COM_QUEUE) ? 0 : COM_PORT_COUNT);
202  bool rvStatus = true;
203  FW_ASSERT(
204  expectedSize == size,
205  static_cast<FwAssertArgType>(size),
206  static_cast<FwAssertArgType>(expectedSize));
207  FW_ASSERT(portNum >= 0, portNum);
208  Fw::SerializeStatus status = this->m_queues[queueNum].enqueue(data, size);
209  if (status == Fw::FW_SERIALIZE_NO_ROOM_LEFT) {
210  if (!this->m_throttle[queueNum]) {
211  this->log_WARNING_HI_QueueOverflow(queueType, static_cast<U32>(portNum));
212  this->m_throttle[queueNum] = true;
213  }
214 
215  rvStatus = false;
216  }
217  // When the component is already in READY state process the queue to send out the next available message immediately
218  if (this->m_state == READY) {
219  this->processQueue();
220  }
221 
222  return rvStatus;
223 }
224 
225 void ComQueue::sendComBuffer(Fw::ComBuffer& comBuffer) {
226  FW_ASSERT(this->m_state == READY);
227  this->comQueueSend_out(0, comBuffer, 0);
228  this->m_state = WAITING;
229 }
230 
231 void ComQueue::sendBuffer(Fw::Buffer& buffer) {
232  // Retry buffer expected to be cleared as we are either transferring ownership or have already deallocated it.
233  FW_ASSERT(this->m_state == READY);
234  this->buffQueueSend_out(0, buffer);
235  this->m_state = WAITING;
236 }
237 
238 void ComQueue::processQueue() {
239  FwIndexType priorityIndex = 0;
240  FwIndexType sendPriority = 0;
241  // Check that we are in the appropriate state
242  FW_ASSERT(this->m_state == READY);
243 
244  // Walk all the queues in priority order. Send the first message that is available in priority order. No balancing
245  // is done within this loop.
246  for (priorityIndex = 0; priorityIndex < TOTAL_PORT_COUNT; priorityIndex++) {
247  QueueMetadata& entry = this->m_prioritizedList[priorityIndex];
248  Types::Queue& queue = this->m_queues[entry.index];
249 
250  // Continue onto next prioritized queue if there is no items in the current queue
251  if (queue.getQueueSize() == 0) {
252  continue;
253  }
254 
255  // Send out the message based on the type
256  if (entry.index < COM_PORT_COUNT) {
257  Fw::ComBuffer comBuffer;
258  queue.dequeue(reinterpret_cast<U8*>(&comBuffer), sizeof(comBuffer));
259  this->sendComBuffer(comBuffer);
260  } else {
261  Fw::Buffer buffer;
262  queue.dequeue(reinterpret_cast<U8*>(&buffer), sizeof(buffer));
263  this->sendBuffer(buffer);
264  }
265 
266  // Update the throttle and the index that was just sent
267  this->m_throttle[entry.index] = false;
268 
269  // Priority used in the next loop
270  sendPriority = entry.priority;
271  break;
272  }
273 
274  // Starting on the priority entry after the one dispatched and continuing through the end of the set of entries that
275  // share the same priority, rotate those entries such that the currently dispatched queue is last and the rest are
276  // shifted up by one. This effectively round-robins the queues of the same priority.
277  for (priorityIndex++;
278  priorityIndex < TOTAL_PORT_COUNT && (this->m_prioritizedList[priorityIndex].priority == sendPriority);
279  priorityIndex++) {
280  // Swap the previous entry with this one.
281  QueueMetadata temp = this->m_prioritizedList[priorityIndex];
282  this->m_prioritizedList[priorityIndex] = this->m_prioritizedList[priorityIndex - 1];
283  this->m_prioritizedList[priorityIndex - 1] = temp;
284  }
285 }
286 } // end namespace Svc
#define FW_ASSERT(...)
Definition: Assert.hpp:14
PlatformIntType NATIVE_INT_TYPE
Definition: BasicTypes.h:55
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.h:30
#define FW_NUM_ARRAY_ELEMENTS(a)
number of elements in an array
Definition: BasicTypes.h:70
PlatformUIntType NATIVE_UINT_TYPE
Definition: BasicTypes.h:56
C++ header for working with basic fprime types.
PlatformAssertArgType FwAssertArgType
Definition: FpConfig.h:39
PlatformSizeType FwSizeType
Definition: FpConfig.h:35
PlatformIndexType FwIndexType
Definition: FpConfig.h:25
virtual void deallocate(const NATIVE_UINT_TYPE identifier, void *ptr)=0
Deallocate memory.
virtual void * allocate(const NATIVE_UINT_TYPE identifier, NATIVE_UINT_TYPE &size, bool &recoverable)=0
Allocate memory.
Success/Failure.
T e
The raw enum value.
@ SUCCESS
Representing success.
Auto-generated base for ComQueue component.
void deallocate_out(FwIndexType portNum, Fw::Buffer &fwBuffer)
Invoke output port deallocate.
void tlmWrite_comQueueDepth(const Svc::ComQueueDepth &arg, Fw::Time _tlmTime=Fw::Time())
void tlmWrite_buffQueueDepth(const Svc::BuffQueueDepth &arg, Fw::Time _tlmTime=Fw::Time())
void buffQueueSend_out(FwIndexType portNum, Fw::Buffer &fwBuffer)
Invoke output port buffQueueSend.
void comQueueSend_out(FwIndexType portNum, Fw::ComBuffer &data, U32 context)
Invoke output port comQueueSend.
void log_WARNING_HI_QueueOverflow(Svc::QueueType queueType, U32 index)
ComQueue(const char *const compName)
Definition: ComQueue.cpp:24
void configure(QueueConfigurationTable queueConfig, NATIVE_UINT_TYPE allocationId, Fw::MemAllocator &allocator)
Definition: ComQueue.cpp:45
void cleanup()
Definition: ComQueue.cpp:38
static const FwIndexType BUFFER_PORT_COUNT
Total count of input buffer ports and thus total queues.
Definition: ComQueue.hpp:29
static const FwIndexType COM_PORT_COUNT
Count of Fw::Com input ports and thus Fw::Com queues
Definition: ComQueue.hpp:26
static const FwIndexType TOTAL_PORT_COUNT
Definition: ComQueue.hpp:32
NATIVE_UINT_TYPE getQueueSize() const
Definition: Queue.cpp:60
void setup(U8 *const storage, const FwSizeType storage_size, const FwSizeType depth, const FwSizeType message_size)
setup the queue object to setup storage
Definition: Queue.cpp:17
Fw::SerializeStatus dequeue(U8 *const message, const FwSizeType size)
pops a fixed-size message off the front of the queue
Definition: Queue.cpp:38
Fw::SerializeStatus enqueue(const U8 *const message, const FwSizeType size)
pushes a fixed-size message onto the back of the queue
Definition: Queue.cpp:29
NATIVE_UINT_TYPE get_high_water_mark() const
Definition: Queue.cpp:51
void clear_high_water_mark()
Definition: Queue.cpp:56
SerializeStatus
forward declaration for string
@ FW_SERIALIZE_NO_ROOM_LEFT
No room left in the buffer to serialize data.
FwIndexType priority
Priority of the queue [0, TOTAL_PORT_COUNT)
Definition: ComQueue.hpp:47
FwSizeType depth
Depth of the queue [0, infinity)
Definition: ComQueue.hpp:46
configuration table for each queue
Definition: ComQueue.hpp:59
QueueConfigurationEntry entries[TOTAL_PORT_COUNT]
Definition: ComQueue.hpp:60