F´ Flight Software - C/C++ Documentation  NASA-v2.0.1
A framework for building embedded system applications to NASA flight quality standards.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
UdpReceiverComponentImpl.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title UdpReceiverComponentImpl.cpp
3 // \author tcanham
4 // \brief cpp file for UdpReceiver component implementation class
5 //
6 // \copyright
7 // Copyright 2009-2015, by the California Institute of Technology.
8 // ALL RIGHTS RESERVED. United States Government Sponsorship
9 // acknowledged.
10 //
11 // ======================================================================
12 
13 
15 #include "Fw/Types/BasicTypes.hpp"
16 #include <sys/types.h>
17 #include <string.h>
18 #include <errno.h>
19 #include <stdlib.h>
20 #include <unistd.h>
21 #include <sys/socket.h>
22 #include <arpa/inet.h>
24 
25 //#define DEBUG_PRINT(x,...) printf(x,##__VA_ARGS__)
26 #define DEBUG_PRINT(x,...)
27 
28 namespace Svc {
29 
30  // ----------------------------------------------------------------------
31  // Construction, initialization, and destruction
32  // ----------------------------------------------------------------------
33 
36  const char *const compName
37  ) : UdpReceiverComponentBase(compName),
38  m_fd(-1),
39  m_packetsReceived(0),
40  m_bytesReceived(0),
41  m_packetsDropped(0),
42  m_decodeErrors(0),
43  m_firstSeq(true),
44  m_currSeq(0)
45  {
46 
47  }
48 
51  const NATIVE_INT_TYPE instance
52  )
53  {
54  UdpReceiverComponentBase::init(instance);
55  }
56 
59  {
60  if (this->m_fd != -1) {
61  close(this->m_fd);
62  }
63  }
64 
66  const char* port
67  ) {
68 
69  //create a UDP socket
70  this->m_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
71  if (-1 == this->m_fd) {
72  Fw::LogStringArg arg(strerror(errno));
73  this->log_WARNING_HI_UR_SocketError(arg);
74  }
75 
76  sockaddr_in saddr;
77  // zero out the structure
78  memset((char *) &saddr, 0, sizeof(saddr));
79 
80  saddr.sin_family = AF_INET;
81  saddr.sin_port = htons(atoi(port));
82  saddr.sin_addr.s_addr = htonl(INADDR_ANY);
83 
84  //bind socket to port
85  NATIVE_INT_TYPE status = bind(this->m_fd , (struct sockaddr*)&saddr, sizeof(saddr));
86  if (-1 == status) {
87  Fw::LogStringArg arg(strerror(errno));
88  this->log_WARNING_HI_UR_BindError(arg);
89  close(this->m_fd);
90  this->m_fd = -1;
91  } else {
92  this->log_ACTIVITY_HI_UR_PortOpened(atoi(port));
93  }
94  }
95 
97  NATIVE_UINT_TYPE priority,
98  NATIVE_UINT_TYPE stackSize,
99  NATIVE_UINT_TYPE affinity
100  ) {
101  Fw::EightyCharString name(this->getObjName());
102  Os::Task::TaskStatus stat = this->m_socketTask.start(
103  name,
104  0,
105  priority,
106  stackSize,
107  UdpReceiverComponentImpl::workerTask,
108  this,
109  affinity);
110  FW_ASSERT(Os::Task::TASK_OK == stat,stat);
111  }
112 
113 
114  // ----------------------------------------------------------------------
115  // Handler implementations for user-defined typed input ports
116  // ----------------------------------------------------------------------
117 
118  void UdpReceiverComponentImpl ::
119  Sched_handler(
120  const NATIVE_INT_TYPE portNum,
121  NATIVE_UINT_TYPE context
122  )
123  {
124  this->tlmWrite_UR_BytesReceived(this->m_bytesReceived);
125  this->tlmWrite_UR_PacketsReceived(this->m_packetsReceived);
126  this->tlmWrite_UR_PacketsDropped(this->m_packetsDropped);
127  }
128 
129  void UdpReceiverComponentImpl::workerTask(void* ptr) {
130  UdpReceiverComponentImpl *compPtr = static_cast<UdpReceiverComponentImpl*>(ptr);
131  while (1) {
132  compPtr->doRecv();
133  }
134  }
135 
136  void UdpReceiverComponentImpl::doRecv(void) {
137 
138  // wait for data from the socket
139  NATIVE_INT_TYPE psize = recvfrom(
140  this->m_fd,
141  this->m_recvBuff.getBuffAddr(),
142  this->m_recvBuff.getBuffCapacity(),
143  MSG_WAITALL,
144  0,
145  0);
146  if (-1 == psize) {
147  if (errno != EINTR) {
148  Fw::LogStringArg arg(strerror(errno));
149  this->log_WARNING_HI_UR_RecvError(arg);
150  }
151  return;
152  }
153  // reset buffer for deserialization
154  Fw::SerializeStatus stat = this->m_recvBuff.setBuffLen(psize);
155  FW_ASSERT(Fw::FW_SERIALIZE_OK == stat, stat);
156 
157  // get sequence number
158  U8 seqNum;
159  stat = this->m_recvBuff.deserialize(seqNum);
160  // check for deserialization error or port number too high
161  if (stat != Fw::FW_SERIALIZE_OK) {
162  this->log_WARNING_HI_UR_DecodeError(DECODE_SEQ,stat);
163  this->m_decodeErrors++;
164  return;
165  }
166 
167  // track sequence number
168  if (this->m_firstSeq) {
169  // first time, set tracked sequence number equal to the one received
170  this->m_currSeq = seqNum;
171  this->m_firstSeq = false;
172  } else {
173  // make sure sequence number has gone up by one
174  if (seqNum != ++this->m_currSeq) {
175  // will only be right if it rolls over only once, but better than nothing
176  U8 diff = seqNum - this->m_currSeq;
177  this->m_packetsDropped += diff;
178  // send EVR
179  this->log_WARNING_HI_UR_DroppedPacket(diff);
180  // reset to current sequence
181  this->m_currSeq = seqNum;
182  }
183  }
184 
185  // get port number
186  U8 portNum;
187  stat = this->m_recvBuff.deserialize(portNum);
188  // check for deserialization error or port number too high
189  if (stat != Fw::FW_SERIALIZE_OK or portNum > this->getNum_PortsOut_OutputPorts()) {
190  this->log_WARNING_HI_UR_DecodeError(DECODE_PORT,stat);
191  this->m_decodeErrors++;
192  return;
193  }
194  // get buffer for port
195 
196  stat = this->m_recvBuff.deserialize(this->m_portBuff);
197  if (stat != Fw::FW_SERIALIZE_OK) {
198  this->log_WARNING_HI_UR_DecodeError(DECODE_BUFFER,stat);
199  this->m_decodeErrors++;
200  return;
201  }
202 
203  // call output port
204  DEBUG_PRINT("Calling port %d with %d bytes.\n",portNum,this->m_portBuff.getBuffLength());
205  if (this->isConnected_PortsOut_OutputPort(portNum)) {
206 
207  Fw::SerializeStatus stat = this->PortsOut_out(portNum,this->m_portBuff);
208 
209  // If had issues deserializing the data, then report it:
210  if (stat != Fw::FW_SERIALIZE_OK) {
211 
212  this->log_WARNING_HI_UR_DecodeError(PORT_SEND,stat);
213  this->m_decodeErrors++;
214  }
215 
216  this->m_packetsReceived++;
217  this->m_bytesReceived += psize;
218 
219  }
220  }
221 
222 #ifdef BUILD_UT
223  void UdpReceiverComponentImpl::UdpSerialBuffer::operator=(const UdpReceiverComponentImpl::UdpSerialBuffer& other) {
224  this->resetSer();
225  this->serialize(other.getBuffAddr(),other.getBuffLength(),true);
226  }
227 
228  UdpReceiverComponentImpl::UdpSerialBuffer::UdpSerialBuffer(
229  const Fw::SerializeBufferBase& other) : Fw::SerializeBufferBase() {
230  FW_ASSERT(sizeof(this->m_buff)>= other.getBuffLength(),sizeof(this->m_buff),other.getBuffLength());
231  memcpy(this->m_buff,other.getBuffAddr(),other.getBuffLength());
232  this->setBuffLen(other.getBuffLength());
233  }
234 
235  UdpReceiverComponentImpl::UdpSerialBuffer::UdpSerialBuffer(
236  const UdpReceiverComponentImpl::UdpSerialBuffer& other) : Fw::SerializeBufferBase() {
237  FW_ASSERT(sizeof(this->m_buff)>= other.getBuffLength(),sizeof(this->m_buff),other.getBuffLength());
238  memcpy(this->m_buff,other.m_buff,other.getBuffLength());
239  this->setBuffLen(other.getBuffLength());
240  }
241 
242  UdpReceiverComponentImpl::UdpSerialBuffer::UdpSerialBuffer(): Fw::SerializeBufferBase() {
243 
244  }
245 
246 #endif
247 
248 
249 
250 } // end namespace Svc
Fw::SerializeBufferBase
Definition: Serializable.hpp:43
Fw::SerializeStatus
SerializeStatus
forward declaration for string
Definition: Serializable.hpp:14
Fw::LogStringArg
Definition: LogString.hpp:11
U8
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.hpp:76
Fw::EightyCharString
Definition: EightyCharString.hpp:10
Svc::UdpReceiverComponentImpl::init
void init(const NATIVE_INT_TYPE instance=0)
Definition: UdpReceiverComponentImpl.cpp:50
Fw::FW_SERIALIZE_OK
@ FW_SERIALIZE_OK
Serialization/Deserialization operation was successful.
Definition: Serializable.hpp:15
Fw::SerializeBufferBase::getBuffAddr
virtual U8 * getBuffAddr(void)=0
gets buffer address for data filling
EightyCharString.hpp
Os::Task::TASK_OK
@ TASK_OK
message sent/received okay
Definition: Task.hpp:19
Os::Task::TaskStatus
TaskStatus
Definition: Task.hpp:18
NATIVE_UINT_TYPE
unsigned int NATIVE_UINT_TYPE
native unsigned integer type declaration
Definition: BasicTypes.hpp:30
Svc::UdpReceiverComponentImpl::~UdpReceiverComponentImpl
~UdpReceiverComponentImpl(void)
Definition: UdpReceiverComponentImpl.cpp:58
FW_ASSERT
#define FW_ASSERT(...)
Definition: Assert.hpp:9
Svc::UdpReceiverComponentImpl::open
void open(const char *port)
Open the connection.
Definition: UdpReceiverComponentImpl.cpp:65
Fw::SerializeBufferBase::getBuffLength
NATIVE_UINT_TYPE getBuffLength() const
returns current buffer size
Definition: Serializable.cpp:587
Svc::UdpReceiverComponentImpl::startThread
void startThread(NATIVE_UINT_TYPE priority, NATIVE_UINT_TYPE stackSize, NATIVE_UINT_TYPE affinity)
start worker thread
Definition: UdpReceiverComponentImpl.cpp:96
UdpReceiverComponentImpl.hpp
Svc::UdpReceiverComponentImpl::UdpReceiverComponentImpl
UdpReceiverComponentImpl(const char *const compName)
Definition: UdpReceiverComponentImpl.cpp:35
Os::Task::start
TaskStatus start(const Fw::StringBase &name, NATIVE_INT_TYPE identifier, NATIVE_INT_TYPE priority, NATIVE_INT_TYPE stackSize, taskRoutine routine, void *arg, NATIVE_INT_TYPE cpuAffinity=-1)
start the task
Definition: Task.cpp:16
Svc
Definition: ActiveRateGroupImplCfg.hpp:18
DEBUG_PRINT
#define DEBUG_PRINT(x,...)
Definition: UdpReceiverComponentImpl.cpp:26
BasicTypes.hpp
Declares ISF basic types.
NATIVE_INT_TYPE
int NATIVE_INT_TYPE
native integer type declaration
Definition: BasicTypes.hpp:29
Fw
Definition: Buffer.cpp:21