Source code for fprime_gds.common.encoders.file_encoder

"""
@brief Encoder for file data

This encoder takes in file objects, serializes them, and sends the results
to all registered senders.

Serialized file packet format:
    +--------------------------------+          -
    | Header = "A5A5 "               |          |
    | (5 byte string)                |          |
    +--------------------------------+      Added by
    | Destination = "GUI " or "FSW " |       Sender
    | (4 byte string)                |          |
    +--------------------------------+          -
    | File packet descriptor         |
    | (0x????????)                   |
    | (4 byte number)                |
    +--------------------------------+
    | Length of descriptor and       |
    | file packet data               |
    | (4 bytes)                      |
    +--------------------------------+
    | Packet type                    |
    | (1 byte)                       |
    +--------------------------------+
    | Sequence ID                    |
    | (4 bytes)                      |
    +--------------------------------+
    | Type specific                  |
    +--------------------------------+
    | Type specific                  |
    +--------------------------------+
    | ...                            |
    +--------------------------------+
    | Type specific                  |
    +--------------------------------+

@date Created 2020-02-05
@author mstarch

@bug No known bugs
"""

import struct

from fprime.common.models.serialize.numerical_types import U32Type
from fprime.constants import DATA_ENCODING
from fprime_gds.common.data_types.file_data import FilePacketType
from fprime_gds.common.utils.data_desc_type import DataDescType

from . import encoder


class FileEncoder(encoder.Encoder):
    """
    Encoder for file packets.

    This plugs into the uplink system and turns file packet objects into the
    bytes that are sent to the spacecraft.
    """

    def __init__(self, config=None):
        """
        Build a file encoder.

        Args:
            config (ConfigManager, default=None): Object with configuration data
                    for the sizes of fields in the binary data. If None is
                    passed, defaults are used.
        """
        super().__init__(config)
        # NOTE: encode_api does not read this attribute; it calls
        # self.config.get_type("msg_len") again on every invocation.
        self.len_obj = self.config.get_type("msg_len")

    @staticmethod
    def _encode_type_fields(data):
        """
        Serialize the fields that follow the sequence ID for one packet.

        All numeric fields are packed big-endian:

        - START: file size (4 bytes), source path length (1 byte), source
          path, destination path length (1 byte), destination path
        - DATA: offset (4 bytes), data length (2 bytes), data bytes
        - END: hash value (4 bytes)
        - CANCEL: no additional fields

        :param data: file packet whose ``packetType`` selects the layout
        :return: bytes of the type-specific fields (empty for CANCEL)
        :raises Exception: if the packet type is none of the four above
        """
        kind = data.packetType
        if kind == FilePacketType.START:
            src_bytes = data.sourcePath.encode(DATA_ENCODING)
            dst_bytes = data.destPath.encode(DATA_ENCODING)
            return (
                struct.pack(">IB", data.size, len(src_bytes))
                + src_bytes
                + struct.pack(">B", len(dst_bytes))
                + dst_bytes
            )
        if kind == FilePacketType.DATA:
            return struct.pack(">IH", data.offset, data.length) + data.dataVar
        if kind == FilePacketType.END:
            return struct.pack(">I", data.hashValue)
        if kind == FilePacketType.CANCEL:
            return b""
        raise Exception(
            "Invalid packet type found while encoding: {}".format(kind)
        )

    def encode_api(self, data):
        """
        Encodes specific file packets. This will allow the data to be sent out.

        The returned bytes are, in order: the value 0x5A5A5A5A serialized as a
        U32Type, the "msg_len" length field (descriptor length plus packet
        length), the FW_PACKET_FILE descriptor serialized as a U32Type, and
        then the packet itself: packet type (1 byte), sequence ID (4 bytes),
        and the type-specific fields.

        :param data: FilePacket type to send.
        :return: encoded bytes data
        :raises Exception: if the packet type is not START, DATA, END or CANCEL
        """
        # Common prefix first, so that a bad packet type value or sequence ID
        # is reported before the type-specific fields are examined.
        packet = struct.pack(">BI", int(data.packetType.value), data.seqID)
        packet += self._encode_type_fields(data)

        descriptor = U32Type(DataDescType["FW_PACKET_FILE"].value).serialize()
        length_field = self.config.get_type("msg_len")
        length_field.val = len(descriptor) + len(packet)

        start_word = U32Type(0x5A5A5A5A).serialize()
        return start_word + length_field.serialize() + descriptor + packet