17ComQueue ::QueueConfigurationTable ::QueueConfigurationTable() {
19 this->entries[i].priority = 0;
20 this->entries[i].depth = 0;
24ComQueue ::ComQueue(
const char*
const compName)
25 : ComQueueComponentBase(compName),
29 m_allocation(nullptr) {
32 this->m_throttle[i] =
false;
36ComQueue ::~ComQueue() {}
39 ComQueueComponentBase::init(queueDepth, instance);
42void ComQueue ::cleanup() {
44 if ((this->m_allocator !=
nullptr) && (this->m_allocation !=
nullptr)) {
45 this->m_allocator->deallocate(this->m_allocationId, this->m_allocation);
56 this->m_allocator = &allocator;
57 this->m_allocationId = allocationId;
58 this->m_allocation =
nullptr;
80 QueueMetadata& entry = this->m_prioritizedList[currentPriorityIndex];
83 entry.index = entryIndex;
87 totalAllocation += entry.depth * entry.msgSize;
88 currentPriorityIndex++;
93 bool recoverable =
false;
94 this->m_allocation = this->m_allocator->
allocate(this->m_allocationId, totalAllocation, recoverable);
102 FwSizeType allocationSize = this->m_prioritizedList[i].depth * this->m_prioritizedList[i].msgSize;
104 this->m_prioritizedList[i].index);
105 FW_ASSERT((allocationSize + allocationOffset) <= totalAllocation, allocationSize, allocationOffset,
109 if (allocationSize > 0) {
110 this->m_queues[this->m_prioritizedList[i].index].
setup(
111 reinterpret_cast<U8*
>(this->m_allocation) + allocationOffset, allocationSize,
112 this->m_prioritizedList[i].depth, this->m_prioritizedList[i].msgSize);
114 allocationOffset += allocationSize;
117 FW_ASSERT(allocationOffset == totalAllocation, allocationOffset, totalAllocation);
126 this->enqueue(portNum, QueueType::COM_QUEUE,
reinterpret_cast<const U8*
>(&data),
sizeof(
Fw::ComBuffer));
134 this->enqueue(queueNum, QueueType::BUFFER_QUEUE,
reinterpret_cast<const U8*
>(&fwBuffer),
sizeof(
Fw::Buffer));
137void ComQueue::comStatusIn_handler(
const NATIVE_INT_TYPE portNum, Fw::Success& condition) {
138 switch (this->m_state) {
141 if (condition.e == Fw::Success::SUCCESS) {
142 this->m_state = READY;
143 this->processQueue();
145 FW_ASSERT((this->m_state == WAITING || this->m_state == READY), this->m_state);
147 this->m_state = WAITING;
160 ComQueueDepth comQueueDepth;
161 for (
FwSizeType i = 0; i < comQueueDepth.SIZE; i++) {
165 this->tlmWrite_comQueueDepth(comQueueDepth);
168 BuffQueueDepth buffQueueDepth;
169 for (
FwSizeType i = 0; i < buffQueueDepth.SIZE; i++) {
173 this->tlmWrite_buffQueueDepth(buffQueueDepth);
180void ComQueue::enqueue(
const FwIndexType queueNum, QueueType queueType,
const U8* data,
const FwSizeType size) {
185 FW_ASSERT(expectedSize == size, size, expectedSize);
189 this->log_WARNING_HI_QueueOverflow(queueType, portNum);
190 this->m_throttle[queueNum] =
true;
193 if (this->m_state == READY) {
194 this->processQueue();
200 this->comQueueSend_out(0, comBuffer, 0);
201 this->m_state = WAITING;
204void ComQueue::sendBuffer(
Fw::Buffer& buffer) {
207 this->buffQueueSend_out(0, buffer);
208 this->m_state = WAITING;
211void ComQueue::processQueue() {
219 for (priorityIndex = 0; priorityIndex <
TOTAL_PORT_COUNT; priorityIndex++) {
220 QueueMetadata& entry = this->m_prioritizedList[priorityIndex];
231 queue.
dequeue(
reinterpret_cast<U8*
>(&comBuffer),
sizeof(comBuffer));
232 this->sendComBuffer(comBuffer);
235 queue.
dequeue(
reinterpret_cast<U8*
>(&buffer),
sizeof(buffer));
236 this->sendBuffer(buffer);
240 this->m_throttle[entry.index] =
false;
243 sendPriority = entry.priority;
250 for (priorityIndex++;
251 priorityIndex <
TOTAL_PORT_COUNT && (this->m_prioritizedList[priorityIndex].priority == sendPriority);
254 QueueMetadata temp = this->m_prioritizedList[priorityIndex];
255 this->m_prioritizedList[priorityIndex] = this->m_prioritizedList[priorityIndex - 1];
256 this->m_prioritizedList[priorityIndex - 1] = temp;
PlatformIntType NATIVE_INT_TYPE
uint8_t U8
8-bit unsigned integer
#define FW_NUM_ARRAY_ELEMENTS(a)
number of elements in an array
PlatformUIntType NATIVE_UINT_TYPE
C++ header for working with basic fprime types.
PlatformSizeType FwSizeType
PlatformIndexType FwIndexType
virtual void * allocate(const NATIVE_UINT_TYPE identifier, NATIVE_UINT_TYPE &size, bool &recoverable)=0
Allocate memory.
void configure(QueueConfigurationTable queueConfig, NATIVE_UINT_TYPE allocationId, Fw::MemAllocator &allocator)
static const FwIndexType BUFFER_PORT_COUNT
Total count of input buffer ports and thus total queues.
static const FwIndexType COM_PORT_COUNT
< Count of Fw::Com input ports and thus Fw::Com queues
static const FwIndexType TOTAL_PORT_COUNT
NATIVE_UINT_TYPE getQueueSize() const
void setup(U8 *const storage, const FwSizeType storage_size, const FwSizeType depth, const FwSizeType message_size)
setup the queue object to setup storage
Fw::SerializeStatus dequeue(U8 *const message, const FwSizeType size)
pops a fixed-size message off the front of the queue
Fw::SerializeStatus enqueue(const U8 *const message, const FwSizeType size)
pushes a fixed-size message onto the back of the queue
NATIVE_UINT_TYPE get_high_water_mark() const
void clear_high_water_mark()
SerializeStatus
forward declaration for string
@ FW_SERIALIZE_NO_ROOM_LEFT
No room left in the buffer to serialize data.
FwIndexType priority
Priority of the queue [0, TOTAL_PORT_COUNT)
FwSizeType depth
Depth of the queue [0, infinity)
configuration table for each queue
QueueConfigurationEntry entries[TOTAL_PORT_COUNT]