F´ Flight Software - C/C++ Documentation  devel
A framework for building embedded system applications to NASA flight quality standards.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
BufferAccumulator.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title BufferAccumulator.cpp
3 // \author bocchino
4 // \brief BufferAccumulator implementation
5 //
6 // \copyright
7 // Copyright (C) 2017 California Institute of Technology.
8 // ALL RIGHTS RESERVED. United States Government Sponsorship
9 // acknowledged.
10 //
11 // ======================================================================
12 
14 
15 #include <sys/time.h>
16 
17 #include "Fw/Types/BasicTypes.hpp"
18 
19 
20 namespace Svc {
21 
22 // ----------------------------------------------------------------------
23 // Construction, initialization, and destruction
24 // ----------------------------------------------------------------------
25 
// Constructor. Forwards compName to BufferAccumulatorComponentBase and puts
// every member into its initial state: mode ACCUMULATE, no queue memory
// (nullptr), a default-constructed queue and saved opcode, m_send and
// m_waitForBuffer false, and the warning/drain counters, saved command
// sequence number and allocator id all zero.
// NOTE(review): the class-qualifier line that normally precedes this
// signature (listing line 26) is absent from this view.
27  BufferAccumulator(const char* const compName)
28  : BufferAccumulatorComponentBase(compName),
29  m_mode(BufferAccumulator_OpState::ACCUMULATE),
30  m_bufferMemory(nullptr),
31  m_bufferQueue(),
32  m_send(false),
33  m_waitForBuffer(false),
34  m_numWarnings(0u),
35  m_numDrained(0u),
36  m_numToDrain(0u),
37  m_opCode(),
38  m_cmdSeq(0u),
39  m_allocatorId(0) {
40 }
41 
43 
44 // ----------------------------------------------------------------------
45 // Public methods
46 // ----------------------------------------------------------------------
47 
// Parameter list and body of the routine that obtains backing memory for the
// buffer queue and initializes the queue on top of it.
// NOTE(review): the line carrying the return type and function name (listing
// line 48) is missing from this view; the parameters match allocateQueue as
// listed in this page's symbol index.
//   identifier    - id passed to the allocator; also stored for deallocation
//   allocator     - allocator that provides the queue memory
//   maxNumBuffers - number of Fw::Buffer slots to request
49  NATIVE_INT_TYPE identifier, Fw::MemAllocator& allocator,
50  NATIVE_UINT_TYPE maxNumBuffers
51 ) {
52 
    // Remember the id; the deallocation path passes m_allocatorId back to the allocator.
53  this->m_allocatorId = identifier;
    // Requested size in bytes: one Fw::Buffer object per queue slot.
54  NATIVE_UINT_TYPE memSize = static_cast<NATIVE_UINT_TYPE>(sizeof(Fw::Buffer) * maxNumBuffers);
    // memSize and recoverable are taken by non-const reference by allocate(),
    // so the allocator may modify them; neither value is inspected afterwards.
55  bool recoverable = false;
56  this->m_bufferMemory = static_cast<Fw::Buffer*>(
57  allocator.allocate(static_cast<NATIVE_UINT_TYPE>(identifier), memSize, recoverable));
58  //TODO: Fail gracefully here
    // As written, the returned pointer is handed to the queue without a null
    // check, and the queue is sized with the requested maxNumBuffers.
59  m_bufferQueue.init(this->m_bufferMemory, maxNumBuffers);
60 }
61 
// Body of the queue-deallocation routine: hands m_bufferMemory back to the
// supplied allocator under the id stored in m_allocatorId. m_bufferMemory is
// not reset to nullptr afterwards.
// NOTE(review): the signature line (listing line 62) is missing from this
// view; presumably deallocateQueue(Fw::MemAllocator& allocator) as listed in
// this page's symbol index.
63  allocator.deallocate(static_cast<NATIVE_UINT_TYPE>(this->m_allocatorId), this->m_bufferMemory);
64 }
65 
66 // ----------------------------------------------------------------------
67 // Handler implementations for user-defined typed input ports
68 // ----------------------------------------------------------------------
69 
// Handler for the bufferSendInFill input port.
// Tries to enqueue the incoming buffer, tracks consecutive enqueue failures in
// m_numWarnings, forwards a stored buffer if sending is currently enabled, and
// publishes the queue size as telemetry.
//   portNum - input port number (not used in this handler)
//   buffer  - buffer to store in the queue
70 void BufferAccumulator ::bufferSendInFill_handler(const NATIVE_INT_TYPE portNum,
71  Fw::Buffer& buffer) {
72 
73  const bool status = this->m_bufferQueue.enqueue(buffer);
74  if (status) {
    // Enqueue succeeded after one or more failures.
75  if (this->m_numWarnings > 0) {
    // NOTE(review): listing line 76 is absent from this view, so the body of
    // this branch is not visible here.
77  }
78  this->m_numWarnings = 0;
79  } else {
    // First failure since the last successful enqueue.
80  if (this->m_numWarnings == 0) {
    // NOTE(review): listing line 81 is absent from this view, so the body of
    // this branch is not visible here.
82  }
83  m_numWarnings++;
84  }
    // m_send is set by the drain paths when a buffer may be sent out.
85  if (this->m_send) {
86  this->sendStoredBuffer();
87  }
88 
89  this->tlmWrite_BA_NumQueuedBuffers(this->m_bufferQueue.getSize());
90 }
91 
// Handler for the bufferSendInReturn input port.
// Passes the returned buffer on through bufferSendOutReturn port 0, clears
// m_waitForBuffer, and, if a drain is still in progress, re-enables sending
// and attempts to send the next stored buffer.
//   portNum - input port number (not used in this handler)
//   buffer  - buffer being returned
92 void BufferAccumulator ::bufferSendInReturn_handler(
93  const NATIVE_INT_TYPE portNum, Fw::Buffer& buffer) {
94 
95  this->bufferSendOutReturn_out(0, buffer);
    // The outstanding buffer has come back, so another one may now be sent.
96  this->m_waitForBuffer = false;
97  if ((this->m_mode == BufferAccumulator_OpState::DRAIN) || // we are draining ALL buffers
98  (this->m_numDrained < this->m_numToDrain)) { // OR we aren't done draining some buffers
99  // in a partial drain
    // sendStoredBuffer() asserts m_send, so it must be set before the call.
100  this->m_send = true;
101  this->sendStoredBuffer();
102  }
103 }
104 
// Handler for the pingIn input port: echoes the received key back out on
// pingOut port 0.
//   portNum - input port number (not used in this handler)
//   key     - value received with the ping, passed through unchanged
105 void BufferAccumulator ::pingIn_handler(const NATIVE_INT_TYPE portNum,
106  U32 key) {
107  this->pingOut_out(0, key);
108 }
109 
110 // ----------------------------------------------------------------------
111 // Command handler implementations
112 // ----------------------------------------------------------------------
113 
// Command handler for BA_SetMode.
// Cancels any partial drain that is in progress (answering that earlier
// command with OK), stores the new mode, and then either starts sending
// stored buffers (DRAIN) or disables sending (any other mode). This command
// is always answered with OK.
//   opCode - opcode of this command, used for the response
//   cmdSeq - sequence number of this command, used for the response
//   mode   - requested operating mode
114 void BufferAccumulator ::BA_SetMode_cmdHandler(const FwOpcodeType opCode,
115  const U32 cmdSeq,
116  BufferAccumulator_OpState mode) {
117 
118  // cancel an in-progress partial drain
119  if (this->m_numToDrain > 0) {
120  // reset counters for partial buffer drain
121  this->m_numToDrain = 0;
122  this->m_numDrained = 0;
123  // respond to the original command
    // m_opCode / m_cmdSeq were saved by the drain command that is being cancelled.
124  this->cmdResponse_out(this->m_opCode, this->m_cmdSeq, Fw::CmdResponse::OK);
125  }
126 
127  this->m_mode = mode;
128  if (mode == BufferAccumulator_OpState::DRAIN) {
    // If a buffer is still outstanding, do nothing here; the return handler
    // resumes sending when it sees the mode is DRAIN.
129  if (!this->m_waitForBuffer) {
130  this->m_send = true;
131  this->sendStoredBuffer();
132  }
133  } else {
134  this->m_send = false;
135  }
136  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
137 }
138 
// Command handler for BA_DrainBuffers: starts a partial drain of numToDrain
// buffers.
// Responds immediately with BUSY if an earlier partial drain is unfinished,
// with VALIDATION_ERROR if the component is already in DRAIN mode, and with
// OK if there is nothing to drain. Otherwise the opcode and sequence number
// are saved and no response is sent here; sendStoredBuffer() responds once
// the requested count has been sent, and BA_SetMode_cmdHandler responds if
// the drain is cancelled.
//   opCode     - opcode of this command
//   cmdSeq     - sequence number of this command
//   numToDrain - number of buffers requested to be drained
//   blockMode  - with NOBLOCK the request is clamped to the number of buffers
//                currently queued; otherwise the requested count is kept
139 void BufferAccumulator ::BA_DrainBuffers_cmdHandler(
140  const FwOpcodeType opCode, const U32 cmdSeq, U32 numToDrain,
141  BufferAccumulator_BlockMode blockMode) {
142 
    // A previous partial drain has not yet sent all of its buffers.
143  if (this->m_numDrained < this->m_numToDrain) {
144  this->log_WARNING_HI_BA_StillDraining(this->m_numDrained, this->m_numToDrain);
145  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::BUSY);
146  return;
147  }
148 
    // A partial drain is rejected while the component is in DRAIN mode.
149  if (this->m_mode == BufferAccumulator_OpState::DRAIN) {
    // NOTE(review): listing line 150 is absent from this view.
151  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::VALIDATION_ERROR);
152  return;
153  }
154 
    // Nothing requested: succeed without touching the drain state.
155  if (numToDrain == 0) {
    // NOTE(review): listing line 156 is absent from this view.
157  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
158  return;
159  }
160 
    // Save the command identity for the deferred response and arm the counters.
161  this->m_opCode = opCode;
162  this->m_cmdSeq = cmdSeq;
163  this->m_numDrained = 0;
164  this->m_numToDrain = numToDrain;
165 
166  if (blockMode == BufferAccumulator_BlockMode::NOBLOCK) {
167  U32 numBuffers = this->m_bufferQueue.getSize();
168 
    // Fewer buffers queued than requested: drain only what is available.
169  if (numBuffers < numToDrain) {
170  this->m_numToDrain = numBuffers;
171  this->log_WARNING_LO_BA_NonBlockDrain(this->m_numToDrain, numToDrain);
172  }
173 
174  /* OK if there were 0 buffers queued, and we
175  * end up setting numToDrain to 0
176  */
177  if (0 == this->m_numToDrain) {
    // NOTE(review): listing line 178 is absent from this view.
179  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
180  return;
181  }
182  }
183 
184  // Start sending only if we are NOT still waiting for a buffer from last time;
    // otherwise the return handler resumes the drain when that buffer comes back.
185  if (!this->m_waitForBuffer) {
186  this->m_send = true;
187  this->sendStoredBuffer(); // kick off the draining;
188  }
189 }
190 
191 // ----------------------------------------------------------------------
192 // Private helper methods
193 // ----------------------------------------------------------------------
194 
// Private helper: sends at most one stored buffer out on bufferSendOutDrain
// port 0 and completes a partial drain once its count has been reached.
// Callers must set m_send to true first (asserted below). After a buffer is
// sent, m_send is cleared and m_waitForBuffer is set. The queue size is
// published as telemetry on every call.
195 void BufferAccumulator ::sendStoredBuffer() {
196 
197  FW_ASSERT(this->m_send);
198  Fw::Buffer buffer;
199  if ((this->m_numToDrain == 0) || // we are draining ALL buffers
200  (this->m_numDrained < this->m_numToDrain)) { // OR we aren't done draining some buffers in a
201  // partial drain
202  const bool status = this->m_bufferQueue.dequeue(buffer);
203  if (status) { // a buffer was dequeued
    // m_numDrained is incremented for every sent buffer, including when
    // m_numToDrain is 0.
204  this->m_numDrained++;
205  this->bufferSendOutDrain_out(0, buffer);
    // Block further sends until the return handler clears m_waitForBuffer.
206  this->m_waitForBuffer = true;
207  this->m_send = false;
208  } else if (this->m_numToDrain > 0) {
    // Partial drain with nothing to dequeue: m_send stays true, so the fill
    // handler retries when the next buffer is enqueued.
209  this->log_WARNING_HI_BA_DrainStalled(this->m_numDrained, this->m_numToDrain);
210  }
211  }
212 
213  /* This used to be "else if", but then you wait for all
214  * drained buffers in a partial drain to be RETURNED before returning OK.
215  * Correct thing is to return OK once they are SENT
216  */
217  if ((this->m_numToDrain > 0) && // we are doing a partial drain
218  (this->m_numDrained == this->m_numToDrain)) { // AND we just finished draining
219  //
220  this->log_ACTIVITY_HI_BA_PartialDrainDone(this->m_numDrained);
221  // reset counters for partial buffer drain
222  this->m_numToDrain = 0;
223  this->m_numDrained = 0;
224  this->m_send = false;
    // Answer the drain command whose opcode and sequence number were saved
    // when the partial drain was started.
225  this->cmdResponse_out(this->m_opCode, this->m_cmdSeq, Fw::CmdResponse::OK);
226  }
227 
228  this->tlmWrite_BA_NumQueuedBuffers(this->m_bufferQueue.getSize());
229 }
230 
231 } // namespace Svc
#define FW_ASSERT(...)
Definition: Assert.hpp:14
PlatformIntType NATIVE_INT_TYPE
Definition: BasicTypes.h:55
PlatformUIntType NATIVE_UINT_TYPE
Definition: BasicTypes.h:56
C++ header for working with basic fprime types.
U32 FwOpcodeType
Definition: FpConfig.h:91
@ VALIDATION_ERROR
Command failed validation.
@ OK
Command successfully executed.
@ BUSY
Component busy.
virtual void deallocate(const NATIVE_UINT_TYPE identifier, void *ptr)=0
Deallocate memory.
virtual void * allocate(const NATIVE_UINT_TYPE identifier, NATIVE_UINT_TYPE &size, bool &recoverable)=0
Allocate memory.
Auto-generated base for BufferAccumulator component.
void bufferSendOutReturn_out(FwIndexType portNum, Fw::Buffer &fwBuffer)
Invoke output port bufferSendOutReturn.
void log_WARNING_HI_BA_StillDraining(U32 numDrained, U32 numToDrain)
void bufferSendOutDrain_out(FwIndexType portNum, Fw::Buffer &fwBuffer)
Invoke output port bufferSendOutDrain.
void log_WARNING_LO_BA_NonBlockDrain(U32 numWillDrain, U32 numReqDrain)
void log_WARNING_HI_BA_DrainStalled(U32 numDrained, U32 numToDrain)
void pingOut_out(FwIndexType portNum, U32 key)
Invoke output port pingOut.
void cmdResponse_out(FwOpcodeType opCode, U32 cmdSeq, Fw::CmdResponse response)
Emit command response.
void tlmWrite_BA_NumQueuedBuffers(U32 arg, Fw::Time _tlmTime=Fw::Time())
void allocateQueue(NATIVE_INT_TYPE identifier, Fw::MemAllocator &allocator, NATIVE_UINT_TYPE maxNumBuffers)
void deallocateQueue(Fw::MemAllocator &allocator)
Return allocated queue. Should be done during shutdown.
BufferAccumulator(const char *const compName)