Source code for fprime_gds.common.files.downlinker

"""
@brief file writer to write decoded objects into files

This file writer takes in decoded data and writes the data
to the correct log files and destination files

@date Created August 8, 2019
@author Blake A. Harriman

@bug No known bugs
"""

import logging
import os

import fprime.constants
import fprime_gds.common.handlers
from fprime_gds.common.data_types.file_data import FilePacketType
from fprime_gds.common.files.helpers import FileStates, TransmitFile, file_to_dict

# Module-level logger shared by every FileDownlinker; a per-file log handler is
# attached to it while a downlink is in progress.
LOGGER = logging.getLogger("downlink")
LOGGER.setLevel(logging.INFO)
class FileDownlinker(fprime_gds.common.handlers.DataHandler):
    """File writer class for decoded packets

    Decoded file packets are delivered through data_callback and routed by
    packet type to the handle_* methods, which write the packet contents into
    a destination file inside the configured directory.
    """

    def __init__(self, directory, timeout=20.0, log_dir=None):
        """
        FileDownlinker class constructor

        Args:
            directory: directory in which to write downlinked files. It is
                       created if it does not already exist.
            timeout: value handed to the timer together with the self.timeout
                     callback (presumably seconds -- confirm against
                     fprime_gds.common.files.helpers.Timeout).
            log_dir: directory passed to each TransmitFile for its log.
                     Defaults to directory when None.

        Returns:
            An initialized FileDownlinker object.
        """
        super().__init__()
        self.__directory = directory
        self.__log_dir = log_dir if log_dir is not None else directory
        self.active = None  # TransmitFile currently being written, if any
        self.files = []  # Every TransmitFile started by this downlinker
        self.state = FileStates.IDLE
        self.sequence = 0  # Keeps track of what the current sequence ID should be
        self.timer = fprime_gds.common.files.helpers.Timeout()
        self.timer.setup(self.timeout, timeout)
        os.makedirs(self.__directory, exist_ok=True)

    def data_callback(self, data, sender=None):
        """
        Function called to pass data to the writer class

        Args:
            data: Binary data that has been decoded and passed to the correct consumer
            sender: unused by this handler
        """
        self.timer.restart()  # Start or restart the timer
        packet_type = data.packetType

        # Check the packet type, and route to the appropriate sub-function
        if packet_type == FilePacketType.START:
            self.handle_start(data)
        elif packet_type == FilePacketType.DATA:
            self.handle_data(data)
        elif packet_type == FilePacketType.END:
            self.handle_end(data)
        elif packet_type == FilePacketType.CANCEL:
            self.handle_cancel(data)
        else:
            # %s rather than %d: the unknown value is not guaranteed to be an
            # integer, and %d would make the log record fail to format.
            LOGGER.warning("Invalid file detected descriptor detected: %s", packet_type)

    def handle_start(self, data):
        """
        Handle a start packet data type.

        Aborts any transfer already in progress, opens the destination file
        and switches the downlinker to the RUNNING state.

        :param data: data packet that is a start packet
        """
        # Initialize all relevant START packet attributes into variables from file_data
        size = data.size
        source_path = data.sourcePath.decode(fprime.constants.DATA_ENCODING)
        dest_path = data.destPath.decode(fprime.constants.DATA_ENCODING)
        if self.state != FileStates.IDLE:
            LOGGER.warning("File transfer already inprogress. Aborting original.")
            self.finish()
        # Create the destination file where the DATA packet data will be stored
        assert self.active is None, "File is already open, something went wrong"
        transmit_file = TransmitFile(
            source_path,
            os.path.join(self.__directory, self.sanitize(dest_path)),
            size,
            self.__log_dir,
        )
        # Open before publishing as self.active: if open() raises, the
        # downlinker stays idle with no active file and later START packets
        # are still accepted.
        transmit_file.open("wb+")
        self.active = transmit_file
        LOGGER.addHandler(self.active.log_handler)
        message = "Received START packet with metadata:\n"
        message += "\tSize: %d\n"
        message += "\tSource: %s\n"
        message += "\tDestination: %s"
        LOGGER.info(message, size, source_path, dest_path)
        self.files.append(self.active)
        self.state = FileStates.RUNNING
        self.sequence += 1

    def handle_data(self, data):
        """
        Handle the data packet.

        The packet is ignored with a warning unless a transfer is running. A
        sequence ID mismatch is only logged; the data is still written.

        :param data: data packet
        """
        # Initialize all relevant DATA packet attributes into variables from file_data
        offset = data.offset
        data_bytes = data.dataVar
        if self.state != FileStates.RUNNING:
            LOGGER.warning("Received unexpected data packet for offset: %d", offset)
        else:
            if data.seqID != self.sequence:
                LOGGER.warning(
                    "Data packet has unexpected sequence id. Expected: %d got %d",
                    self.sequence,
                    data.seqID,
                )
            # Write the data information to the file
            self.active.write(data_bytes, offset)
            self.active.seek = offset + len(data_bytes)
            LOGGER.info(
                "Received DATA packet writing %d bytes to offset %s",
                len(data_bytes),
                offset,
            )
        self.sequence += 1

    def handle_cancel(self, _):
        """
        Handle cancel packet.

        :param _: cancel packet, ignored.
        """
        # CANCEL Packets have no data
        if self.state != FileStates.RUNNING:
            LOGGER.warning("Received unexpected cancel packet")
        else:
            LOGGER.info("Received CANCEL packet, stopping downlink")
            self.finish()

    def handle_end(self, data):
        """
        Handle the end packet.

        A sequence ID mismatch is only logged; the downlink is still finished.

        :param data: end packet
        """
        # Only the sequence ID of the END packet is used.
        # hashValue attribute is not relevant right now, but might be in the future
        if self.state != FileStates.RUNNING:
            LOGGER.warning("Received unexpected END packet")
        else:
            if data.seqID != self.sequence:
                LOGGER.warning(
                    "End packet has unexpected sequence id. Expected: %d got %d",
                    self.sequence,
                    data.seqID,
                )
            LOGGER.info("Received END packet, finishing downlink")
            self.finish()

    def timeout(self):
        """Timeout the current downlink"""
        LOGGER.warning("Timeout while downlinking file, aborting")
        self.finish()

    def finish(self):
        """
        Finish the file downlink

        Stops the timer, resets state and sequence tracking, then closes and
        releases the active file if there is one.
        """
        self.timer.stop()
        self.state = FileStates.IDLE
        self.sequence = 0
        # data_callback restarts the timer for every packet, including ones
        # received while idle, so the timeout can fire with no active file.
        if self.active is None:
            return
        self.active.state = "FINISHED"
        LOGGER.removeHandler(self.active.log_handler)
        self.active.close()
        self.active = None

    def current_files(self):
        """Return the current list of downlinked files"""
        return file_to_dict(self.files, uplink=False)

    @staticmethod
    def sanitize(filename):
        """
        Sanitize the given filename by removing path separators that would make new directories.

        Both os.sep and, where the platform defines one, os.altsep are replaced.

        :param filename: filename to sanitize
        :return: sanitized filename
        """
        sanitized = filename.replace(os.sep, "_")
        # os.altsep is None on platforms with a single separator
        if os.altsep:
            sanitized = sanitized.replace(os.altsep, "_")
        return sanitized

    @property
    def directory(self):
        """Directory in which downlinked files are written"""
        return self.__directory