import collections.abc
import datetime
import glob
import logging
import os
import re
import shutil
import string
import sys
import textwrap
import numpy as np
import upsp
from . import io
log = logging.getLogger(__name__)
def _datapoint_processing_config(cfg: dict, step_name: str, run_number: str):
return cfg["processing"][run_number][step_name]
def _camera_filenames(cfg, run_number, exts=(".cine", ".dyn", ".mraw")):
# Finds individual camera video files for this run number in its "camera_video_dir".
# Valid camera video file extensions are supplied via the "exts" parameter.
# Assumes camera files are named as "<datapoint><camera number>.<extension>"
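    # Illustrative example (filenames are hypothetical): for run "123456" with
    # two cameras, files "12345601.cine" and "12345602.cine" are returned as a
    # list ordered by camera number.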
camera_dir = cfg["datapoints"][run_number]["camera_video_dir"]
candidate_filenames = [
f
for f in glob.glob(os.path.join(camera_dir, "%s*" % run_number))
if os.path.splitext(f)[1] in exts
]
# sort all files into a list per camera number
camera_candidate_filenames = {}
for f in candidate_filenames:
n = _camera_number_from_filename(f)
if n not in camera_candidate_filenames:
camera_candidate_filenames[n] = []
camera_candidate_filenames[n].append(f)
# then, return first file from list for each camera.
# warn if there's more than one element (ambiguous which one to use)
nlst = sorted(camera_candidate_filenames.keys())
    if not nlst or len(nlst) != max(nlst):
        log.warning("Missing one or more camera files (found %s)", nlst)
    camera_filenames = [""] * max(nlst, default=0)
for n, lst in camera_candidate_filenames.items():
if len(lst) > 1:
log.warning(
"More than one file found for %s, camera %02d: %s", run_number, n, lst
)
camera_filenames[n - 1] = lst[0]
return camera_filenames
def _camera_number_from_filename(filename):
# <datapoint><camera number>.<extension>
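    # Illustrative example (hypothetical filename, 6-character datapoint ID):
    #   "12345602.cine" -> camera number 2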
return int(os.path.splitext(os.path.basename(filename))[0][6:])
def _camera_name(filename):
return "cam%02d" % _camera_number_from_filename(filename)
_ADD_FIELD_EXE = shutil.which("add_field")
_DEFAULT_TASK_EXE_NAME = "task.sh"
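# Physical CPU cores per node for the supported NAS node models
# (e.g., "bro" = Broadwell, "sky_ele" = Skylake Electra, "cas_ait" = Cascade
# Lake Aitken); used below to size OMP_NUM_THREADS in the PBS job scripts.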
_NAS_NCPUS_BY_MODEL = {
"cas_ait": 40,
"sky_ele": 40,
"bro_ele": 28,
"bro": 28,
"has": 24,
"ivy": 20,
"san": 16,
}
class Error(Exception):
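    """Raised when the processing configuration is invalid (e.g., a malformed
    regular expression in the "__overlays__" section)."""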
pass
def ensure_unique(values: list[str], prefixes=None):
# Ensure a list of strings is unique. If not, add prefixes to non-unique elements.
# If prefixes is not specified, the list index (0, 1, 2, ...) is used.
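    # Illustrative example:
    #   ensure_unique(["a.json", "b.json", "a.json"])
    #   -> ["0a.json", "b.json", "2a.json"]  (returned as a numpy array)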
prefixes = list(range(len(values))) if prefixes is None else prefixes
prefixes = [str(s) for s in prefixes]
    add_prefix = np.zeros(len(values), dtype=bool)
uniq_values, inv_idxs = np.unique(values, return_inverse=True)
for ii in range(len(uniq_values)):
duplicate_check = inv_idxs == ii
if np.count_nonzero(duplicate_check) > 1:
add_prefix[duplicate_check] = True
new_values = np.where(
add_prefix, list(map(''.join, zip(prefixes, values))), values
)
return new_values
def copy_files_ensure_unique_names(src_filenames, dst_dir, src_prefixes=None):
src_basenames = [os.path.basename(fn) for fn in src_filenames]
dst_basenames = ensure_unique(src_basenames, prefixes=src_prefixes)
for src_fn, dst_bn in zip(src_filenames, dst_basenames):
dst_fn = os.path.join(dst_dir, dst_bn)
shutil.copy(src_fn, dst_fn)
print("Copied:", src_fn, '->', dst_fn)
# Create a processing tree for uPSP raw data.
#
# The processing tree is a hierarchy to contain
# - Launcher scripts to run portions (or all of) the processing steps
# - Logs and diagnostics from processing steps
# - Output data artifacts from processing steps
#
# The create() routine is not itself intended to run
# any "heavyweight" processing, but rather to create the *environment*
# for performing the processing.
#
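# A minimal, hypothetical invocation (all paths here are illustrative):
#
#   create(
#       output_dir="/path/to/output",
#       data_config_filename="data.json",
#       user_config_filename="user.json",
#       proc_config_filename="proc.json",
#       plot_config_filename="plot.json",
#   )
#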
def create(
output_dir: os.PathLike,
data_config_filename: os.PathLike,
user_config_filename: os.PathLike,
proc_config_filename: os.PathLike,
plot_config_filename: os.PathLike,
):
dat = io.json_load_or_die(data_config_filename)
usr = io.json_load_or_die(user_config_filename)
swr = io.json_load_or_die(proc_config_filename)
plt = io.json_load_or_die(plot_config_filename)
proc = _resolve_parameter_overlays(swr["processing"], dat["datapoints"])
nas = _resolve_nas_config(usr["nas"])
cfg = {
"datapoints": dat["datapoints"],
"nas": nas,
"root": output_dir,
"processing": proc,
"plotting": plt["plotting"],
"__meta__": {
"datapoints": dat["__meta__"],
"nas": usr["__meta__"],
"processing": swr["__meta__"],
"plotting": plt["__meta__"],
"__date__": datetime.date.today().strftime("%b-%d-%Y"),
},
}
dirname = _create(cfg)
# Write the final cfg state out to an index file
ctx_filename = os.path.join(dirname, "context.json")
io.json_write_or_die(cfg, ctx_filename, indent=2)
# Write the origin JSON files to the config directory
# ... if their basenames aren't unique, MAKE them unique
    # (to avoid overwriting each other; it's possible they have
    # the same basename but live in different folders)
_cfgs = {
"data-": data_config_filename, "user-": user_config_filename,
"proc-": proc_config_filename, "plot-": plot_config_filename
}
copy_files_ensure_unique_names(
list(_cfgs.values()),
_configuration_path(cfg),
src_prefixes=list(_cfgs.keys())
)
def _resolve_nas_config(nas: dict):
# Resolve NAS launch parameters for each pipeline step
#
# - Output is a dictionary containing launch parameters for each pipeline step
# - Input is the dictionary containing config JSON
#
# Applies defaults to each step, then overlays step-specific values.
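    # A sketch of that behavior (step and key values are illustrative):
    #
    #   {"__defaults__": {"queue": "normal", "wall_time": "2:00:00"},
    #    "psp_process": {"wall_time": "8:00:00"}}
    #
    # resolves to
    #
    #   {"psp_process": {"queue": "normal", "wall_time": "8:00:00"}}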
out = {}
for name in [k for k in nas.keys() if k != "__defaults__"]:
o = {k: v for k, v in nas["__defaults__"].items()}
o.update(nas[name])
out[name] = o
return out
def _resolve_parameter_overlays(processing: dict, datapoints: dict):
# Resolve processing parameters for each datapoint
#
# - Output is a dictionary containing the parameter values for each
# datapoint.
#
# - Inputs are the processing JSON and the datapoint index.
#
# Example showing how to override one parameter for pipeline 'appA' based on
# whether the datapoint grid file matches a given naming convention.
#
# - Processing JSON:
#
# ```json
    # {
    #     "defaults": {"appA": {"pa1": 1, "pa2": 2}, "appB": {"pb1": 3, "pb2": 4}},
    #     "config1": {"appA": {"pa1": 7}},
    #     "__overlays__": [
    #         ["defaults", {".*": ".*"}],
    #         ["config1", {"grid_file": ".*config1.grid"}]
    #     ]
    # }
# ```
#
# - Datapoint index JSON
#
# ```json
# {
# "000000": {"grid_file": "/path/to/config0.grid", ...},
# "111111": {"grid_file": "/path/to/config1.grid", ...}
# }
# ```
#
# - Output context JSON:
#
# ```json
# {
# "000000": {"appA": {"pa1": 1, "pa2": 2}, "appB": {"pb1": 3, "pb2": 4}},
# "111111": {"appA": {"pa1": 7, "pa2": 2}, "appB": {"pb1": 3, "pb2": 4}}
# }
# ```
#
def _match_all_patterns(patterns, dp_name, dp):
# Return True if ALL patterns match
# A pattern matches if:
# - its key matches at least one input name, AND
# - for each key match, its value matches the input value
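        # For instance, with the module example above, the pattern
        # {"grid_file": ".*config1.grid"} matches datapoint "111111" because
        # its "grid_file" value ends in "config1.grid", but not "000000".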
def _match_pattern(d, pkey, pval):
pattern_key_matched_once = False
for dkey, dval in d.items():
if re.fullmatch(pkey, dkey):
pattern_key_matched_once = True
if re.fullmatch(pval, dval):
return True
if not pattern_key_matched_once:
log.warning(
"%s: Overlay pattern '%s' does not match any input names (%s)",
dp_name,
pkey,
list(d.keys()),
)
return False
matches = [_match_pattern(dp, *item) for item in patterns.items()]
return all(matches)
def _update(d, u):
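        # Recursively merge mapping `u` into `d`: nested dicts are merged
        # key-by-key instead of being replaced wholesale. Using the example
        # above, _update({"appA": {"pa1": 1, "pa2": 2}}, {"appA": {"pa1": 7}})
        # yields {"appA": {"pa1": 7, "pa2": 2}}.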
for k, v in u.items():
if isinstance(v, collections.abc.Mapping):
d[k] = _update(d.get(k, {}), v)
else:
d[k] = v
return d
def _validate_overlay_syntax(overlays):
def _validate_re(s):
try:
re.compile(s)
except re.error as e:
err = "In __overlays__: invalid regular expression '%s' (%s)" % (
s,
str(e),
)
raise Error(err)
for ov in overlays:
for pkey, pval in ov[1].items():
_validate_re(pkey)
_validate_re(pval)
param_sets = {k: v for k, v in processing.items() if k != "__overlays__"}
    overlays = processing.get("__overlays__", [])
proc = {k: {} for k in datapoints.keys()}
_validate_overlay_syntax(overlays)
for ov in overlays:
overlay_matched_once = False
for dp_name, dp in datapoints.items():
name, patterns = ov
if _match_all_patterns(patterns, dp_name, dp):
overlay_matched_once = True
proc[dp_name] = _update(proc[dp_name], param_sets[name])
if not overlay_matched_once:
log.warning("Overlay '%s' matched no datapoints, will be a no-op", ov)
return proc
def _launcher_env_sh(cfg: dict):
"""Returns string containing sh environment setup commands
- Load any required system libraries that aren't available
by default... for example, system-provided MPI libraries.
- Prefix the PATH with the directory of our current python
interpreter. Ensures any scripts keying off "/usr/bin/env python"
resolve the correct interpreter at runtime.
"""
env_sh_lines = [
"source /usr/local/lib/global.profile",
"module purge",
"module load mpi-hpe/mpt.2.25",
"export PATH=%s:$PATH" % (os.path.dirname(shutil.which("python")))
]
return "\n".join(env_sh_lines)
def _create_qsub_step_launcher(cfg: dict):
filename = _launchers_path(cfg, "qsub-step")
env_sh = _launcher_env_sh(cfg)
exe_sh = textwrap.dedent(
f"""
SCRIPT_DIR=$( cd -- "$( dirname -- "${{BASH_SOURCE[0]}}" )" &> /dev/null && pwd )
STEP_LAUNCHER_FILENAME=$(realpath $1)
STEP_LAUNCHER_BASENAME=$(basename $STEP_LAUNCHER_FILENAME)
STEP_NAME="$(echo ${{STEP_LAUNCHER_BASENAME}} | cut -d+ -f2)"
D=$SCRIPT_DIR/../04_processing/01_exec/$STEP_NAME
ALL_DATAPOINTS=($(ls -1 $D))
INP_DATAPOINTS=("${{@:2}}")
SEL_DATAPOINTS=()
if (( ${{#INP_DATAPOINTS[@]}} )); then
SEL_DATAPOINTS+=( "${{INP_DATAPOINTS[@]}}" )
else
SEL_DATAPOINTS+=( "${{ALL_DATAPOINTS[@]}}" )
fi
readarray -t UPSP_QSUB_ARGS_OUT < \
<({shutil.which("upsp-qsub-args")} "$SCRIPT_DIR/.." $STEP_NAME ${{SEL_DATAPOINTS[@]}})
NLINES="${{#UPSP_QSUB_ARGS_OUT[@]}}"
IDX=0
while [ $IDX -lt $NLINES ]; do
THIS_QSUB_ARGS=${{UPSP_QSUB_ARGS_OUT[$IDX]}}
IDX=$(( $IDX + 1 ))
THIS_DATAPOINTS=${{UPSP_QSUB_ARGS_OUT[$IDX]}}
IDX=$(( $IDX + 1 ))
CMD="$STEP_LAUNCHER_FILENAME $THIS_DATAPOINTS"
EX="qsub $THIS_QSUB_ARGS -- $CMD"
echo "$EX"
$EX
done
"""
)
_create_launcher(filename, pbs_sh="", env_sh=env_sh, exe_sh=exe_sh)
def _create_step_launcher(cfg: dict, step_name: str, exe_name=""):
if exe_name:
exe_alias = os.path.splitext(exe_name)[0].replace("run-", "")
filename = _launchers_path(cfg, "step+%s+%s" % (step_name, exe_alias))
else:
        exe_name = _DEFAULT_TASK_EXE_NAME
filename = _launchers_path(cfg, "step+%s" % step_name)
launcher_type = cfg["nas"][step_name]["launcher"]
TEMPLATE_FILE = os.path.join(
os.path.dirname(__file__),
"templates",
"run-step-%s.sh.template" % (launcher_type,),
)
with open(TEMPLATE_FILE, "r") as fp:
ts = fp.read()
outp = string.Template(ts).substitute(
__pipeline_root_dir__=_root_path(cfg),
__pipeline_step_name__=step_name,
__pipeline_step_exe_name__=exe_name,
__pbs_sh__="",
)
with open(filename, "w") as fp:
fp.write(outp)
os.chmod(filename, io._CHMOD_URWXGR_XO__)
def _create_launcher(filename, pbs_sh="", env_sh="", exe_sh=""):
TEMPLATE_FILE = os.path.join(
os.path.dirname(__file__), "templates", "launcher.sh.template"
)
with open(TEMPLATE_FILE, "r") as fp:
ts = fp.read()
outp = string.Template(ts).substitute(
__pbs_sh__=pbs_sh, __env_sh__=env_sh, __exe_sh__=exe_sh
)
with open(filename, "w") as fp:
fp.write(outp)
os.chmod(filename, io._CHMOD_URWXGR_XO__)
def _create_plotting_config_file(cfg: dict, name: str):
_create_dir_and_log(_app_cache_path(cfg, name))
filename = _app_cache_path(cfg, name, name + ".json")
subcfg = cfg["plotting"][name]
io.json_write_or_die(subcfg, filename, indent=2)
return filename
def _create_add_field_file(cfg: dict, step_name: str, run_number: str, exe_name: str):
    # Post-processing step using the 'add_field' executable to add flat-file
    # data into the partially complete H5 file. It should run AFTER psp_process
    # finishes. This creates a shell script that wraps the call to 'add_field'
    # and writes useful debug info.
env_sh = _launcher_env_sh(cfg)
TEMPLATE_FILE = os.path.join(
os.path.dirname(__file__), "templates", "add-field.sh.template"
)
with open(TEMPLATE_FILE, "r") as fp:
ts = fp.read()
exe_sh = string.Template(ts).substitute(
add_field_exe=_ADD_FIELD_EXE,
datapoint_logs_dir=_app_logs_path(cfg, step_name, run_number),
datapoint_output_dir=_app_products_path(cfg, step_name, run_number),
)
add_field_filename = _app_exec_path(cfg, step_name, run_number, exe_name)
_create_launcher(add_field_filename, env_sh=env_sh, exe_sh=exe_sh)
return add_field_filename
def _create_pbs_file(cfg: dict, step_name: str, run_number: str, exe_name: str):
"""Generate the job script for a single datapoint
Also create directories needed by the job at runtime
"""
this_run = cfg["datapoints"][run_number]
pcfg = _datapoint_processing_config(cfg, step_name, run_number)
nas = cfg["nas"][step_name]
ncpus = _NAS_NCPUS_BY_MODEL[nas["node_model"]]
run_name = _run_name(cfg, run_number)
exe_prefix = os.path.splitext(exe_name)[0]
stdout = _app_logs_path(cfg, step_name, run_number, exe_prefix + ".stdout")
stderr = _app_logs_path(cfg, step_name, run_number, exe_prefix + ".stderr")
pbs_sh_rows = [
"#PBS -N %s" % (run_name),
"#PBS -q %s" % (nas["queue"]),
"#PBS -W group_list=%s" % (nas["charge_group"]),
"#PBS -l select=%s:model=%s" % (nas["number_nodes"], nas["node_model"]),
"#PBS -l walltime=%s" % (nas["wall_time"]),
"#PBS -o %s" % stdout,
"#PBS -e %s" % stderr,
"export MPI_DSM_DISTRIBUTE=off",
"export OMP_STACKSIZE=250M",
"export OMP_NUM_THREADS=%s" % (ncpus - 4),
]
pbs_sh = "\n".join(pbs_sh_rows)
env_sh = _launcher_env_sh(cfg)
h5_filename = _app_products_path(
cfg, step_name, run_number, "pressure_transpose.h5"
)
log_filename = _app_logs_path(cfg, step_name, run_number, "psp-process.out")
exe_rows = [
"# By default, core dumps are written out to the current working",
"# directory of the process. We want these core dumps to be written",
"# to the process logs directory.",
"cd %s" % _app_logs_path(cfg, step_name, run_number),
"mpiexec %s \\" % (shutil.which("psp_process"),),
" -cutoff_x_max=%s \\" % (pcfg["cutoff_x_max"]),
" -input_file=%s \\" % (_inp_filename(cfg, step_name, run_number)),
" -h5_out=%s \\" % (h5_filename),
" -paint_cal=%s \\" % (this_run["paint_calibration_file"]),
" -steady_p3d=%s \\" % (this_run["steady_psp_file"]),
" -frames=%s \\" % (pcfg["number_frames"]),
" -code_version=%s \\" % (upsp.__version__),
" > %s 2>&1\n" % (log_filename),
]
exe_sh = "\n".join(exe_rows)
filename = _app_exec_path(cfg, step_name, run_number, exe_name)
_create_launcher(filename, pbs_sh=pbs_sh, env_sh=env_sh, exe_sh=exe_sh)
_create_dir_and_log(os.path.dirname(stdout))
_create_dir_and_log(os.path.dirname(stderr))
_create_dir_and_log(os.path.dirname(h5_filename))
return filename
def _create_input_file(
cfg: dict, step_name: str, run_number: str, external_calibration_info: dict
):
"""Generate input deck file for run number"""
this_run = cfg["datapoints"][run_number]
pcfg = _datapoint_processing_config(cfg, step_name, run_number)
camera_files = _camera_filenames(cfg, run_number)
wind_tunnel_str = "ames_unitary"
frame_rate = 10000
fstop = 2.8
input_rows = [
"%%Version %s" % upsp.__version__,
"%%Date_Created: %s" % cfg["__meta__"]["__date__"],
"",
"@general",
"\ttest = %s" % _test_name(cfg),
"\trun = %s" % run_number[0:4],
"\tsequence = %s" % run_number[4:6],
"\ttunnel = %s" % wind_tunnel_str,
"\tframerate = %d" % frame_rate,
"\tfstop = %4.1f" % fstop,
"@all",
"\tgrid = %s" % this_run["grid_file"],
"\tsds = %s" % this_run["wtd_file"],
"\ttargets = %s" % this_run["targets_file"],
"\tnormals = %s" % this_run.get("normals_file", ""),
]
for idx, filename in enumerate(camera_files):
calibration_filename = external_calibration_info["calibration_filename"][idx]
input_rows += [
"@camera",
"\tnumber = %d" % (idx + 1,),
"\tcine = %s" % filename,
"\tcalibration = %s" % calibration_filename,
]
input_rows += [
"@options",
"\ttarget_patcher = %s" % (pcfg["target_patcher"]),
"\tregistration = %s" % (pcfg["registration"]),
"\tfilter = %s" % (pcfg["filter"]),
"\tfilter_size = %s" % (pcfg["filter_size"]),
"\toblique_angle = %s" % (pcfg["oblique_angle"]),
"\tnumber_frames = -1", # number_frames will be set in PBS file cmdline args
]
input_rows += [
"@output",
"\tdir = %s" % (_app_products_path(cfg, step_name, run_number)),
"\tname = %s" % (run_number),
]
input_str = "\n".join(input_rows)
filename = _inp_filename(cfg, step_name, run_number)
_create_dir_and_log(os.path.dirname(filename))
with open(filename, "w") as fid:
fid.write(input_str)
os.chmod(filename, io._CHMOD_URW_GR__O__)
def _test_name(cfg: dict):
return cfg["__meta__"]["datapoints"]["test_name"]
def _configuration_name(cfg: dict):
# The overall processing configuration name is a combination of:
# - The alias for configurable input files for each datapoint
# (e.g., the steady-state pressure input files, or the model grid files)
# (input files that are NOT configurable are, e.g., raw cine files)
# - The alias for the uPSP processing software parameter values
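    # For example (aliases hypothetical): input-file alias "config1" and
    # processing alias "tuned" combine to "config1+tuned"; if the two aliases
    # are identical, the shared name is used on its own.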
alias_input_files = cfg["__meta__"]["datapoints"]["config_name"]
alias_processing_params = cfg["__meta__"]["processing"]["name"]
if alias_input_files == alias_processing_params:
return alias_input_files
else:
return "+".join([alias_input_files, alias_processing_params])
def _version_configuration_name(cfg: dict):
# Combine software version and configuration name into a single string.
return "+".join([upsp.__version__, _configuration_name(cfg)])
def _root_path(cfg: dict, *args):
return os.path.join(
cfg["root"], _test_name(cfg), _version_configuration_name(cfg), *args
)
def _configuration_path(cfg: dict, *args):
return _root_path(cfg, "02_configuration", *args)
def _launchers_path(cfg: dict, *args):
return _root_path(cfg, "03_launchers", *args)
def _processing_path(cfg: dict, *args):
return _root_path(cfg, "04_processing", *args)
def _products_path(cfg: dict, *args):
return _root_path(cfg, "05_products", *args)
def _processing_exec_path(cfg: dict, *args):
return _processing_path(cfg, "01_exec", *args)
def _processing_logs_path(cfg: dict, *args):
return _processing_path(cfg, "02_logs", *args)
def _app_exec_path(cfg: dict, app_name: str, *args):
return _processing_exec_path(cfg, app_name, *args)
def _app_logs_path(cfg: dict, app_name: str, *args):
return _processing_logs_path(cfg, app_name, *args)
def _app_products_path(cfg: dict, app_name: str, *args):
return _products_path(cfg, "00_data", app_name, *args)
def _app_cache_path(cfg: dict, app_name: str, *args):
return _products_path(cfg, "00_data", "cache", app_name, *args)
def _inp_filename(cfg: dict, step_name: str, run_number: str):
return _app_exec_path(cfg, step_name, run_number, "psp-process.inp")
def _create_dir_and_log(path: str):
if not os.path.exists(path):
os.makedirs(path)
log.info("Created %s", path)
def _create_root_dir(cfg: dict):
root_dir = _root_path(cfg)
if os.path.exists(root_dir):
io.user_confirm_or_exit("'%s' exists, force-overwrite?" % root_dir)
try:
_create_dir_and_log(root_dir)
except PermissionError as err:
log.error(err)
log.error("Could not create output directory.")
log.error("Check user 'data_processing_dir' parameter.")
sys.exit(1)
def _run_name(cfg: dict, run_number: str):
return "-".join([run_number, _version_configuration_name(cfg)])
def _create_task_render_images(cfg: dict, step_name: str, run_number: str):
# - For each datapoint, step to render views of model using Tecplot
# (uses NAS tecplot license)
cfg["plotting"]["render-images"]["runtime"] = {
"table_generator_exe": "xyz_scalar_to_tbl",
"images_dir": _app_products_path(cfg, "images"),
"cache_dir": _app_cache_path(cfg, "tecplot"),
"root_dir": _root_path(cfg),
}
# TODO This file will be overwritten every time this is run per-datapoint.
# Inefficient but not a huge deal.
cfg_filename = _create_plotting_config_file(cfg, "render-images")
exe_sh = "%s render-images %s %s" % (
shutil.which("upsp-plotting"), cfg_filename, run_number
)
env_sh = _launcher_env_sh(cfg)
exe_filename = _app_exec_path(cfg, step_name, run_number, _DEFAULT_TASK_EXE_NAME)
_create_dir_and_log(_app_logs_path(cfg, step_name, run_number))
_create_dir_and_log(_app_exec_path(cfg, step_name, run_number))
_create_launcher(exe_filename, exe_sh=exe_sh, env_sh=env_sh)
return None, [_DEFAULT_TASK_EXE_NAME]
def _create_task_extract_first_frame(cfg: dict, step_name: str, run_number: str):
# - For each datapoint, step to dump first frame from each camera
img_ext = "png"
img_frame_number = 1
def _first_frames_info(cfg: dict, run_number: str):
"""Paths to data products - first frame from each camera video file.
Intermediate data product output by "extract-first-frames"
and used by "external-calibration". Should be kept
in-sync with output naming convention of upsp-extract-frames.
"""
img_dir = _app_products_path(cfg, "first-frame", run_number)
info = {"src_filename": [], "img_prefix": [], "img_filename": []}
for camera_filename in _camera_filenames(cfg, run_number):
camera_name = _camera_name(camera_filename)
prefix = os.path.join(img_dir, camera_name)
filename = "%s.%05d.%s" % (prefix, img_frame_number, img_ext)
info["src_filename"].append(camera_filename)
info["img_prefix"].append(prefix)
info["img_filename"].append(filename)
return info
exe_sh_lines = [
"# By default, core dumps are written out to the current working",
"# directory of the process. We want core dumps to be written",
"# to the process logs directory.",
"cd %s" % _app_logs_path(cfg, step_name, run_number),
]
info = _first_frames_info(cfg, run_number)
for src_filename, img_prefix, img_filename in zip(
info["src_filename"], info["img_prefix"], info["img_filename"]
):
src_name = os.path.basename(src_filename)
log_filename = _app_logs_path(cfg, step_name, run_number, "%s.out" % src_name)
exe_sh_lines.extend(
[
"%s \\" % (shutil.which("upsp-extract-frames"),),
" -input=%s \\" % src_filename,
" -output=%s.%s \\" % (img_prefix, img_ext),
" -start=%d \\" % (img_frame_number),
" -count=1 \\",
" -dumpheader \\", # log additional video file info
" > %s 2>&1\n" % log_filename,
]
)
exe_sh = "\n".join(exe_sh_lines)
env_sh = _launcher_env_sh(cfg)
exe_filename = _app_exec_path(cfg, step_name, run_number, _DEFAULT_TASK_EXE_NAME)
_create_dir_and_log(_app_logs_path(cfg, step_name, run_number))
_create_dir_and_log(_app_exec_path(cfg, step_name, run_number))
for img_filename in info["img_filename"]:
_create_dir_and_log(os.path.dirname(img_filename))
_create_launcher(exe_filename, exe_sh=exe_sh, env_sh=env_sh)
return info, [_DEFAULT_TASK_EXE_NAME]
def _create_task_external_calibration(
cfg: dict, step_name: str, run_number: str, first_frames_info: dict
):
this_run = cfg["datapoints"][run_number]
external_cal_cfg_filename = _app_exec_path(cfg, step_name, run_number, "cfg.json")
exe_sh_lines = []
log_filename = _app_logs_path(cfg, step_name, run_number, "%s.out" % run_number)
out_dir = _app_products_path(cfg, step_name, run_number)
img_lines = [" --img %s \\" % f for f in first_frames_info["img_filename"]]
exe_sh_lines.extend(
[
"# By default, core dumps are written out to the current working",
"# directory of the process. We want core dumps to be written",
"# to the process logs directory.",
"cd %s" % _app_logs_path(cfg, step_name, run_number),
"%s \\" % (shutil.which("upsp-external-calibration"),),
" --tgts %s \\" % this_run["targets_file"],
" --grd %s \\" % this_run["grid_file"],
" --wtd %s \\" % this_run["wtd_file"],
" --cfg %s \\" % external_cal_cfg_filename,
" --cal_dir %s \\" % this_run["camera_tunnel_calibration_dir"],
" --out_dir %s \\" % out_dir,
]
)
exe_sh_lines.extend(img_lines)
exe_sh_lines.extend([" > %s 2>&1\n" % log_filename])
_create_dir_and_log(os.path.dirname(log_filename))
_create_dir_and_log(out_dir)
exe_sh = "\n".join(exe_sh_lines)
env_sh = _launcher_env_sh(cfg)
exe_filename = _app_exec_path(cfg, step_name, run_number, _DEFAULT_TASK_EXE_NAME)
_create_dir_and_log(os.path.dirname(exe_filename))
_create_launcher(exe_filename, exe_sh=exe_sh, env_sh=env_sh)
pcfg = _datapoint_processing_config(cfg, "external-calibration", run_number)
io.json_write_or_die(pcfg, external_cal_cfg_filename, indent=2)
info = {"calibration_filename": []}
for camera_filename in _camera_filenames(cfg, run_number):
camera_name = _camera_name(camera_filename)
filename = os.path.join(out_dir, "%s-to-model.json" % camera_name)
info["calibration_filename"].append(filename)
return info, [_DEFAULT_TASK_EXE_NAME]
def _create_task_psp_process(
cfg: dict, step_name: str, run_number: str, external_calibration_info: dict
):
_create_input_file(cfg, step_name, run_number, external_calibration_info)
_create_pbs_file(cfg, step_name, run_number, "run-psp-process.pbs")
_create_add_field_file(cfg, step_name, run_number, "run-add-field.sh")
return None, ["run-psp-process.pbs", "run-add-field.sh"]
def _create_per_datapoint_step(cfg: dict, step_name: str, fn, inputs=None):
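    # `fn` is a per-datapoint task factory returning (info, exe_names).
    # `inputs`, when provided, maps run_number -> [args, kwargs] that are
    # forwarded to `fn` (see the call sites in _create() below).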
output = {}
for run_number, _ in cfg["datapoints"].items():
args = inputs[run_number][0] if inputs else []
kwargs = inputs[run_number][1] if inputs else {}
output[run_number], exe_names = fn(cfg, step_name, run_number, *args, **kwargs)
for name in exe_names:
if name == _DEFAULT_TASK_EXE_NAME:
_create_step_launcher(cfg, step_name)
else:
_create_step_launcher(cfg, step_name, exe_name=name)
_create_dir_and_log(_app_logs_path(cfg, step_name, "jobs"))
return output
def _create(cfg: dict):
_create_root_dir(cfg)
_create_dir_and_log(_configuration_path(cfg))
_create_dir_and_log(_launchers_path(cfg))
_create_dir_and_log(_processing_path(cfg))
_create_dir_and_log(_products_path(cfg))
first_frame_info = _create_per_datapoint_step(
cfg, "extract-first-frame", _create_task_extract_first_frame
)
external_calibration_info = _create_per_datapoint_step(
cfg,
"external-calibration",
_create_task_external_calibration,
{dp: [[v], {}] for dp, v in first_frame_info.items()},
)
_create_per_datapoint_step(
cfg,
"psp_process",
_create_task_psp_process,
{dp: [[v], {}] for dp, v in external_calibration_info.items()},
)
    # TODO: re-enable this task, a step that renders views of the model colored
    # by selected scalars output by psp_process. The demo application relied on
    # the NAS-provided Tecplot install and has not been made general enough to
    # include in the batch processing autogeneration tools. We will likely want
    # to rewrite the step to use the PyTecplot interface, a lightweight Python
    # package that connects to a running Tecplot backend and provides an API
    # for plotting.
# _create_per_datapoint_step(
# cfg,
# "render-images",
# _create_task_render_images,
# )
_create_qsub_step_launcher(cfg)
return _root_path(cfg)