F´ Flight Software - C/C++ Documentation  devel
A framework for building embedded system applications to NASA flight quality standards.
BufferAccumulator.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title BufferAccumulator.cpp
3 // \author bocchino
4 // \brief BufferAccumulator implementation
5 //
6 // \copyright
7 // Copyright (C) 2017 California Institute of Technology.
8 // ALL RIGHTS RESERVED. United States Government Sponsorship
9 // acknowledged.
10 //
11 // ======================================================================
12 
14 
15 #include <sys/time.h>
16 
17 #include "Fw/Types/BasicTypes.hpp"
18 
19 
20 namespace Svc {
21 
22 // ----------------------------------------------------------------------
23 // Construction, initialization, and destruction
24 // ----------------------------------------------------------------------
25 
27  BufferAccumulator(const char* const compName)
28  : BufferAccumulatorComponentBase(compName),
29  m_mode(BufferAccumulator_OpState::ACCUMULATE),
30  m_bufferMemory(nullptr),
31  m_bufferQueue(),
32  m_send(false),
33  m_waitForBuffer(false),
34  m_numWarnings(0u),
35  m_numDrained(0u),
36  m_numToDrain(0u),
37  m_opCode(),
38  m_cmdSeq(0u),
39  m_allocatorId(0) {
40 }
41 
43  const NATIVE_INT_TYPE instance) {
44  BufferAccumulatorComponentBase::init(queueDepth, instance);
45 }
46 
48 
49 // ----------------------------------------------------------------------
50 // Public methods
51 // ----------------------------------------------------------------------
52 
54  NATIVE_INT_TYPE identifier, Fw::MemAllocator& allocator,
55  NATIVE_UINT_TYPE maxNumBuffers
56 ) {
57 
58  this->m_allocatorId = identifier;
59  NATIVE_UINT_TYPE memSize = static_cast<NATIVE_UINT_TYPE>(sizeof(Fw::Buffer) * maxNumBuffers);
60  bool recoverable = false;
61  this->m_bufferMemory = static_cast<Fw::Buffer*>(
62  allocator.allocate(static_cast<NATIVE_UINT_TYPE>(identifier), memSize, recoverable));
63  //TODO: Fail gracefully here
64  m_bufferQueue.init(this->m_bufferMemory, maxNumBuffers);
65 }
66 
68  allocator.deallocate(static_cast<NATIVE_UINT_TYPE>(this->m_allocatorId), this->m_bufferMemory);
69 }
70 
71 // ----------------------------------------------------------------------
72 // Handler implementations for user-defined typed input ports
73 // ----------------------------------------------------------------------
74 
75 void BufferAccumulator ::bufferSendInFill_handler(const NATIVE_INT_TYPE portNum,
76  Fw::Buffer& buffer) {
77 
78  const bool status = this->m_bufferQueue.enqueue(buffer);
79  if (status) {
80  if (this->m_numWarnings > 0) {
82  }
83  this->m_numWarnings = 0;
84  } else {
85  if (this->m_numWarnings == 0) {
87  }
88  m_numWarnings++;
89  }
90  if (this->m_send) {
91  this->sendStoredBuffer();
92  }
93 
94  this->tlmWrite_BA_NumQueuedBuffers(this->m_bufferQueue.getSize());
95 }
96 
97 void BufferAccumulator ::bufferSendInReturn_handler(
98  const NATIVE_INT_TYPE portNum, Fw::Buffer& buffer) {
99 
100  this->bufferSendOutReturn_out(0, buffer);
101  this->m_waitForBuffer = false;
102  if ((this->m_mode == BufferAccumulator_OpState::DRAIN) || // we are draining ALL buffers
103  (this->m_numDrained < this->m_numToDrain)) { // OR we aren't done draining some buffers
104  // in a partial drain
105  this->m_send = true;
106  this->sendStoredBuffer();
107  }
108 }
109 
110 void BufferAccumulator ::pingIn_handler(const NATIVE_INT_TYPE portNum,
111  U32 key) {
112  this->pingOut_out(0, key);
113 }
114 
115 // ----------------------------------------------------------------------
116 // Command handler implementations
117 // ----------------------------------------------------------------------
118 
119 void BufferAccumulator ::BA_SetMode_cmdHandler(const FwOpcodeType opCode,
120  const U32 cmdSeq,
121  BufferAccumulator_OpState mode) {
122 
123  // cancel an in-progress partial drain
124  if (this->m_numToDrain > 0) {
125  // reset counters for partial buffer drain
126  this->m_numToDrain = 0;
127  this->m_numDrained = 0;
128  // respond to the original command
129  this->cmdResponse_out(this->m_opCode, this->m_cmdSeq, Fw::CmdResponse::OK);
130  }
131 
132  this->m_mode = mode;
133  if (mode == BufferAccumulator_OpState::DRAIN) {
134  if (!this->m_waitForBuffer) {
135  this->m_send = true;
136  this->sendStoredBuffer();
137  }
138  } else {
139  this->m_send = false;
140  }
141  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
142 }
143 
144 void BufferAccumulator ::BA_DrainBuffers_cmdHandler(
145  const FwOpcodeType opCode, const U32 cmdSeq, U32 numToDrain,
146  BufferAccumulator_BlockMode blockMode) {
147 
148  if (this->m_numDrained < this->m_numToDrain) {
149  this->log_WARNING_HI_BA_StillDraining(this->m_numDrained, this->m_numToDrain);
150  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::BUSY);
151  return;
152  }
153 
154  if (this->m_mode == BufferAccumulator_OpState::DRAIN) {
156  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::VALIDATION_ERROR);
157  return;
158  }
159 
160  if (numToDrain == 0) {
162  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
163  return;
164  }
165 
166  this->m_opCode = opCode;
167  this->m_cmdSeq = cmdSeq;
168  this->m_numDrained = 0;
169  this->m_numToDrain = numToDrain;
170 
171  if (blockMode == BufferAccumulator_BlockMode::NOBLOCK) {
172  U32 numBuffers = this->m_bufferQueue.getSize();
173 
174  if (numBuffers < numToDrain) {
175  this->m_numToDrain = numBuffers;
176  this->log_WARNING_LO_BA_NonBlockDrain(this->m_numToDrain, numToDrain);
177  }
178 
179  /* OK if there were 0 buffers queued, and we
180  * end up setting numToDrain to 0
181  */
182  if (0 == this->m_numToDrain) {
184  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
185  return;
186  }
187  }
188 
189  // We are still waiting for a buffer from last time
190  if (!this->m_waitForBuffer) {
191  this->m_send = true;
192  this->sendStoredBuffer(); // kick off the draining;
193  }
194 }
195 
196 // ----------------------------------------------------------------------
197 // Private helper methods
198 // ----------------------------------------------------------------------
199 
200 void BufferAccumulator ::sendStoredBuffer() {
201 
202  FW_ASSERT(this->m_send);
203  Fw::Buffer buffer;
204  if ((this->m_numToDrain == 0) || // we are draining ALL buffers
205  (this->m_numDrained < this->m_numToDrain)) { // OR we aren't done draining some buffers in a
206  // partial drain
207  const bool status = this->m_bufferQueue.dequeue(buffer);
208  if (status) { // a buffer was dequeued
209  this->m_numDrained++;
210  this->bufferSendOutDrain_out(0, buffer);
211  this->m_waitForBuffer = true;
212  this->m_send = false;
213  } else if (this->m_numToDrain > 0) {
214  this->log_WARNING_HI_BA_DrainStalled(this->m_numDrained, this->m_numToDrain);
215  }
216  }
217 
218  /* This used to be "else if", but then you wait for all
219  * drained buffers in a partial drain to be RETURNED before returning OK.
220  * Correct thing is to return OK once they are SENT
221  */
222  if ((this->m_numToDrain > 0) && // we are doing a partial drain
223  (this->m_numDrained == this->m_numToDrain)) { // AND we just finished draining
224  //
225  this->log_ACTIVITY_HI_BA_PartialDrainDone(this->m_numDrained);
226  // reset counters for partial buffer drain
227  this->m_numToDrain = 0;
228  this->m_numDrained = 0;
229  this->m_send = false;
230  this->cmdResponse_out(this->m_opCode, this->m_cmdSeq, Fw::CmdResponse::OK);
231  }
232 
233  this->tlmWrite_BA_NumQueuedBuffers(this->m_bufferQueue.getSize());
234 }
235 
236 } // namespace Svc
#define FW_ASSERT(...)
Definition: Assert.hpp:14
PlatformIntType NATIVE_INT_TYPE
Definition: BasicTypes.h:51
PlatformUIntType NATIVE_UINT_TYPE
Definition: BasicTypes.h:52
C++ header for working with basic fprime types.
U32 FwOpcodeType
Definition: FpConfig.h:78
@ VALIDATION_ERROR
Command failed validation.
@ OK
Command successfully executed.
@ BUSY
Component busy.
virtual void deallocate(const NATIVE_UINT_TYPE identifier, void *ptr)=0
Deallocate memory.
virtual void * allocate(const NATIVE_UINT_TYPE identifier, NATIVE_UINT_TYPE &size, bool &recoverable)=0
Allocate memory.
void init()
Object initializer.
Definition: ObjBase.cpp:27
Auto-generated base for BufferAccumulator component.
void bufferSendOutReturn_out(FwIndexType portNum, Fw::Buffer &fwBuffer)
Invoke output port bufferSendOutReturn.
void log_WARNING_HI_BA_StillDraining(U32 numDrained, U32 numToDrain)
void bufferSendOutDrain_out(FwIndexType portNum, Fw::Buffer &fwBuffer)
Invoke output port bufferSendOutDrain.
void log_WARNING_LO_BA_NonBlockDrain(U32 numWillDrain, U32 numReqDrain)
void log_WARNING_HI_BA_DrainStalled(U32 numDrained, U32 numToDrain)
void pingOut_out(FwIndexType portNum, U32 key)
Invoke output port pingOut.
void cmdResponse_out(FwOpcodeType opCode, U32 cmdSeq, Fw::CmdResponse response)
Emit command response.
void tlmWrite_BA_NumQueuedBuffers(U32 arg, Fw::Time _tlmTime=Fw::Time())
void allocateQueue(NATIVE_INT_TYPE identifier, Fw::MemAllocator &allocator, NATIVE_UINT_TYPE maxNumBuffers)
void deallocateQueue(Fw::MemAllocator &allocator)
Return allocated queue. Should be done during shutdown.
BufferAccumulator(const char *const compName)