####
# run_deployment.py:
#
# Runs a deployment. Starts a GUI, a TCPServer, and the deployment application.
####
import os
import sys
import platform
import sys
import webbrowser
import fprime_gds.executables.cli
import fprime_gds.executables.utils
[docs]def get_args():
"""
Gets an argument parsers to read the command line and process the arguments. Return
the arguments in their namespace.
:param args: arguments to supply
"""
# Get custom handlers for all executables we are running
arg_handlers = [
fprime_gds.executables.cli.LogDeployParser,
fprime_gds.executables.cli.GdsParser,
fprime_gds.executables.cli.MiddleWareParser,
fprime_gds.executables.cli.BinaryDeployment,
fprime_gds.executables.cli.CommParser,
]
# Parse the arguments, and refine through all handlers
try:
args, parser = fprime_gds.executables.cli.ParserBase.parse_args(
arg_handlers, "Run F prime deployment and GDS"
)
# Special checks
if args.config.get_file_path() is None and args.gui == "wx":
raise ValueError("Must supply --config when using 'wx' GUI.")
# On ValueError print error, help and exit
except ValueError as vexc:
print("[ERROR] {}".format(str(vexc)), file=sys.stderr, end="\n\n")
parser.print_help(sys.stderr)
sys.exit(-1)
return args
[docs]def launch_process(cmd, logfile=None, name=None, env=None, launch_time=5):
"""
Launch a child subprocess. This subprocess will allow the child to run outside of the memory context of Python.
:param cmd: list of command arguments to run by handing to subprocess.
:param logfile: (optional) place to redirect output to for purposes of logging. Default: None, screen.
:param name: (optional) short name for printing messages.
:param env: (optional) environment to run in. Allows for special environment contexts.
:param launch_time: (optional) time to launch the process, before rendering an error.
:return: running process
"""
if name is None:
name = str(cmd)
print(
"[INFO] Ensuring {} is stable for at least {} seconds".format(name, launch_time)
)
try:
return fprime_gds.executables.utils.run_wrapped_application(
cmd, logfile, env, launch_time
)
except fprime_gds.executables.utils.AppWrapperException as awe:
print("[ERROR] {}.".format(str(awe)), file=sys.stderr)
try:
if logfile is not None:
with open(logfile) as file_handle:
for line in file_handle.readlines():
print(" [LOG] {}".format(line.strip()), file=sys.stderr)
except Exception:
pass
raise fprime_gds.executables.utils.AppWrapperException(
"Failed to run {}".format(name)
)
[docs]def launch_tts(tts_port, tts_addr, logs, **_):
"""
Launch the Threaded TCP Server
:param tts_port: port to attach to
:param tts_addr: address to bind to
:param logs: logs output directory
:return: process
"""
# Open log, and prepare to close it cleanly on exit
tts_log = os.path.join(logs, "ThreadedTCP.log")
# Launch the tcp server
tts_cmd = [
"python3",
"-u",
"-m",
"fprime_gds.executables.tcpserver",
"--port",
str(tts_port),
"--host",
str(tts_addr),
]
return launch_process(tts_cmd, logfile=tts_log, name="TCP Server")
[docs]def launch_wx(port, dictionary, connect_address, log_dir, config, **_):
"""
Launch the GDS gui
:param port: port to connect to
:param dictionary: dictionary to look at
:param connect_address: address to connect to
:param log_dir: directory to place logs
:param config: configuration to use
:return: process
"""
gse_args = [
"python3",
"-u",
"-m",
"fprime_gds.wxgui.tools.gds",
"--port",
str(port),
]
if os.path.isfile(dictionary):
gse_args.extend(["-x", dictionary])
elif os.path.isdir(dictionary):
gse_args.extend(["--dictionary", dictionary])
else:
print(
"[ERROR] Dictionary invalid, must be XML or PY dicts: {}".format(
dictionary
),
file=sys.stderr,
)
# For OSX, add in the wx wrapper
if platform.system() == "Darwin":
gse_args.insert(
0,
os.path.join(
os.path.dirname(__file__),
"..",
"..",
"..",
"bin",
"osx",
"wx-wrapper.bash",
),
)
gse_args.extend(
["--addr", connect_address, "-L", log_dir, "--config", config.get_file_path()]
)
return launch_process(gse_args, name="WX GUI")
[docs]def launch_html(tts_port, dictionary, connect_address, logs, **extras):
"""
Launch the flask server and a browser pointed at the HTML page.
:param tts_port: port to connect to
:param dictionary: dictionary to look at
:param connect_address: address to connect to
:param logs: directory to place logs
:return: process
"""
gse_env = os.environ.copy()
gse_env.update(
{
"DICTIONARY": str(dictionary),
"FLASK_APP": "fprime_gds.flask.app",
"TTS_PORT": str(tts_port),
"TTS_ADDR": connect_address,
"LOG_DIR": logs,
"SERVE_LOGS": "YES",
}
)
gse_args = ["python3", "-u", "-m", "flask", "run"]
ret = launch_process(gse_args, name="HTML GUI", env=gse_env, launch_time=2)
if extras["gui"] == "html":
webbrowser.open("http://localhost:5000/", new=0, autoraise=True)
return ret
[docs]def launch_app(app, port, address, logs, **_):
"""
Launch the app
:param app: application to launch
:param port: port to connect to
:param address: address to connect to
:param log_dir: log directory to place files into
:return: process
"""
app_name = os.path.basename(app)
logfile = os.path.join(logs, "{}.log".format(app_name))
app_cmd = [os.path.abspath(app), "-p", str(port), "-a", address]
return launch_process(
app_cmd, name="{} Application".format(app_name), logfile=logfile, launch_time=1
)
[docs]def launch_comm(comm_adapter, tts_port, connect_address, logs, **all_args):
"""
:return:
"""
app_cmd = [
"python3",
"-u",
"-m",
"fprime_gds.executables.comm",
"--tts-addr",
connect_address,
"--tts-port",
str(tts_port),
"-l",
logs,
"--log-directly",
"--comm-adapter",
all_args["adapter"],
]
# Manufacture arguments for the selected adapter
for arg in comm_adapter.get_arguments().keys():
definition = comm_adapter.get_arguments()[arg]
destination = definition["dest"]
app_cmd.append(arg[0])
app_cmd.append(str(all_args[destination]))
return launch_process(
app_cmd,
name="{} Application".format("comm[{}]".format(all_args["adapter"])),
launch_time=1,
)
[docs]def main():
"""
Main function used to launch processes.
"""
args = vars(get_args())
# Launch a gui, if specified
args["connect_address"] = (
args["tts_addr"] if args["tts_addr"] != "0.0.0.0" else "127.0.0.1"
)
# List of things to launch, in order.
launchers = [launch_tts, launch_comm]
# Add app, if possible
if args.get("app", None) is not None and args.get("adapter", "") == "ip":
launchers.append(launch_app)
elif args.get("app", None) is not None:
print("[WARNING] App cannot be auto-launched without IP adapter")
# Launch the desired GUI package
gui = args.get("gui", "none")
if gui == "wx":
launchers.append(launch_wx)
elif gui == "html" or gui == "none":
launchers.append(launch_html)
# elif gui == "none":
# print("[WARNING] No GUI specified, running headless", file=sys.stderr)
else:
raise Exception("Invalid GUI specified: {}".format(args["gui"]))
# Launch launchers and wait for the last app to finish
try:
procs = []
for launcher in launchers:
procs.append(launcher(**args))
print("[INFO] F prime is now running. CTRL-C to shutdown all components.")
procs[-1].wait()
except KeyboardInterrupt:
print("[INFO] CTRL-C received. Exiting.")
except Exception as exc:
print(
"[INFO] Shutting down F prime due to error. {}".format(str(exc)),
file=sys.stderr,
)
return 1
# Processes are killed atexit
return 0
if __name__ == "__main__":
sys.exit(main())