"""
base.py:
This file specifies the base-adapter for the F prime comm-layer. This class defines the basic methods needed to interact
with various wire formats as they are supported by the F prime comm-layer. This file defines a single abstract base
class representing the core features of the adapter class that must be implemented by every implementation of the
adapter for use with the comm-layer.
@author lestarch
"""
import abc
class BaseAdapter(abc.ABC):
    """
    Base adapter for adapting the communications layer. This essentially breaks down to providing the ability to read
    data from, and write to, the necessary wire-format. The children of this class must at least implement the 'read'
    and 'write' functions to ensure that data can be read and written. 'open' and 'close' are also provided as helpers
    for the subclass implementor to place resource initialization and release code; these have null default
    implementations, so overriding them is optional.
    """

    def open(self):
        """Null default implementation. Override to place resource initialization code."""

    def close(self):
        """Null default implementation. Override to place resource release code."""

    @abc.abstractmethod
    def read(self, timeout=0.500):
        """
        Read from the interface. Must be overridden by the child adapter. Throw no fatal errors, reconnect instead. This
        call is expected to block waiting on incoming data.

        :param timeout: timeout for the block, default: 0.500 (500ms) as blocking w/o timeout may be uninterruptable
        :return: byte array of data, or b'' if no data was read
        """

    @abc.abstractmethod
    def write(self, frame):
        """
        Write to the interface. Must be overridden by the child adapter. Throw no fatal errors, reconnect instead.

        :param frame: framed data to uplink
        :return: True if data sent through adapter, False otherwise
        """

    @classmethod
    @abc.abstractmethod
    def get_arguments(cls):
        """
        Returns a set of arguments consumed by this adapter. This will be consumed by the CLI layer in order to provide
        command line arguments to the user. Note: these should be globally unique args, e.g. --ip-address

        :return: dictionary whose keys are tuples of arg flags and whose values are mappings of the other arguments to
                 argparse's add_argument. Each value must contain a "dest" entry, as "process_arguments" reads it.
        """

    @classmethod
    @abc.abstractmethod
    def check_arguments(cls, args):
        """
        Code that should check arguments of this adapter. If there is a problem with these arguments, then a
        "ValueError" should be raised describing the problem.

        :param args: arguments as dictionary
        :raises ValueError: when the supplied arguments are not usable by this adapter
        """

    @classmethod
    def get_adapters(cls):
        """
        Get all known adapters of this base class. These must be imported into the comm-layer to be available to the
        system; however, they only need to be imported. Once imported this function will find them and make them
        available to the comm-layer in the standard way.

        Each adapter is keyed by the last component of its module name (e.g. "ip" for "pkg.adapters.ip"). When that
        short name is already taken by a previously seen adapter, the full dotted module name is used instead.

        :return: dictionary mapping adapter name to adapter class for every subclass reported by "__subclasses__"
        """
        adapter_map = {}
        for adapter in cls.__subclasses__():
            # Two candidate names: the full dotted module path and its final component
            adapter_name = adapter.__module__
            adapter_short = adapter_name.split(".")[-1]
            # Prefer the short name; fall back to the long name only when the short one is taken
            if adapter_short not in adapter_map:
                adapter_name = adapter_short
            adapter_map[adapter_name] = adapter
        return adapter_map

    @staticmethod
    def process_arguments(clazz, args):
        """
        Process arguments incoming from the command line and construct a dictionary of kwargs to supply to the chosen
        adapter at creation time. This will allow the user to choose any imported adapters at runtime.

        :param clazz: adapter class whose "get_arguments" describes the arguments to extract
        :param args: arguments to process; each "dest" named by the adapter is read from it as an attribute
        :return: dictionary of constructor keyword arguments
        """
        return {spec["dest"]: getattr(args, spec["dest"]) for spec in clazz.get_arguments().values()}

    @classmethod
    def construct_adapter(cls, adapter_name, args):
        """
        Constructs a new adapter, from the given adapter name and the given namespace of argument inputs. This is a
        wrapper of "get_adapters" and "process_arguments" to help build a new, fully configured, adapter. This is a
        factory method.

        :param adapter_name: name of the adapter to build, as keyed by "get_adapters"
        :param args: namespace of arg value to help build an adapter
        :return: newly constructed adapter
        :raises KeyError: when no adapter is registered under adapter_name
        """
        adapter = cls.get_adapters()[adapter_name]
        return adapter(**cls.process_arguments(adapter, args))