Source code for fprime_gds.common.pipeline.encoding

"""
encoding.py:

This file sets up the encoding and decoding handlers for use with the standard pipeline. This encapsulates the encoding
and decoding into a single component that can be composed into the standard pipeline.

@mstarch
"""
import fprime_gds.common.decoders.ch_decoder
import fprime_gds.common.decoders.event_decoder
import fprime_gds.common.decoders.file_decoder
import fprime_gds.common.decoders.pkt_decoder
import fprime_gds.common.encoders.cmd_encoder

# Encoders and Decoders
import fprime_gds.common.encoders.file_encoder


class EncodingDecoding:
    """
    Sets up and runs the encoding and decoding for the standard pipeline. This includes the following encoders and
    decoders for standard setups:

    1. Events decoding
    2. Channel decoding
    3. File decoding
    4. Command encoding
    5. File encoding

    A packet decoder is additionally created when the supplied dictionaries provide a packet definition.
    """

    def __init__(self):
        """
        Setup the encoder and decoder member variables. All coders start as None and are only created by a call to
        `setup_coders`; the list of command loopback subscribers starts empty.
        """
        # Encoder and decoder items
        self.file_encoder = None
        self.command_encoder = None
        self.event_decoder = None
        self.channel_decoder = None
        self.file_decoder = None
        self.packet_decoder = None
        # Consumers that receive a local copy of every command passed to send_command
        self.command_subscribers = []

    def setup_coders(self, dictionaries, distributor, sender):
        """
        Sets up the encoder and decoder layer of the GDS pipeline. This requires a dictionary set that has loaded the
        dictionaries needed for the decoders to work correctly. This will then register the decoders with the supplied
        distributor to handle known types with the known decoders. Lastly, the sender will be registered to the
        encoders to handle the encoded data going out.

        :param dictionaries: a dictionaries handling object holding dictionaries (event_id, channel_id, and packet are
                             read here)
        :param distributor: distributor of data to register the decoders to
        :param sender: object registered to both the command and file encoders to receive their output
        """
        # Create encoders and decoders using dictionaries
        self.file_encoder = fprime_gds.common.encoders.file_encoder.FileEncoder()
        self.command_encoder = fprime_gds.common.encoders.cmd_encoder.CmdEncoder()
        self.event_decoder = fprime_gds.common.decoders.event_decoder.EventDecoder(
            dictionaries.event_id
        )
        self.channel_decoder = fprime_gds.common.decoders.ch_decoder.ChDecoder(
            dictionaries.channel_id
        )
        self.file_decoder = fprime_gds.common.decoders.file_decoder.FileDecoder()
        # The packet decoder is optional: it only exists when a packet dictionary was loaded
        self.packet_decoder = None
        if dictionaries.packet is not None:
            self.packet_decoder = fprime_gds.common.decoders.pkt_decoder.PktDecoder(
                dictionaries.packet, dictionaries.channel_id
            )
        # Register client socket to encoder
        self.command_encoder.register(sender)
        self.file_encoder.register(sender)
        # Register the event and channel decoders to the distributor for their
        # respective data types
        distributor.register("FW_PACKET_LOG", self.event_decoder)
        distributor.register("FW_PACKET_TELEM", self.channel_decoder)
        distributor.register("FW_PACKET_FILE", self.file_decoder)
        if self.packet_decoder is not None:
            distributor.register("FW_PACKET_PACKETIZED_TLM", self.packet_decoder)

    def send_command(self, command):
        """
        Sends a command to the registered command encoder, and further down the stream. Note: this contains a local
        loopback to any command consumers to ensure that histories and logging are updated. The loopback consumers
        are called before the command encoder.

        :param command: command object to send
        """
        for loopback in self.command_subscribers:
            loopback.data_callback(command)
        self.command_encoder.data_callback(command)

    def register_event_consumer(self, consumer):
        """
        Registers a history with the event decoder.

        :param consumer: consumer of events
        """
        self.event_decoder.register(consumer)

    def remove_event_consumer(self, consumer):
        """
        Removes a history from the event decoder by delegating to the decoder's `deregister`.

        :param consumer: consumer of events
        :return: the result of the event decoder's `deregister` call; presumably a boolean indicating if the consumer
                 was removed (confirm against the decoder implementation)
        """
        return self.event_decoder.deregister(consumer)

    def register_channel_consumer(self, consumer):
        """
        Registers a history with the telemetry decoder.

        :param consumer: consumer of channels
        """
        self.channel_decoder.register(consumer)

    def remove_channel_consumer(self, consumer):
        """
        Removes a history from the telemetry decoder by delegating to the decoder's `deregister`.

        :param consumer: consumer of channels
        :return: the result of the channel decoder's `deregister` call; presumably a boolean indicating if the
                 consumer was removed (confirm against the decoder implementation)
        """
        return self.channel_decoder.deregister(consumer)

    def register_command_consumer(self, consumer):
        """
        Registers a history with the standard pipeline to receive the command loopback.

        :param consumer: consumer of commands
        """
        self.command_subscribers.append(consumer)

    def remove_command_consumer(self, consumer):
        """
        Removes a history that is subscribed to command data. Does not raise if the history was not previously
        registered; False is returned instead.

        :param consumer: consumer of commands
        :return: a boolean indicating if the consumer was removed.
        """
        try:
            self.command_subscribers.remove(consumer)
            return True
        except ValueError:
            return False

    def register_packet_consumer(self, consumer):
        """
        Registers a history with the packet decoder. This is a no-op when no packet decoder has been set up.

        :param consumer: consumer of packets
        """
        if self.packet_decoder is not None:
            self.packet_decoder.register(consumer)

    def deregister_packet_consumer(self, consumer):
        """
        Removes a history that is subscribed to packet data.

        :param consumer: consumer of packets
        :return: the result of the packet decoder's `deregister` call, or False when no packet decoder has been set
                 up (nothing could have been registered, so nothing was removed).
        """
        if self.packet_decoder is None:
            # Explicit False rather than an implicit None keeps the return type consistent for callers
            return False
        return self.packet_decoder.deregister(consumer)