Source code for fprime_gds.common.files.uplinker

"""
uplinker.py:

Contains the code necessary to uplink files through the F prime ground system to a running deployment. The file uplink
process will read in a file from the OS, and uplink it in chunks. The system throttles chunks of the file requiring a
handshake packet in return before the next chunk is sent out.

@author lestarch
"""
import os
import queue
import threading
import time

import fprime_gds.common.handlers
from fprime_gds.common.data_types.file_data import (
    CancelPacketData,
    DataPacketData,
    EndPacketData,
    StartPacketData,
)
from fprime_gds.common.files.helpers import (
    FileStates,
    Timeout,
    TransmitFile,
    file_to_dict,
)


[docs]class UplinkQueue: """ Handles queuing of files to send to the uplinker. This offloads the work of determining when done, and what to do while waiting. It also owns the thread that starts uplink. This thread watches for the current uplink to finish, and then it starts the next one and returns to a quiescent state. """ def __init__(self, uplinker): """ Constructs the uplink queue with a reference to the object that run the uplink. :param uplinker: uplinker to callback into """ self.running = True self.uplinker = uplinker self.busy = threading.Semaphore() self.queue = queue.Queue() self.__file_store = [] self.__exit = threading.Event() self.__thread = threading.Thread(target=self.run, args=()) self.__thread.start()
[docs] def enqueue(self, filepath, destination): """ Enqueue the file and destination pair onto the queue :param filepath: filepath to upload to the given destination :param destination: destination path to upload the filepath to """ file_obj = TransmitFile(filepath, destination) self.queue.put(file_obj) self.__file_store.append(file_obj)
[docs] def pause(self): """ Pause the uplinker, setting the running flag off, then holding the busy semaphore """ if self.running: self.running = False self.busy.acquire()
[docs] def unpause(self): """ Unpause the uplinker, releasing the busy semaphore and then restoring the running flag """ if not self.running: self.busy.release() self.running = True
[docs] def is_running(self): """ Check if the uplink is running """ return self.running
[docs] def remove(self, source): """ Remove a file by its source from the queue and the permanent uplink list. This will touch each record in the queue one time, as they are pulled off the queue, inspected, and the replaced. :param source: source file to remove """ try: first = None found = self.queue.get_nowait() while found != first: if found.source == source: break elif first is None: first = found self.queue.put_nowait(found) found = self.queue.get_nowait() except queue.Empty: return self.__file_store.remove(found)
[docs] def run(self): """ A thread that will uplink files on after another until all files that have been enqueued are properly processed. To stop this thread, set the exit event. """ while not self.__exit.is_set(): # While running (not paused) continue to get - uplink - wait while self.running: file_obj = self.queue.get() # Exit condition is a None message, which unblocks the above get if file_obj is None or not self.running: self.queue.put(file_obj) break # Once a file is obtained from the queue, aquire the busy lock before starting. This prevents a in-use # or paused queue from uplinking. self.busy.acquire() self.uplinker.start(file_obj) # Wait until the file is finished, then release again waiting for more files self.busy.acquire() self.busy.release() time.sleep(0.010) # Don't busy spin, but yield and sleep
[docs] def current(self): """ Gets a set of current files. This will create a copy, to prevent messing with uplink. :return: copy of current files transformed into a JSONable dictionary """ return file_to_dict(self.__file_store)
[docs] def exit(self): """ Exit event to shutdown the thread """ self.__exit.set() self.running = False self.queue.put( None
) # Force an end to the wait for a file, if an uplink is not in-progress
[docs] def join(self): """ Join with this uplinker """ self.__thread.join()
[docs]class FileUplinker(fprime_gds.common.handlers.DataHandler): """ File uplinking component. Keeps track of the currently uplinking file, and registers as a receiver for the handshake packets that are send back on each packet. """
[docs] CHUNK_SIZE = 256
def __init__(self, file_encoder, chunk=CHUNK_SIZE, timeout=20): """ Constructor to build the file uplinker. """ self.state = FileStates.IDLE self.queue = UplinkQueue(self) self.active = None self.sequence = 0 self.chunk = chunk self.file_encoder = file_encoder self.__destination_dir = "/" self.__expected = [] self.__timeout = Timeout() self.__timeout.setup(self.timeout, timeout)
[docs] def enqueue(self, filepath, destination=None): """ Enqueue files for the upload. This tunnels into the upload queue, which unblocks once files have been enqueued and begins to upload each file sequentially. :param filepath: filepath to upload to the system :param destination: (optional) destination to uplink to. Default: current destination + file's basename """ if destination is None: destination = os.path.join( self.__destination_dir, os.path.basename(filepath) ) self.queue.enqueue(filepath, destination)
[docs] def exit(self): """ Exit this uplinker by killing the thread """ self.queue.exit() self.cancel() self.queue.join()
[docs] def is_running(self): """ Check if the queue is running """ return self.queue.is_running()
[docs] def pause(self): """ Pause uplink by canceling the uplink, and then pausing the uplink queue """ self.cancel() self.queue.pause()
[docs] def unpause(self): """ Unpauses the uplink by unpausing the internal queue """ self.queue.unpause()
[docs] def cancel_remove(self, file): """ Cancel/remove the uplink of the given file. If uplinking, it will cancel the uplink, and if queued it will remove the file from the queue. Unknown files will be ignored. :param file: file to remove from the uplinker """ if self.active is not None and file == self.active.source: self.cancel() else: self.queue.remove(file)
[docs] def current_files(self): """ Returns the current set of files held by the uplink queue. :return: current files as a list in JSONable format """ return self.queue.current()
[docs] def start(self, file_obj): """ Starts a file uplink to a running F prime deployment. This will open of the file, save the file handle, and emit the start file uplink packet. It will also store the state for the file uplink progress. If already uplinking then a FileUplinkerBusyException will be raised. :param file_obj: file object to upload :raises FileUplinkerBusyException: when an upload is already running :raises OSError: when file is inaccessible """ # Prevent multiple uplinks at once if self.state != FileStates.IDLE: raise FileUplinkerBusyException( "Currently uplinking file '{}' cannot start uplinking '{}'".format( self.active.source, file_obj.source ) ) self.state = FileStates.RUNNING self.active = file_obj self.active.open() self.send( StartPacketData( self.get_next_sequence(), self.active.size, self.active.source, self.active.destination,
) )
[docs] def send(self, packet_data): """ A function to send the packet out. Starts timeout and then pushes the packet to the file encoder. :param packet_data: packet data to send that will be pushed to the encoder """ self.__timeout.restart() self.__expected = self.file_encoder.data_callback(packet_data)[8:]
[docs] def data_callback(self, data, sender=None): """ Process incoming handshake data, and if it is the expected handshake, then it uplinks the next packet. In the file. Invalid handshakes are ignore. When finished a returning handshake puts the uplinker into idle state. :param data: data from handshake packet to be verified against that previously sent """ # Ignore handshakes not for us if not self.valid_handshake(data): return # If it is an end-wait or a cancel state, respond without reading next chunk if self.state == FileStates.END_WAIT: self.active.state = ( "FINISHED" if self.active.state != "CANCELED" else "CANCELED" ) self.state = FileStates.IDLE self.queue.busy.release() # Allow the queue to continue self.__timeout.stop() return elif self.state == FileStates.CANCELED: self.send(CancelPacketData(self.get_next_sequence())) self.finish() return # Read next chunk of data. b'' means the file is empty outgoing = self.active.read(self.chunk) if outgoing == b"": self.send( EndPacketData(self.get_next_sequence(), self.active.checksum.value) ) self.finish() else: self.active.checksum.update(outgoing, self.active.seek) self.send( DataPacketData(self.get_next_sequence(), self.active.seek, outgoing) ) self.active.seek += len(outgoing)
[docs] def cancel(self): """ Cancels the currently active file uplink. Will emit a cancel packet to inform the deployment that the file is canceled. This merely sets the state to "canceled" and will handle this later. This implies that in the case of a timeout, cancels may be ignored. """ if self.state == FileStates.RUNNING: self.state = FileStates.CANCELED self.active.state = "CANCELED"
# self.queue.pause()
[docs] def timeout(self): """ Handles timeout o file packet by finishing the upload immediately, and setting the state to timeout """ self.finish(False) self.active.state = "TIMEOUT"
[docs] def finish(self, wait_for_handshake=True): """ Finishes the current file uplink by closing the file, and starting the end processs. If the uplinker should immediately terminate (like during a timeout) then set wait_for_handshake to False. :param wait_for_handshake: (optional) wrap up cleanly by waiting for handshake. Default: True, clean wait """ self.active.close() self.sequence = 0 self.state = FileStates.END_WAIT os.remove(self.active.source) # Immediate termination items if not wait_for_handshake: self.state = FileStates.IDLE self.__expected = [] self.queue.busy.release() self.__timeout.stop()
[docs] def get_next_sequence(self): """ Gets the next sequence number """ tmp = self.sequence self.sequence = self.sequence + 1 return tmp
[docs] def valid_handshake(self, data): """ Check the handshake data and ensure that it is as expected. This will allow us to only handle handshakes that we expected. This will ensure that the handshake data is an exact match of the send data. :param data: data to check against what was transmitted :return: True, if proper handshake, False otherwise """ return data == self.__expected
@property
[docs] def destination_dir(self): """ Get the destination directory that files will be uplinked to, if not currently specified. :return: value of destination """ return self.__destination_dir
@destination_dir.setter def destination_dir(self, destination): """ Set the destination directory that files will be uplinked to. :param destination: new destination """ self.__destination_dir = destination
[docs]class FileUplinkerBusyException(Exception):
""" File uplinker is busy and cannot uplink more files """