F´ Flight Software - C/C++ Documentation  NASA-v1.6.0
A framework for building embedded system applications to NASA flight quality standards.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
UdpReceiverComponentImpl.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title UdpReceiverComponentImpl.cpp
3 // \author tcanham
4 // \brief cpp file for UdpReceiver component implementation class
5 //
6 // \copyright
7 // Copyright 2009-2015, by the California Institute of Technology.
8 // ALL RIGHTS RESERVED. United States Government Sponsorship
9 // acknowledged.
10 //
11 // ======================================================================
12 
13 
15 #include "Fw/Types/BasicTypes.hpp"
16 #include <sys/types.h>
17 #include <cstring>
18 #include <cerrno>
19 #include <cstdlib>
20 #include <unistd.h>
21 #include <sys/socket.h>
22 #include <arpa/inet.h>
23 #include <Os/TaskString.hpp>
24 
25 //#define DEBUG_PRINT(x,...) printf(x,##__VA_ARGS__)
26 #define DEBUG_PRINT(x,...)
27 
28 namespace Svc {
29 
30  // ----------------------------------------------------------------------
31  // Construction, initialization, and destruction
32  // ----------------------------------------------------------------------
33 
36  const char *const compName
37  ) : UdpReceiverComponentBase(compName),
38  m_fd(-1),
39  m_packetsReceived(0),
40  m_bytesReceived(0),
41  m_packetsDropped(0),
42  m_decodeErrors(0),
43  m_firstSeq(true),
44  m_currSeq(0)
45  {
46 
47  }
48 
51  const NATIVE_INT_TYPE instance
52  )
53  {
54  UdpReceiverComponentBase::init(instance);
55  }
56 
59  {
60  if (this->m_fd != -1) {
61  close(this->m_fd);
62  }
63  }
64 
66  const char* port
67  ) {
68 
69  //create a UDP socket
70  this->m_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
71  if (-1 == this->m_fd) {
72  Fw::LogStringArg arg(strerror(errno));
73  this->log_WARNING_HI_UR_SocketError(arg);
74  }
75 
76  sockaddr_in saddr;
77  // zero out the structure
78  memset(&saddr, 0, sizeof(saddr));
79 
80  saddr.sin_family = AF_INET;
81  saddr.sin_port = htons(atoi(port));
82  saddr.sin_addr.s_addr = htonl(INADDR_ANY);
83 
84  //bind socket to port
85  NATIVE_INT_TYPE status = bind(this->m_fd , (struct sockaddr*)&saddr, sizeof(saddr));
86  if (-1 == status) {
87  Fw::LogStringArg arg(strerror(errno));
88  this->log_WARNING_HI_UR_BindError(arg);
89  close(this->m_fd);
90  this->m_fd = -1;
91  } else {
92  this->log_ACTIVITY_HI_UR_PortOpened(atoi(port));
93  }
94  }
95 
97  NATIVE_UINT_TYPE priority,
98  NATIVE_UINT_TYPE stackSize,
99  NATIVE_UINT_TYPE affinity
100  ) {
101  Os::TaskString name(this->getObjName());
102  Os::Task::TaskStatus stat = this->m_socketTask.start(
103  name,
104  0,
105  priority,
106  stackSize,
107  UdpReceiverComponentImpl::workerTask,
108  this,
109  affinity);
110  FW_ASSERT(Os::Task::TASK_OK == stat,stat);
111  }
112 
113 
114  // ----------------------------------------------------------------------
115  // Handler implementations for user-defined typed input ports
116  // ----------------------------------------------------------------------
117 
118  void UdpReceiverComponentImpl ::
119  Sched_handler(
120  const NATIVE_INT_TYPE portNum,
121  NATIVE_UINT_TYPE context
122  )
123  {
124  this->tlmWrite_UR_BytesReceived(this->m_bytesReceived);
125  this->tlmWrite_UR_PacketsReceived(this->m_packetsReceived);
126  this->tlmWrite_UR_PacketsDropped(this->m_packetsDropped);
127  }
128 
129  void UdpReceiverComponentImpl::workerTask(void* ptr) {
130  UdpReceiverComponentImpl *compPtr = static_cast<UdpReceiverComponentImpl*>(ptr);
131  while (true) {
132  compPtr->doRecv();
133  }
134  }
135 
136  void UdpReceiverComponentImpl::doRecv() {
137 
138  // wait for data from the socket
139  NATIVE_INT_TYPE psize = recvfrom(
140  this->m_fd,
141  this->m_recvBuff.getBuffAddr(),
142  this->m_recvBuff.getBuffCapacity(),
143  MSG_WAITALL,
144  0,
145  0);
146  if (-1 == psize) {
147  if (errno != EINTR) {
148  Fw::LogStringArg arg(strerror(errno));
149  this->log_WARNING_HI_UR_RecvError(arg);
150  }
151  return;
152  }
153  // reset buffer for deserialization
154  Fw::SerializeStatus stat = this->m_recvBuff.setBuffLen(psize);
155  FW_ASSERT(Fw::FW_SERIALIZE_OK == stat, stat);
156 
157  // get sequence number
158  U8 seqNum;
159  stat = this->m_recvBuff.deserialize(seqNum);
160  // check for deserialization error or port number too high
161  if (stat != Fw::FW_SERIALIZE_OK) {
162  this->log_WARNING_HI_UR_DecodeError(DECODE_SEQ,stat);
163  this->m_decodeErrors++;
164  return;
165  }
166 
167  // track sequence number
168  if (this->m_firstSeq) {
169  // first time, set tracked sequence number equal to the one received
170  this->m_currSeq = seqNum;
171  this->m_firstSeq = false;
172  } else {
173  // make sure sequence number has gone up by one
174  if (seqNum != ++this->m_currSeq) {
175  // will only be right if it rolls over only once, but better than nothing
176  U8 diff = seqNum - this->m_currSeq;
177  this->m_packetsDropped += diff;
178  // send EVR
179  this->log_WARNING_HI_UR_DroppedPacket(diff);
180  // reset to current sequence
181  this->m_currSeq = seqNum;
182  }
183  }
184 
185  // get port number
186  U8 portNum;
187  stat = this->m_recvBuff.deserialize(portNum);
188  // check for deserialization error or port number too high
189  if (stat != Fw::FW_SERIALIZE_OK or portNum > this->getNum_PortsOut_OutputPorts()) {
190  this->log_WARNING_HI_UR_DecodeError(DECODE_PORT,stat);
191  this->m_decodeErrors++;
192  return;
193  }
194  // get buffer for port
195 
196  stat = this->m_recvBuff.deserialize(this->m_portBuff);
197  if (stat != Fw::FW_SERIALIZE_OK) {
198  this->log_WARNING_HI_UR_DecodeError(DECODE_BUFFER,stat);
199  this->m_decodeErrors++;
200  return;
201  }
202 
203  // call output port
204  DEBUG_PRINT("Calling port %d with %d bytes.\n",portNum,this->m_portBuff.getBuffLength());
205  if (this->isConnected_PortsOut_OutputPort(portNum)) {
206 
207  Fw::SerializeStatus stat = this->PortsOut_out(portNum,this->m_portBuff);
208 
209  // If had issues deserializing the data, then report it:
210  if (stat != Fw::FW_SERIALIZE_OK) {
211 
212  this->log_WARNING_HI_UR_DecodeError(PORT_SEND,stat);
213  this->m_decodeErrors++;
214  }
215 
216  this->m_packetsReceived++;
217  this->m_bytesReceived += psize;
218 
219  }
220  }
221 
222 #ifdef BUILD_UT
223  UdpReceiverComponentImpl::UdpSerialBuffer& UdpReceiverComponentImpl::UdpSerialBuffer::operator=(const UdpReceiverComponentImpl::UdpSerialBuffer& other) {
224  this->resetSer();
225  this->serialize(other.getBuffAddr(),other.getBuffLength(),true);
226  return *this;
227  }
228 
229  UdpReceiverComponentImpl::UdpSerialBuffer::UdpSerialBuffer(
230  const Fw::SerializeBufferBase& other) : Fw::SerializeBufferBase() {
231  FW_ASSERT(sizeof(this->m_buff)>= other.getBuffLength(),sizeof(this->m_buff),other.getBuffLength());
232  memcpy(this->m_buff,other.getBuffAddr(),other.getBuffLength());
233  this->setBuffLen(other.getBuffLength());
234  }
235 
236  UdpReceiverComponentImpl::UdpSerialBuffer::UdpSerialBuffer(
237  const UdpReceiverComponentImpl::UdpSerialBuffer& other) : Fw::SerializeBufferBase() {
238  FW_ASSERT(sizeof(this->m_buff)>= other.getBuffLength(),sizeof(this->m_buff),other.getBuffLength());
239  memcpy(this->m_buff,other.m_buff,other.getBuffLength());
240  this->setBuffLen(other.getBuffLength());
241  }
242 
243  UdpReceiverComponentImpl::UdpSerialBuffer::UdpSerialBuffer(): Fw::SerializeBufferBase() {
244 
245  }
246 
247 #endif
248 
249 
250 
251 } // end namespace Svc
Fw::SerializeBufferBase
Definition: Serializable.hpp:43
Fw::SerializeStatus
SerializeStatus
forward declaration for string
Definition: Serializable.hpp:14
Fw::LogStringArg
Definition: LogString.hpp:11
U8
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.hpp:73
NATIVE_UINT_TYPE
unsigned int NATIVE_UINT_TYPE
native unsigned integer type declaration
Definition: BasicTypes.hpp:28
Svc::UdpReceiverComponentImpl::init
void init(const NATIVE_INT_TYPE instance=0)
Definition: UdpReceiverComponentImpl.cpp:50
Fw::SerializeBufferBase::getBuffAddr
virtual U8 * getBuffAddr(void)=0
gets buffer address for data filling
NATIVE_INT_TYPE
int NATIVE_INT_TYPE
native integer type declaration
Definition: BasicTypes.hpp:27
Os::Task::TASK_OK
@ TASK_OK
message sent/received okay
Definition: Task.hpp:20
Os::TaskString
Definition: TaskString.hpp:10
Os::Task::TaskStatus
TaskStatus
Definition: Task.hpp:19
Svc::UdpReceiverComponentImpl::~UdpReceiverComponentImpl
~UdpReceiverComponentImpl()
Definition: UdpReceiverComponentImpl.cpp:58
Svc::UdpReceiverComponentImpl::open
void open(const char *port)
Open the connection.
Definition: UdpReceiverComponentImpl.cpp:65
Fw::SerializeBufferBase::getBuffLength
NATIVE_UINT_TYPE getBuffLength() const
returns current buffer size
Definition: Serializable.cpp:594
Svc::UdpReceiverComponentImpl::startThread
void startThread(NATIVE_UINT_TYPE priority, NATIVE_UINT_TYPE stackSize, NATIVE_UINT_TYPE affinity)
start worker thread
Definition: UdpReceiverComponentImpl.cpp:96
TaskString.hpp
FW_ASSERT
#define FW_ASSERT(...)
Definition: Assert.hpp:9
UdpReceiverComponentImpl.hpp
Svc::UdpReceiverComponentImpl::UdpReceiverComponentImpl
UdpReceiverComponentImpl(const char *const compName)
Definition: UdpReceiverComponentImpl.cpp:35
Os::Task::start
TaskStatus start(const Fw::StringBase &name, taskRoutine routine, void *arg, NATIVE_UINT_TYPE priority=TASK_DEFAULT, NATIVE_UINT_TYPE stackSize=TASK_DEFAULT, NATIVE_UINT_TYPE cpuAffinity=TASK_DEFAULT, NATIVE_UINT_TYPE identifier=TASK_DEFAULT)
start the task
Definition: Task.cpp:18
Svc
Definition: ActiveRateGroupCfg.hpp:18
DEBUG_PRINT
#define DEBUG_PRINT(x,...)
Definition: UdpReceiverComponentImpl.cpp:26
Fw::FW_SERIALIZE_OK
@ FW_SERIALIZE_OK
Serialization/Deserialization operation was successful.
Definition: Serializable.hpp:15
Fw
Definition: SerIds.hpp:20