"""
@brief Packet Decoder class used to parse binary packetized telemetry data
Decoders are responsible for taking in serialized data and parsing it into
objects. Decoders receive serialized data (that had a specific descriptor) from
a distributer that it has been registered to. The distributer will send the
binary data after removing any length and descriptor headers.
Example data that would be sent to a decoder that parses events or channels:
+-------------------+---------------------+------------ - - -
| ID (2 bytes) | Time Tag (11 bytes) | Data....
+-------------------+---------------------+------------ - - -
@date Created July 12, 2018
@author R. Joseph Paetz
@bug No known bugs
"""
from fprime.common.models.serialize.time_type import TimeType
from fprime.common.models.serialize.numerical_types import U16Type
from fprime_gds.common.data_types.ch_data import ChData
from fprime_gds.common.data_types.pkt_data import PktData
from fprime_gds.common.decoders.ch_decoder import ChDecoder
[docs]class PktDecoder(ChDecoder):
"""Decoder class for Packetized Telemetry data"""
def __init__(self, pkt_name_dict, ch_dict):
"""
Constructor
Args:
pkt_name_dict: (dict) Packet dictionary. Pkt names should be keys
and PktTemplate objects should be values
ch_dict: (dict) Channel dictionary. Channel ids shoudl be keys and
ChTemplate objects should be values
Returns:
An initialized PktDecoder object
"""
super().__init__(ch_dict)
self.__dict = pkt_name_dict
[docs] def decode_api(self, data):
"""
Decodes the given data and returns the result.
This function allows for non-registered code to call the same decoding
code as is used to parse data passed to the data_callback function.
Args:
data: (bytearray) Binary packetized telemetry data to decode
Returns:
Parsed version of the input data in the form of a PktData object
or None if the data is not decodable
"""
ptr = 0
# Decode Pkt ID here...
id_obj = U16Type()
id_obj.deserialize(data, ptr)
ptr += id_obj.getSize()
pkt_id = id_obj.val
# Decode time...
pkt_time = TimeType()
pkt_time.deserialize(data, ptr)
ptr += pkt_time.getSize()
if pkt_id not in self.__dict:
# Don't crash if can't find pkt. Just notify and keep going
print(
"Packet decode error: id %d not in dictionary. Time=%s"
% (pkt_id, pkt_time.to_readable())
)
print("Full pkt = \n")
for i in data:
print("0x%02x" % ord(i))
return None
# Retrieve the template instance for this channel
pkt_temp = self.__dict[pkt_id]
ch_temps = pkt_temp.get_ch_list()
ch_data_objs = []
for ch_temp in ch_temps:
val_obj = self.decode_ch_val(data, ptr, ch_temp)
ptr += val_obj.getSize()
ch_data_objs.append(ChData(val_obj, pkt_time, ch_temp))
return PktData(ch_data_objs, pkt_time, pkt_temp)