"""
uart.py:
This is the adapter for projects that would choose to use a serial UART interface for sending data from an F prime
deployment. This handles sending and receiving data from the things like 'LinuxSerialDriver' and other standard UART
drivers.
@author lestarch
"""
import logging
import fprime_gds.common.communication.adapters.base
import serial
from serial.tools import list_ports
[docs]LOGGER = logging.getLogger("serial_adapter")
[docs]class SerialAdapter(fprime_gds.common.communication.adapters.base.BaseAdapter):
"""
Supplies a data source adapter that is pulling data off from a UART wire using PySerial. This is setup using a
device handle and a baudrate for the given serial device.
"""
[docs] BAUDS = [
50,
75,
110,
134,
150,
200,
300,
600,
1200,
1800,
2400,
4800,
9600,
19200,
38400,
57600,
115200,
230400,
460800,
500000,
576000,
921600,
1000000,
1152000,
1500000,
2000000,
2500000,
3000000,
3500000,
4000000,
]
def __init__(self, device, baud):
"""
Initialize the serial adapter using the default settings. This does not open the serial port, but sets up all
the internal variables used when opening the device.
"""
self.device = device
self.baud = baud
self.serial = None
[docs] def open(self):
"""
Opens the serial port based on previously supplied settings. If the port is already open, then close it first.
Then open the port up again.
"""
self.close()
self.serial = serial.Serial(self.device, self.baud)
return self.serial is not None
[docs] def close(self):
"""
Close the serial device, and ignore any errors that might arrive when attempting that closure.
"""
try:
if self.serial is not None:
self.serial.close()
finally:
self.serial = None
[docs] def write(self, frame):
"""
Send a given framed bit of data by sending it out the serial interface. It will attempt to reconnect if there is
was a problem previously. This function will return true on success, or false on error.
:param frame: framed data packet to send out
:return: True, when data was sent through the UART. False otherwise.
"""
try:
if self.serial is None:
self.open()
written = self.serial.write(frame)
# Not believed to be possible to not send everything without getting a timeout exception
assert written == len(frame)
return True
except serial.serialutil.SerialException as exc:
LOGGER.warning("Serial exception caught: %s. Reconnecting.", (str(exc)))
self.close()
return False
[docs] def read(self, timeout=0.500):
"""
Read up to a given count in bytes from the UART adapter. This may return less than the full requested size but
is expected to return some data.
:param timeout: timeout for reading data from the serial.
:return: data successfully read
"""
data = b""
try:
if self.serial is None:
self.open()
# Read as much data as possible, while ensuring to block if no data is available at this time. Note: as much
# data is read as possible to avoid a long-return time to this call. Minimum data to read is one byte in
# order to block this function while data is incoming.
self.serial.timeout = timeout
data = self.serial.read(1) # Force a block for at least 1 character
while self.serial.in_waiting:
data += self.serial.read(
self.serial.in_waiting
) # Drain the incoming data queue
except serial.serialutil.SerialException as exc:
LOGGER.warning("Serial exception caught: %s. Reconnecting.", (str(exc)))
self.close()
return data
@classmethod
[docs] def get_arguments(cls):
"""
Returns a dictionary of flag to argparse-argument dictionaries for use with argparse to setup arguments.
:return: dictionary of flag to argparse arguments for use with argparse
"""
available = list(
map(lambda info: info.device, list_ports.comports(include_links=True))
)
default = "/dev/ttyACM0" if not available else available[-1]
return {
("--uart-device",): {
"dest": "device",
"type": str,
"default": default,
"help": "UART device representing the FSW. Default: %(default)s",
},
("--uart-baud",): {
"dest": "baud",
"type": int,
"default": 9600,
"help": "Baud rate of the serial device. Default: %(default)s",
},
}
@classmethod
[docs] def check_arguments(cls, args):
"""
Code that should check arguments of this adapter. If there is a problem with this code, then a "ValueError"
should be raised describing the problem with these arguments.
:param args: arguments as dictionary
"""
ports = map(lambda info: info.device, list_ports.comports(include_links=True))
if not args["device"] in ports:
raise ValueError(
"Serial port '{}' not valid. Available ports: {}".format(
args["device"], ports
)
)
# Note: baud rate may not *always* work. These are a superset
baud = 0
try:
baud = int(args["baud"])
except ValueError:
raise ValueError(
"Serial baud rate '{}' not integer. Use one of: {}".format(
args["baud"], SerialAdapter.BAUDS
)
)
if not int(baud) in SerialAdapter.BAUDS:
raise ValueError(
"Serial baud rate '{}' not supported. Use one of: {}".format(
baud, SerialAdapter.BAUDS
)
)