Source code for fprime_gds.common.decoders.ch_decoder

"""
@brief Channel Decoder class used to parse binary channel telemetry data

Decoders are responsible for taking in serialized data and parsing it into
objects. A decoder receives serialized data (that had a specific descriptor)
from a distributor that it has been registered to. The distributor will send
the binary data after removing any length and descriptor headers.

Example data that would be sent to a decoder that parses channels:
    +-------------------+---------------------+------------ - - -
    | ID (4 bytes)      | Time Tag (11 bytes) | Data....
    +-------------------+---------------------+------------ - - -

@date Created July 11, 2018
@author R. Joseph Paetz

@bug No known bugs
"""

import copy

from fprime.common.models.serialize.time_type import TimeType
from fprime.common.models.serialize.numerical_types import U32Type
from fprime_gds.common.data_types.ch_data import ChData
from fprime_gds.common.decoders.decoder import Decoder


class ChDecoder(Decoder):
    """Decoder that turns serialized channel telemetry into ChData objects."""

    def __init__(self, ch_dict):
        """
        Build a channel decoder around a channel dictionary.

        Args:
            ch_dict: Channel telemetry dictionary. Channel IDs should be keys
                     and ChTemplate objects should be values

        Returns:
            An initialized channel decoder object.
        """
        super().__init__()
        self.__dict = ch_dict

    def decode_api(self, data):
        """
        Decode one serialized channel reading.

        Exposed so that code which is not registered with a distributor can
        run the same decoding logic that is applied to data handed to the
        data_callback function.

        Args:
            data: Binary telemetry channel data to decode

        Returns:
            A ChData object holding the parsed value, its time tag and the
            channel template, or None when the decoded channel ID is not
            present in the channel dictionary.
        """
        offset = 0

        # The channel ID comes first, serialized as a U32.
        channel_id_field = U32Type()
        channel_id_field.deserialize(data, offset)
        offset += channel_id_field.getSize()
        channel_id = channel_id_field.val

        # The time tag follows directly after the ID.
        timestamp = TimeType()
        timestamp.deserialize(data, offset)
        offset += timestamp.getSize()

        # Without a template there is no way to interpret the value bytes.
        if channel_id not in self.__dict:
            print("Channel decode error: id %d not in dictionary" % channel_id)
            return None

        channel_template = self.__dict[channel_id]
        value = self.decode_ch_val(data, offset, channel_template)
        return ChData(value, timestamp, channel_template)

    def decode_ch_val(self, val_data, offset, template):
        """
        Deserialize a channel value using the type described by its template.

        Args:
            val_data: Data to parse the value out of
            offset: Location in val_data to start the parsing
            template: Channel Template object for the channel

        Returns:
            A fresh copy of the template's type object that has deserialized
            val_data starting at offset. The decoded value is available
            through the returned object's val field.
        """
        # Work on a deep copy rather than the template's own type object.
        # The template is shared by every reading of this channel, so
        # deserializing into it directly would overwrite the value seen by
        # any earlier result that still references it.
        prototype = template.get_type_obj()
        decoded = copy.deepcopy(prototype)
        decoded.deserialize(val_data, offset)
        return decoded