F´ Flight Software - C/C++ Documentation  devel
A framework for building embedded system applications to NASA flight quality standards.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ComQueue.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title ComQueue.cpp
3 // \author vbai
4 // \brief cpp file for ComQueue component implementation class
5 // ======================================================================
6 
7 #include <Fw/Types/Assert.hpp>
10 
11 namespace Svc {
12 
13 // ----------------------------------------------------------------------
14 // Construction, initialization, and destruction
15 // ----------------------------------------------------------------------
16 
17 ComQueue ::QueueConfigurationTable ::QueueConfigurationTable() {
18  for (NATIVE_UINT_TYPE i = 0; i < FW_NUM_ARRAY_ELEMENTS(this->entries); i++) {
19  this->entries[i].priority = 0;
20  this->entries[i].depth = 0;
21  }
22 }
23 
24 ComQueue ::ComQueue(const char* const compName)
25  : ComQueueComponentBase(compName),
26  m_state(WAITING),
27  m_allocationId(static_cast<NATIVE_UINT_TYPE>(-1)),
28  m_allocator(nullptr),
29  m_allocation(nullptr) {
30  // Initialize throttles to "off"
31  for (NATIVE_UINT_TYPE i = 0; i < TOTAL_PORT_COUNT; i++) {
32  this->m_throttle[i] = false;
33  }
34 }
35 
37 
39  // Deallocate memory ignoring error conditions
40  if ((this->m_allocator != nullptr) && (this->m_allocation != nullptr)) {
41  this->m_allocator->deallocate(this->m_allocationId, this->m_allocation);
42  }
43 }
44 
46  NATIVE_UINT_TYPE allocationId,
47  Fw::MemAllocator& allocator) {
48  FwIndexType currentPriorityIndex = 0;
49  NATIVE_UINT_TYPE totalAllocation = 0;
50 
51  // Store/initialize allocator members
52  this->m_allocator = &allocator;
53  this->m_allocationId = allocationId;
54  this->m_allocation = nullptr;
55 
56  // Initializes the sorted queue metadata list in priority (sorted) order. This is accomplished by walking the
57  // priority values in priority order from 0 to TOTAL_PORT_COUNT. At each priory value, the supplied queue
58  // configuration table is walked and any entry matching the current priority values is used to add queue metadata to
59  // the prioritized list. This results in priority-sorted queue metadata objects that index back into the unsorted
60  // queue data structures.
61  //
62  // The total allocation size is tracked for passing to the allocation call and is a summation of
63  // (depth * message size) for each prioritized metadata object of (depth * message size)
64  for (FwIndexType currentPriority = 0; currentPriority < TOTAL_PORT_COUNT; currentPriority++) {
65  // Walk each queue configuration entry and add them into the prioritized metadata list when matching the current
66  // priority value
67  for (FwIndexType entryIndex = 0; entryIndex < static_cast<FwIndexType>(FW_NUM_ARRAY_ELEMENTS(queueConfig.entries)); entryIndex++) {
68  // Check for valid configuration entry
69  FW_ASSERT(queueConfig.entries[entryIndex].priority < TOTAL_PORT_COUNT,
70  static_cast<FwAssertArgType>(queueConfig.entries[entryIndex].priority),
71  static_cast<FwAssertArgType>(TOTAL_PORT_COUNT),
72  static_cast<FwAssertArgType>(entryIndex));
73 
74  if (currentPriority == queueConfig.entries[entryIndex].priority) {
75  // Set up the queue metadata object in order to track priority, depth, index into the queue list of the
76  // backing queue object, and message size. Both index and message size are calculated where priority and
77  // depth are copied from the configuration object.
78  QueueMetadata& entry = this->m_prioritizedList[currentPriorityIndex];
79  entry.priority = queueConfig.entries[entryIndex].priority;
80  entry.depth = queueConfig.entries[entryIndex].depth;
81  entry.index = entryIndex;
82  // Message size is determined by the type of object being stored, which in turn is determined by the
83  // index of the entry. Those lower than COM_PORT_COUNT are Fw::ComBuffers and those larger Fw::Buffer.
84  entry.msgSize = (entryIndex < COM_PORT_COUNT) ? sizeof(Fw::ComBuffer) : sizeof(Fw::Buffer);
85  totalAllocation += static_cast<NATIVE_UINT_TYPE>(entry.depth * entry.msgSize);
86  currentPriorityIndex++;
87  }
88  }
89  }
90  // Allocate a single chunk of memory from the memory allocator. Memory recover is neither needed nor used.
91  bool recoverable = false;
92  this->m_allocation = this->m_allocator->allocate(this->m_allocationId, totalAllocation, recoverable);
93 
94  // Each of the backing queue objects must be supplied memory to store the queued messages. These data regions are
95  // sub-portions of the total allocated data. This memory is passed out by looping through each queue in prioritized
96  // order and passing out the memory to each queue's setup method.
97  FwSizeType allocationOffset = 0;
98  for (FwIndexType i = 0; i < TOTAL_PORT_COUNT; i++) {
99  // Get current queue's allocation size and safety check the values
100  FwSizeType allocationSize = this->m_prioritizedList[i].depth * this->m_prioritizedList[i].msgSize;
101  FW_ASSERT(this->m_prioritizedList[i].index < static_cast<FwIndexType>(FW_NUM_ARRAY_ELEMENTS(this->m_queues)),
102  this->m_prioritizedList[i].index);
103  FW_ASSERT(
104  (allocationSize + allocationOffset) <= totalAllocation,
105  static_cast<FwAssertArgType>(allocationSize),
106  static_cast<FwAssertArgType>(allocationOffset),
107  static_cast<FwAssertArgType>(totalAllocation));
108 
109  // Setup queue's memory allocation, depth, and message size. Setup is skipped for a depth 0 queue
110  if (allocationSize > 0) {
111  this->m_queues[this->m_prioritizedList[i].index].setup(
112  reinterpret_cast<U8*>(this->m_allocation) + allocationOffset, allocationSize,
113  this->m_prioritizedList[i].depth, this->m_prioritizedList[i].msgSize);
114  }
115  allocationOffset += allocationSize;
116  }
117  // Safety check that all memory was used as expected
118  FW_ASSERT(
119  allocationOffset == totalAllocation,
120  static_cast<FwAssertArgType>(allocationOffset),
121  static_cast<FwAssertArgType>(totalAllocation));
122 }
123 // ----------------------------------------------------------------------
124 // Handler implementations for user-defined typed input ports
125 // ----------------------------------------------------------------------
126 
127 void ComQueue::comQueueIn_handler(const NATIVE_INT_TYPE portNum, Fw::ComBuffer& data, U32 context) {
128  // Ensure that the port number of comQueueIn is consistent with the expectation
129  FW_ASSERT(portNum >= 0 && portNum < COM_PORT_COUNT, portNum);
130  (void)this->enqueue(portNum, QueueType::COM_QUEUE, reinterpret_cast<const U8*>(&data), sizeof(Fw::ComBuffer));
131 }
132 
133 void ComQueue::buffQueueIn_handler(const NATIVE_INT_TYPE portNum, Fw::Buffer& fwBuffer) {
134  const NATIVE_INT_TYPE queueNum = portNum + COM_PORT_COUNT;
135  // Ensure that the port number of buffQueueIn is consistent with the expectation
136  FW_ASSERT(portNum >= 0 && portNum < BUFFER_PORT_COUNT, portNum);
137  FW_ASSERT(queueNum < TOTAL_PORT_COUNT);
138  bool status =
139  this->enqueue(queueNum, QueueType::BUFFER_QUEUE, reinterpret_cast<const U8*>(&fwBuffer), sizeof(Fw::Buffer));
140  if (!status) {
141  this->deallocate_out(portNum, fwBuffer);
142  }
143 }
144 
145 void ComQueue::comStatusIn_handler(const NATIVE_INT_TYPE portNum, Fw::Success& condition) {
146  switch (this->m_state) {
147  // On success, the queue should be processed. On failure, the component should still wait.
148  case WAITING:
149  if (condition.e == Fw::Success::SUCCESS) {
150  this->m_state = READY;
151  this->processQueue();
152  // A message may or may not be sent. Thus, READY or WAITING are acceptable final states.
153  FW_ASSERT((this->m_state == WAITING || this->m_state == READY), this->m_state);
154  } else {
155  this->m_state = WAITING;
156  }
157  break;
158  // Both READY and unknown states should not be possible at this point. To receive a status message we must be
159  // one of the WAITING or RETRY states.
160  default:
161  FW_ASSERT(0, this->m_state);
162  break;
163  }
164 }
165 
166 void ComQueue::run_handler(const NATIVE_INT_TYPE portNum, U32 context) {
167  // Downlink the high-water marks for the Fw::ComBuffer array types
168  ComQueueDepth comQueueDepth;
169  for (U32 i = 0; i < comQueueDepth.SIZE; i++) {
170  comQueueDepth[i] = this->m_queues[i].get_high_water_mark();
171  this->m_queues[i].clear_high_water_mark();
172  }
173  this->tlmWrite_comQueueDepth(comQueueDepth);
174 
175  // Downlink the high-water marks for the Fw::Buffer array types
176  BuffQueueDepth buffQueueDepth;
177  for (U32 i = 0; i < buffQueueDepth.SIZE; i++) {
178  buffQueueDepth[i] = this->m_queues[i + COM_PORT_COUNT].get_high_water_mark();
179  this->m_queues[i + COM_PORT_COUNT].clear_high_water_mark();
180  }
181  this->tlmWrite_buffQueueDepth(buffQueueDepth);
182 }
183 
184 // ----------------------------------------------------------------------
185 // Hook implementations for typed async input ports
186 // ----------------------------------------------------------------------
187 
188 void ComQueue::buffQueueIn_overflowHook(FwIndexType portNum, Fw::Buffer& fwBuffer) {
189  FW_ASSERT(portNum >= 0 && portNum < BUFFER_PORT_COUNT, portNum);
190  this->deallocate_out(portNum, fwBuffer);
191 }
192 
193 // ----------------------------------------------------------------------
194 // Private helper methods
195 // ----------------------------------------------------------------------
196 
197 bool ComQueue::enqueue(const FwIndexType queueNum, QueueType queueType, const U8* data, const FwSizeType size) {
198  // Enqueue the given message onto the matching queue. When no space is available then emit the queue overflow event,
199  // set the appropriate throttle, and move on. Will assert if passed a message for a depth 0 queue.
200  const FwSizeType expectedSize = (queueType == QueueType::COM_QUEUE) ? sizeof(Fw::ComBuffer) : sizeof(Fw::Buffer);
201  const FwIndexType portNum = queueNum - ((queueType == QueueType::COM_QUEUE) ? 0 : COM_PORT_COUNT);
202  bool rvStatus = true;
203  FW_ASSERT(
204  expectedSize == size,
205  static_cast<FwAssertArgType>(size),
206  static_cast<FwAssertArgType>(expectedSize));
207  FW_ASSERT(portNum >= 0, portNum);
208  Fw::SerializeStatus status = this->m_queues[queueNum].enqueue(data, size);
209  if (status == Fw::FW_SERIALIZE_NO_ROOM_LEFT) {
210  if (!this->m_throttle[queueNum]) {
211  this->log_WARNING_HI_QueueOverflow(queueType, static_cast<U32>(portNum));
212  this->m_throttle[queueNum] = true;
213  }
214 
215  rvStatus = false;
216  }
217  // When the component is already in READY state process the queue to send out the next available message immediately
218  if (this->m_state == READY) {
219  this->processQueue();
220  }
221 
222  return rvStatus;
223 }
224 
225 void ComQueue::sendComBuffer(Fw::ComBuffer& comBuffer) {
226  FW_ASSERT(this->m_state == READY);
227  this->comQueueSend_out(0, comBuffer, 0);
228  this->m_state = WAITING;
229 }
230 
231 void ComQueue::sendBuffer(Fw::Buffer& buffer) {
232  // Retry buffer expected to be cleared as we are either transferring ownership or have already deallocated it.
233  FW_ASSERT(this->m_state == READY);
234  this->buffQueueSend_out(0, buffer);
235  this->m_state = WAITING;
236 }
237 
238 void ComQueue::processQueue() {
239  FwIndexType priorityIndex = 0;
240  FwIndexType sendPriority = 0;
241  // Check that we are in the appropriate state
242  FW_ASSERT(this->m_state == READY);
243 
244  // Walk all the queues in priority order. Send the first message that is available in priority order. No balancing
245  // is done within this loop.
246  for (priorityIndex = 0; priorityIndex < TOTAL_PORT_COUNT; priorityIndex++) {
247  QueueMetadata& entry = this->m_prioritizedList[priorityIndex];
248  Types::Queue& queue = this->m_queues[entry.index];
249 
250  // Continue onto next prioritized queue if there is no items in the current queue
251  if (queue.getQueueSize() == 0) {
252  continue;
253  }
254 
255  // Send out the message based on the type
256  if (entry.index < COM_PORT_COUNT) {
257  Fw::ComBuffer comBuffer;
258  queue.dequeue(reinterpret_cast<U8*>(&comBuffer), sizeof(comBuffer));
259  this->sendComBuffer(comBuffer);
260  } else {
261  Fw::Buffer buffer;
262  queue.dequeue(reinterpret_cast<U8*>(&buffer), sizeof(buffer));
263  this->sendBuffer(buffer);
264  }
265 
266  // Update the throttle and the index that was just sent
267  this->m_throttle[entry.index] = false;
268 
269  // Priority used in the next loop
270  sendPriority = entry.priority;
271  break;
272  }
273 
274  // Starting on the priority entry after the one dispatched and continuing through the end of the set of entries that
275  // share the same priority, rotate those entries such that the currently dispatched queue is last and the rest are
276  // shifted up by one. This effectively round-robins the queues of the same priority.
277  for (priorityIndex++;
278  priorityIndex < TOTAL_PORT_COUNT && (this->m_prioritizedList[priorityIndex].priority == sendPriority);
279  priorityIndex++) {
280  // Swap the previous entry with this one.
281  QueueMetadata temp = this->m_prioritizedList[priorityIndex];
282  this->m_prioritizedList[priorityIndex] = this->m_prioritizedList[priorityIndex - 1];
283  this->m_prioritizedList[priorityIndex - 1] = temp;
284  }
285 }
286 } // end namespace Svc
#define FW_ASSERT(...)
Definition: Assert.hpp:14
PlatformIntType NATIVE_INT_TYPE
Definition: BasicTypes.h:55
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.h:30
#define FW_NUM_ARRAY_ELEMENTS(a)
number of elements in an array
Definition: BasicTypes.h:70
PlatformUIntType NATIVE_UINT_TYPE
Definition: BasicTypes.h:56
C++ header for working with basic fprime types.
PlatformAssertArgType FwAssertArgType
Definition: FpConfig.h:39
PlatformSizeType FwSizeType
Definition: FpConfig.h:35
PlatformIndexType FwIndexType
Definition: FpConfig.h:25
virtual void deallocate(const NATIVE_UINT_TYPE identifier, void *ptr)=0
Deallocate memory.
virtual void * allocate(const NATIVE_UINT_TYPE identifier, NATIVE_UINT_TYPE &size, bool &recoverable)=0
Allocate memory.
Success/Failure.
T e
The raw enum value.
@ SUCCESS
Representing success.
Auto-generated base for ComQueue component.
void deallocate_out(FwIndexType portNum, Fw::Buffer &fwBuffer)
Invoke output port deallocate.
void tlmWrite_comQueueDepth(const Svc::ComQueueDepth &arg, Fw::Time _tlmTime=Fw::Time())
void tlmWrite_buffQueueDepth(const Svc::BuffQueueDepth &arg, Fw::Time _tlmTime=Fw::Time())
void buffQueueSend_out(FwIndexType portNum, Fw::Buffer &fwBuffer)
Invoke output port buffQueueSend.
void comQueueSend_out(FwIndexType portNum, Fw::ComBuffer &data, U32 context)
Invoke output port comQueueSend.
void log_WARNING_HI_QueueOverflow(Svc::QueueType queueType, U32 index)
ComQueue(const char *const compName)
Definition: ComQueue.cpp:24
void configure(QueueConfigurationTable queueConfig, NATIVE_UINT_TYPE allocationId, Fw::MemAllocator &allocator)
Definition: ComQueue.cpp:45
void cleanup()
Definition: ComQueue.cpp:38
static const FwIndexType BUFFER_PORT_COUNT
Total count of input buffer ports and thus total queues.
Definition: ComQueue.hpp:29
static const FwIndexType COM_PORT_COUNT
< Count of Fw::Com input ports and thus Fw::Com queues
Definition: ComQueue.hpp:26
static const FwIndexType TOTAL_PORT_COUNT
Definition: ComQueue.hpp:32
NATIVE_UINT_TYPE getQueueSize() const
Definition: Queue.cpp:60
void setup(U8 *const storage, const FwSizeType storage_size, const FwSizeType depth, const FwSizeType message_size)
setup the queue object to setup storage
Definition: Queue.cpp:17
Fw::SerializeStatus dequeue(U8 *const message, const FwSizeType size)
pops a fixed-size message off the front of the queue
Definition: Queue.cpp:38
Fw::SerializeStatus enqueue(const U8 *const message, const FwSizeType size)
pushes a fixed-size message onto the back of the queue
Definition: Queue.cpp:29
NATIVE_UINT_TYPE get_high_water_mark() const
Definition: Queue.cpp:51
void clear_high_water_mark()
Definition: Queue.cpp:56
SerializeStatus
forward declaration for string
@ FW_SERIALIZE_NO_ROOM_LEFT
No room left in the buffer to serialize data.
FwIndexType priority
Priority of the queue [0, TOTAL_PORT_COUNT)
Definition: ComQueue.hpp:47
FwSizeType depth
Depth of the queue [0, infinity)
Definition: ComQueue.hpp:46
configuration table for each queue
Definition: ComQueue.hpp:59
QueueConfigurationEntry entries[TOTAL_PORT_COUNT]
Definition: ComQueue.hpp:60