F´ Flight Software - C/C++ Documentation  devel
A framework for building embedded system applications to NASA flight quality standards.
BufferAccumulator.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title BufferAccumulator.cpp
3 // \author bocchino
4 // \brief BufferAccumulator implementation
5 //
6 // \copyright
7 // Copyright (C) 2017 California Institute of Technology.
8 // ALL RIGHTS RESERVED. United States Government Sponsorship
9 // acknowledged.
10 //
11 // ======================================================================
12 
14 
15 #include <sys/time.h>
16 
17 #include "Fw/Types/BasicTypes.hpp"
18 
19 
20 namespace Svc {
21 
22 // ----------------------------------------------------------------------
23 // Construction, initialization, and destruction
24 // ----------------------------------------------------------------------
25 
27  BufferAccumulator(const char* const compName)
28  : BufferAccumulatorComponentBase(compName),
29  m_mode(BufferAccumulator_OpState::ACCUMULATE),
30  m_bufferMemory(nullptr),
31  m_bufferQueue(),
32  m_send(false),
33  m_waitForBuffer(false),
34  m_numWarnings(0u),
35  m_numDrained(0u),
36  m_numToDrain(0u),
37  m_opCode(),
38  m_cmdSeq(0u),
39  m_allocatorId(0) {
40 }
41 
43 
44 // ----------------------------------------------------------------------
45 // Public methods
46 // ----------------------------------------------------------------------
47 
49  NATIVE_INT_TYPE identifier, Fw::MemAllocator& allocator,
50  NATIVE_UINT_TYPE maxNumBuffers
51 ) {
52 
53  this->m_allocatorId = identifier;
54  NATIVE_UINT_TYPE memSize = static_cast<NATIVE_UINT_TYPE>(sizeof(Fw::Buffer) * maxNumBuffers);
55  bool recoverable = false;
56  this->m_bufferMemory = static_cast<Fw::Buffer*>(
57  allocator.allocate(static_cast<NATIVE_UINT_TYPE>(identifier), memSize, recoverable));
58  //TODO: Fail gracefully here
59  m_bufferQueue.init(this->m_bufferMemory, maxNumBuffers);
60 }
61 
63  allocator.deallocate(static_cast<NATIVE_UINT_TYPE>(this->m_allocatorId), this->m_bufferMemory);
64 }
65 
66 // ----------------------------------------------------------------------
67 // Handler implementations for user-defined typed input ports
68 // ----------------------------------------------------------------------
69 
70 void BufferAccumulator ::bufferSendInFill_handler(const NATIVE_INT_TYPE portNum,
71  Fw::Buffer& buffer) {
72 
73  const bool status = this->m_bufferQueue.enqueue(buffer);
74  if (status) {
75  if (this->m_numWarnings > 0) {
77  }
78  this->m_numWarnings = 0;
79  } else {
80  if (this->m_numWarnings == 0) {
82  }
83  m_numWarnings++;
84  }
85  if (this->m_send) {
86  this->sendStoredBuffer();
87  }
88 
89  this->tlmWrite_BA_NumQueuedBuffers(this->m_bufferQueue.getSize());
90 }
91 
92 void BufferAccumulator ::bufferSendInReturn_handler(
93  const NATIVE_INT_TYPE portNum, Fw::Buffer& buffer) {
94 
95  this->bufferSendOutReturn_out(0, buffer);
96  this->m_waitForBuffer = false;
97  if ((this->m_mode == BufferAccumulator_OpState::DRAIN) || // we are draining ALL buffers
98  (this->m_numDrained < this->m_numToDrain)) { // OR we aren't done draining some buffers
99  // in a partial drain
100  this->m_send = true;
101  this->sendStoredBuffer();
102  }
103 }
104 
105 void BufferAccumulator ::pingIn_handler(const NATIVE_INT_TYPE portNum,
106  U32 key) {
107  this->pingOut_out(0, key);
108 }
109 
110 // ----------------------------------------------------------------------
111 // Command handler implementations
112 // ----------------------------------------------------------------------
113 
114 void BufferAccumulator ::BA_SetMode_cmdHandler(const FwOpcodeType opCode,
115  const U32 cmdSeq,
116  BufferAccumulator_OpState mode) {
117 
118  // cancel an in-progress partial drain
119  if (this->m_numToDrain > 0) {
120  // reset counters for partial buffer drain
121  this->m_numToDrain = 0;
122  this->m_numDrained = 0;
123  // respond to the original command
124  this->cmdResponse_out(this->m_opCode, this->m_cmdSeq, Fw::CmdResponse::OK);
125  }
126 
127  this->m_mode = mode;
128  if (mode == BufferAccumulator_OpState::DRAIN) {
129  if (!this->m_waitForBuffer) {
130  this->m_send = true;
131  this->sendStoredBuffer();
132  }
133  } else {
134  this->m_send = false;
135  }
136  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
137 }
138 
139 void BufferAccumulator ::BA_DrainBuffers_cmdHandler(
140  const FwOpcodeType opCode, const U32 cmdSeq, U32 numToDrain,
141  BufferAccumulator_BlockMode blockMode) {
142 
143  if (this->m_numDrained < this->m_numToDrain) {
144  this->log_WARNING_HI_BA_StillDraining(this->m_numDrained, this->m_numToDrain);
145  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::BUSY);
146  return;
147  }
148 
149  if (this->m_mode == BufferAccumulator_OpState::DRAIN) {
151  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::VALIDATION_ERROR);
152  return;
153  }
154 
155  if (numToDrain == 0) {
157  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
158  return;
159  }
160 
161  this->m_opCode = opCode;
162  this->m_cmdSeq = cmdSeq;
163  this->m_numDrained = 0;
164  this->m_numToDrain = numToDrain;
165 
166  if (blockMode == BufferAccumulator_BlockMode::NOBLOCK) {
167  U32 numBuffers = this->m_bufferQueue.getSize();
168 
169  if (numBuffers < numToDrain) {
170  this->m_numToDrain = numBuffers;
171  this->log_WARNING_LO_BA_NonBlockDrain(this->m_numToDrain, numToDrain);
172  }
173 
174  /* OK if there were 0 buffers queued, and we
175  * end up setting numToDrain to 0
176  */
177  if (0 == this->m_numToDrain) {
179  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
180  return;
181  }
182  }
183 
184  // We are still waiting for a buffer from last time
185  if (!this->m_waitForBuffer) {
186  this->m_send = true;
187  this->sendStoredBuffer(); // kick off the draining;
188  }
189 }
190 
191 // ----------------------------------------------------------------------
192 // Private helper methods
193 // ----------------------------------------------------------------------
194 
195 void BufferAccumulator ::sendStoredBuffer() {
196 
197  FW_ASSERT(this->m_send);
198  Fw::Buffer buffer;
199  if ((this->m_numToDrain == 0) || // we are draining ALL buffers
200  (this->m_numDrained < this->m_numToDrain)) { // OR we aren't done draining some buffers in a
201  // partial drain
202  const bool status = this->m_bufferQueue.dequeue(buffer);
203  if (status) { // a buffer was dequeued
204  this->m_numDrained++;
205  this->bufferSendOutDrain_out(0, buffer);
206  this->m_waitForBuffer = true;
207  this->m_send = false;
208  } else if (this->m_numToDrain > 0) {
209  this->log_WARNING_HI_BA_DrainStalled(this->m_numDrained, this->m_numToDrain);
210  }
211  }
212 
213  /* This used to be "else if", but then you wait for all
214  * drained buffers in a partial drain to be RETURNED before returning OK.
215  * Correct thing is to return OK once they are SENT
216  */
217  if ((this->m_numToDrain > 0) && // we are doing a partial drain
218  (this->m_numDrained == this->m_numToDrain)) { // AND we just finished draining
219  //
220  this->log_ACTIVITY_HI_BA_PartialDrainDone(this->m_numDrained);
221  // reset counters for partial buffer drain
222  this->m_numToDrain = 0;
223  this->m_numDrained = 0;
224  this->m_send = false;
225  this->cmdResponse_out(this->m_opCode, this->m_cmdSeq, Fw::CmdResponse::OK);
226  }
227 
228  this->tlmWrite_BA_NumQueuedBuffers(this->m_bufferQueue.getSize());
229 }
230 
231 } // namespace Svc
#define FW_ASSERT(...)
Definition: Assert.hpp:14
PlatformIntType NATIVE_INT_TYPE
Definition: BasicTypes.h:55
PlatformUIntType NATIVE_UINT_TYPE
Definition: BasicTypes.h:56
C++ header for working with basic fprime types.
U32 FwOpcodeType
Definition: FpConfig.h:91
@ VALIDATION_ERROR
Command failed validation.
@ OK
Command successfully executed.
@ BUSY
Component busy.
virtual void deallocate(const NATIVE_UINT_TYPE identifier, void *ptr)=0
Deallocate memory.
virtual void * allocate(const NATIVE_UINT_TYPE identifier, NATIVE_UINT_TYPE &size, bool &recoverable)=0
Allocate memory.
Auto-generated base for BufferAccumulator component.
void bufferSendOutReturn_out(FwIndexType portNum, Fw::Buffer &fwBuffer)
Invoke output port bufferSendOutReturn.
void log_WARNING_HI_BA_StillDraining(U32 numDrained, U32 numToDrain)
void bufferSendOutDrain_out(FwIndexType portNum, Fw::Buffer &fwBuffer)
Invoke output port bufferSendOutDrain.
void log_WARNING_LO_BA_NonBlockDrain(U32 numWillDrain, U32 numReqDrain)
void log_WARNING_HI_BA_DrainStalled(U32 numDrained, U32 numToDrain)
void pingOut_out(FwIndexType portNum, U32 key)
Invoke output port pingOut.
void cmdResponse_out(FwOpcodeType opCode, U32 cmdSeq, Fw::CmdResponse response)
Emit command response.
void tlmWrite_BA_NumQueuedBuffers(U32 arg, Fw::Time _tlmTime=Fw::Time())
void allocateQueue(NATIVE_INT_TYPE identifier, Fw::MemAllocator &allocator, NATIVE_UINT_TYPE maxNumBuffers)
void deallocateQueue(Fw::MemAllocator &allocator)
Return allocated queue. Should be done during shutdown.
BufferAccumulator(const char *const compName)