#!/usr/bin/env python3
Module defining the Base PGE interfaces from which all other PGEs are derived.
import html
import json
import os
from collections import OrderedDict
from datetime import datetime
from fnmatch import fnmatch
from functools import lru_cache
from os.path import abspath, basename, exists, join, splitext
import numpy as np
from pkg_resources import resource_filename
import yamale
from yamale import YamaleError
import yaml
import opera
from opera.util.error_codes import ErrorCode
from opera.util.logger import PgeLogger
from opera.util.logger import default_log_file_name
from opera.util.metfile import MetFile
from opera.util.render_jinja2 import (python_type_to_xml_type,
from opera.util.run_utils import create_qa_command_line
from opera.util.run_utils import create_sas_command_line
from opera.util.run_utils import get_checksum
from opera.util.run_utils import time_and_execute
from opera.util.time import get_catalog_metadata_datetime_str
from opera.util.time import get_time_for_filename
from .runconfig import RunConfig
[docs]class PreProcessorMixin:
Mixin class which is responsible for handling all pre-processing steps for
the PGE. The pre-processing phase is defined as all steps necessary prior
to SAS execution.
This class is intended for use as a Mixin for use with the PgeExecutor
class and its inheritors, and as such, this class assumes access to the
instance attributes defined by PgeExecutor.
Inheritors of PreProcessorMixin may provide overloaded implementations
for any of the exiting pre-processing steps, and even provide additional
steps as necessary.
_pre_mixin_name = "PreProcessorMixin"
def _initialize_logger(self):
Creates the logger object used by the PGE.
The logger is created using a default name, as the proper filename
cannot be determined until the RunConfig is parsed and validated.
if not self.logger:
self.logger = PgeLogger()
self.logger.info(self.name, ErrorCode.LOG_FILE_CREATED,
f'New Log file initialized to {self.logger.get_file_name()}')
self.logger.info(self.name, ErrorCode.LOG_FILE_CREATED,
f'Log file passed from pge_main: {self.logger.get_file_name()}')
def _initialize_qa_logger(self):
Creates the qa logger object used by the PGE when QA is enabled.
The logger is created using a default name, as the proper filename
cannot be determined until the RunConfig is parsed and validated.
if self.runconfig.qa_enabled:
self.qa_logger = PgeLogger(
workflow="qa_logger", error_code_base=PgeLogger.QA_LOGGER_CODE_BASE)
self.logger.info(self.name, ErrorCode.LOG_FILE_CREATED,
f'New QA Log file initialized to {self.qa_logger.get_file_name()}')
def _load_runconfig(self):
Loads the RunConfig file provided to the PGE into an in-memory
self.logger.info(self.name, ErrorCode.LOADING_RUN_CONFIG_FILE,
f'Loading RunConfig file {self.runconfig_path}')
self.runconfig = RunConfig(self.runconfig_path)
def _validate_runconfig(self):
Validates the parsed RunConfig against the appropriate schema(s).
If the RunConfig fails validation.
self.logger.info(self.name, ErrorCode.VALIDATING_RUN_CONFIG_FILE,
f'Validating RunConfig file {self.runconfig.filename}')
except (RuntimeError, YamaleError) as error:
# Since we can't rely on directories from RunConfig existing yet,
# dump log to /tmp, where we should have write permissions
self.logger.move(join('/tmp', default_log_file_name()))
error_msg = (f'Validation of RunConfig file {self.runconfig.filename} '
f'failed, reason(s): \n{str(error)}')
self.name, ErrorCode.RUN_CONFIG_VALIDATION_FAILED, error_msg
def _setup_directories(self):
Creates the output/scratch directory locations referenced by the
RunConfig if they don't exist already.
output_product_path = abspath(self.runconfig.output_product_path)
scratch_path = abspath(self.runconfig.scratch_path)
if not exists(output_product_path):
self.logger.info(self.name, ErrorCode.CREATING_WORKING_DIRECTORY,
f'Creating output product directory {output_product_path}')
os.makedirs(output_product_path, exist_ok=True)
if not exists(scratch_path):
self.logger.info(self.name, ErrorCode.CREATING_WORKING_DIRECTORY,
f'Creating scratch directory {scratch_path}')
os.makedirs(scratch_path, exist_ok=True)
self.logger.info(self.name, ErrorCode.DIRECTORY_SETUP_COMPLETE,
'Directory setup complete')
except OSError as error:
# Since we can't rely on directories from RunConfig existing yet,
# dump log to /tmp, where we should have write permissions
self.logger.move(join('/tmp', default_log_file_name()))
error_msg = (f'Could not create one or more working directories. '
f'reason: \n{str(error)}')
self.name, ErrorCode.DIRECTORY_CREATION_FAILED, error_msg
def _configure_logger(self):
Configures the logger used by the PGE using information from the
parsed and validated RunConfig.
self.logger.error_code_base = self.runconfig.error_code_base
self.logger.workflow = f'{self.runconfig.pge_name}::{basename(__file__)}'
# Relocate the output destination for the log file now that we
# can access output_product_path from the parsed RunConfig
self.logger.move(join(self.runconfig.output_product_path, default_log_file_name()))
self.logger.info(self.name, ErrorCode.LOG_FILE_INIT_COMPLETE,
'Log file configuration complete')
if self.runconfig.qa_enabled:
self.qa_logger.move(join(self.runconfig.output_product_path, default_log_file_name()))
self.qa_logger.info(self.name, ErrorCode.LOG_FILE_INIT_COMPLETE,
'Log file configuration complete')
def _validate_iso_descriptions(self):
"""If given, check if the run-config description file exists and is valid"""
description_file = self.runconfig.iso_measured_parameter_descriptions
if description_file is not None:
description_file = abspath(description_file)
if not exists(description_file):
msg = f'Could not load description file {description_file}, file does not exist.'
self.logger.critical(self.name, ErrorCode.ISO_METADATA_DESCRIPTIONS_CONFIG_NOT_FOUND, msg)
schema_file = resource_filename(
schema = yamale.make_schema(schema_file)
data = yamale.make_data(description_file)
yamale.validate(schema, data)
except yamale.YamaleError as error:
msg = f'The provided measured parameters description config is of invalid format: {error}'
self.logger.critical(self.name, ErrorCode.ISO_METADATA_DESCRIPTIONS_CONFIG_INVALID, msg)
msg = 'Measured parameters descriptions were not provided'
self.logger.warning(self.name, ErrorCode.ISO_METADATA_NO_DESCRIPTIONS, msg)
[docs] def run_preprocessor(self, **kwargs): # pylint: disable=unused-argument
Executes the pre-processing steps for PGE initialization.
Inheritors of this Mixin may override this function to tailor the
order of pre-processing steps.
**kwargs : dict
Any keyword arguments needed by the pre-processor
print(f'Running preprocessor for {self._pre_mixin_name}')
[docs]class PostProcessorMixin:
Mixin class which is responsible for handling all post-processing steps for
the PGE. The post-processing phase is defined as all steps necessary after
SAS execution has completed.
This class is intended for use as a Mixin for use with the PgeExecutor
class and its inheritors, and as such, this class assumes access to the
instance attributes defined by PgeExecutor.
Inheritors of PostProcessorMixin may provide overloaded implementations
for any of the exiting pre-processing steps, and even provide additional
steps as necessary.
_post_mixin_name = "PostProcessorMixin"
def _run_sas_qa_executable(self):
Executes an optional Quality Assurance (QA) application which may be bundled
with a SAS delivery. QA execution is controlled by settings within the
provided RunConfig.
If enabled, execution time for the QA application is collected and logged by
this method.
if self.runconfig.qa_enabled:
qa_program_path = self.runconfig.qa_program_path
qa_program_options = self.runconfig.qa_program_options
command_line = create_qa_command_line(qa_program_path, qa_program_options)
except OSError as err:
self.logger.critical(self.name, ErrorCode.QA_SAS_PROGRAM_FAILED,
f'Failed to create QA command line, reason: {str(err)}')
self.qa_logger.debug(self.name, ErrorCode.SAS_QA_COMMAND_LINE,
f'QA EXE command line: {" ".join(command_line)}')
self.qa_logger.info(self.name, ErrorCode.QA_SAS_PROGRAM_STARTING,
'Starting SAS QA executable')
elapsed_time = time_and_execute(
command_line, self.qa_logger, self.runconfig.execute_via_shell
self.qa_logger.info(self.name, ErrorCode.QA_SAS_PROGRAM_COMPLETED,
'SAS QA executable complete')
self.qa_logger.log_one_metric(self.name, 'sas.qa.elapsed_seconds', elapsed_time)
self.logger.info(self.name, ErrorCode.QA_SAS_PROGRAM_DISABLED,
'SAS QA is disabled, skipping')
def _checksum_output_products(self):
Generates a dictionary mapping output product file names to the
corresponding MD5 checksum digest of the file's contents.
The output products to generate checksums for is determined by scanning
the output product location specified by the RunConfig. Any files
within the directory that have the expected file extensions for output
products are then picked up for checksum generation.
checksums : dict
Mapping of output product file names to MD5 checksums of said
output_products = self.runconfig.get_output_product_filenames()
# Filter out any files that were not renamed by the PGE
filtered_output_products = filter(
lambda product: basename(product) in self.renamed_files.values(),
# Generate checksums on the filtered product list
checksums = {
basename(output_product): get_checksum(output_product)
for output_product in filtered_output_products
return checksums
def _create_catalog_metadata(self):
Returns the catalog metadata as a MetFile instance. Once generated, the
catalog metadata is cached for the life of the PGE instance.
catalog_metadata = {
'PGE_Name': self.runconfig.pge_name,
'PGE_Version': self.PGE_VERSION,
'SAS_Version': self.SAS_VERSION,
'Input_Files': list(map(basename, self.runconfig.get_input_filenames())),
'Ancillary_Files': list(map(basename, self.runconfig.get_ancillary_filenames())),
'Production_DateTime': get_catalog_metadata_datetime_str(self.production_datetime),
'Output_Product_Checksums': self._checksum_output_products()
return MetFile(catalog_metadata)
def _create_iso_metadata(self): # pylint: disable=no-self-use
Creates the ISO metadata utilized by the DAAC's for indexing output
products submitted by OPERA. Inheritors of PostProcessorMixin must
provide their own implementations, as ISO metadata is not applicable
to the base PGE. Rather, this implementation may be optionally invoked
by inheritors to perform common validation checks on the ISO template to
be used with the PGE.
iso_template_path = self.runconfig.iso_template_path
if iso_template_path is None:
msg = "ISO template path not provided in runconfig"
self.logger.critical(self.name, ErrorCode.ISO_METADATA_TEMPLATE_NOT_PROVIDED_WHEN_NEEDED, msg)
iso_template_path = os.path.abspath(iso_template_path)
if not os.path.exists(iso_template_path):
msg = f"Could not load ISO template {iso_template_path}, file does not exist"
self.logger.critical(self.name, ErrorCode.ISO_METADATA_TEMPLATE_NOT_FOUND, msg)
# Base PGE does not produce ISO metadata.
return None
def _finalize_log(self, logger):
Finalizes the provided logger such that the execution summary is logged before
the log file is closed. This should typically be one of the last functions
invoked by a post-processor, since the log file will be unavailable for
writing after this function is called.
logger : PgeLogger
The PgeLogger instance to finalize.
logger.info(self.name, ErrorCode.CLOSING_LOG_FILE,
f"Closing log file {logger.get_file_name()}")
def _core_filename(self, inter_filename=None): # pylint: disable=unused-argument
Returns the core file name component for products produced by the
Base PGE. This function should typically be overridden by inheritors
of PostProcessorMixin to accomplish the specific file-naming conventions
required by the PGE.
The core file name component of the Base PGE consists of:
Callers of this function are responsible for assignment of any other
product-specific fields, such as the file extension.
inter_filename : str, optional
The intermediate filename of the output product to generate the
core filename for. This parameter may be used to inspect the file
in order to derive any necessary components of the returned filename.
For the base PGE, this parameter is unused and may be omitted.
core_filename : str
The core file name component to assign to products created by this PGE.
time_tag = get_time_for_filename(self.production_datetime)
return f"{self.PROJECT}_{self.LEVEL}_{self.NAME}_{time_tag}"
def _geotiff_filename(self, inter_filename):
Returns the file name to use for GeoTIFF's produced by the Base PGE.
The GeoTIFF filename for the Base PGE consists of:
<Core filename>_<inter_filename>.tif
Where <Core filename> is returned by PostProcessorMixin._core_filename()
inter_filename : str
The intermediate filename of the output GeoTIFF to generate
a filename for. This parameter may be used to inspect the file
in order to derive any necessary components of the returned filename.
geotiff_filename : str
The file name to assign to GeoTIFF product(s) created by this PGE.
base_filename = splitext(basename(inter_filename))[0]
return self._core_filename() + f"_{base_filename}.tif"
def _catalog_metadata_filename(self):
Returns the file name to use for Catalog Metadata produced by the Base PGE.
The Catalog Metadata file name for the Base PGE consists of:
<Core filename>.catalog.json
Where <Core filename> is returned by PostProcessorMixin._core_filename()
catalog_metadata_filename : str
The file name to assign to the Catalog Metadata product created by this PGE.
return self._core_filename() + ".catalog.json"
def _iso_metadata_filename(self):
Returns the file name to use for ISO Metadata produced by the Base PGE.
The ISO Metadata file name for the Base PGE consists of:
<Core filename>.iso.xml
Where <Core filename> is returned by PostProcessorMixin._core_filename()
iso_metadata_filename : str
The file name to assign to the ISO Metadata product created by this PGE.
return self._core_filename() + ".iso.xml"
def _log_filename(self):
Returns the file name to use for the PGE/SAS log file produced by the Base PGE.
The log file name for the Base PGE consists of:
<Core filename>.log
Where <Core filename> is returned by PostProcessorMixin._core_filename()
log_filename : str
The file name to assign to the PGE/SAS log created by this PGE.
return self._core_filename() + ".log"
def _qa_log_filename(self):
Returns the file name to use for the Quality Assurance application log
file produced by the Base PGE.
The log file name for the Base PGE consists of:
<Core filename>.qa.log
Where <Core filename> is returned by PostProcessorMixin._core_filename()
log_filename : str
The file name to assign to the QA log created by this PGE.
return self._core_filename() + ".qa.log"
def _assign_filename(self, input_filepath, output_dir):
Assigns the appropriate file name which meets the file-naming conventions
for the PGE to the provided input file on disk.
The file name function used to assign is determined based on a unix-style
pattern match of the provided input file name against the patterns
configured in the renaming map for the PGE class.
If no file name assignment function is configured
for a given extension, the file name assignment is skipped.
input_filepath : str
Absolute path to the file on disk to be renamed by this function.
output_dir : str
The output directory destination for the renamed file.
file_name = os.path.basename(input_filepath)
# Lookup the specific rename function configured for the current filename
for file_pattern, rename_function in self.rename_by_pattern_map.items():
if fnmatch(file_name, file_pattern):
final_filename = rename_function(input_filepath)
self.renamed_files[input_filepath] = final_filename
msg = f'No rename function configured for file "{basename(input_filepath)}", skipping assignment'
self.logger.warning(self.name, ErrorCode.NO_RENAME_FUNCTION_FOR_EXTENSION, msg)
# Generate the final file name to assign
final_filepath = os.path.join(output_dir, final_filename)
self.logger.info(self.name, ErrorCode.MOVING_LOG_FILE,
f"Renaming output file {input_filepath} to {final_filepath}")
os.rename(input_filepath, final_filepath)
except OSError as err:
msg = f"Failed to rename output file {basename(input_filepath)}, reason: {str(err)}"
self.logger.critical(self.name, ErrorCode.FILE_MOVE_FAILED, msg)
def _stage_output_files(self):
Ensures that all output products produced by both the SAS and this PGE
are staged to the output location defined by the RunConfig. This includes
reassignment of file names to meet the file-naming conventions required
by the PGE.
In addition to staging of the output products created by the SAS, this
function is also responsible for ensuring the catalog metadata, ISO
metadata, and combined PGE/SAS log are also written to the expected
output product location with the appropriate file names.
# Gather the list of output files produced by the SAS
output_products = self.runconfig.get_output_product_filenames()
# For each output file name, assign the final file name matching the
# expected conventions
for output_product in output_products:
self._assign_filename(output_product, self.runconfig.output_product_path)
# Write the catalog metadata to disk with the appropriate filename
catalog_metadata = self._create_catalog_metadata()
if not catalog_metadata.validate(catalog_metadata.get_schema_file_path()):
msg = f"Failed to create valid catalog metadata, reason(s):\n {catalog_metadata.get_error_msg()}"
self.logger.critical(self.name, ErrorCode.INVALID_CATALOG_METADATA, msg)
cat_meta_filename = self._catalog_metadata_filename()
cat_meta_filepath = join(self.runconfig.output_product_path, cat_meta_filename)
self.logger.info(self.name, ErrorCode.CREATING_CATALOG_METADATA,
f"Writing Catalog Metadata to {cat_meta_filepath}")
except OSError as err:
msg = f"Failed to write catalog metadata file {cat_meta_filepath}, reason: {str(err)}"
self.logger.critical(self.name, ErrorCode.CATALOG_METADATA_CREATION_FAILED, msg)
# Generate the ISO metadata for use with product submission to DAAC(s)
iso_metadata = self._create_iso_metadata()
iso_meta_filename = self._iso_metadata_filename()
iso_meta_filepath = join(self.runconfig.output_product_path, iso_meta_filename)
if iso_metadata:
self.logger.info(self.name, ErrorCode.RENDERING_ISO_METADATA,
f"Writing ISO Metadata to {iso_meta_filepath}")
with open(iso_meta_filepath, 'w', encoding='utf-8') as outfile:
# Write the QA application log to disk with the appropriate filename,
# if necessary
if self.runconfig.qa_enabled:
qa_log_filename = self._qa_log_filename()
qa_log_filepath = join(self.runconfig.output_product_path, qa_log_filename)
except OSError as err:
msg = f"Failed to write QA log file to {qa_log_filepath}, reason: {str(err)}"
self.logger.critical(self.name, ErrorCode.LOG_FILE_CREATION_FAILED, msg)
# Lastly, write the combined PGE/SAS log to disk with the appropriate filename
log_filename = self._log_filename()
log_filepath = join(self.runconfig.output_product_path, log_filename)
except OSError as err:
msg = f"Failed to write log file to {log_filepath}, reason: {str(err)}"
# Log stream might be closed by this point so raise an Exception instead
raise RuntimeError(msg)
[docs] def augment_measured_parameters(self, measured_parameters):
Augment the measured parameters dict into a dict of dicts containing the needed
fields for the MeasuredParameters section of the ISO XML file.
measured_parameters : dict
The GeoTIFF metadata from the output product. See get_geotiff_metadata()
augmented_parameters : dict
The metadata fields converted to a list with name, value, types, etc
augmented_parameters = dict()
descriptions_file = self.runconfig.iso_measured_parameter_descriptions
if descriptions_file is not None:
with open(descriptions_file) as f:
descriptions = yaml.safe_load(f)
missing_description_value = UNDEFINED_ERROR
descriptions = dict()
missing_description_value = UNDEFINED_WARNING
for name, value in measured_parameters.items():
if isinstance(value, np.generic):
value = value.item()
if isinstance(value, np.ndarray):
value = value.tolist()
if isinstance(value, (list, dict)):
value = json.dumps(value, cls=NumpyEncoder)
guessed_data_type = python_type_to_xml_type(value)
guessed_attr_name = guess_attribute_display_name(name)
descriptions.setdefault(name, dict())
attr_description = descriptions[name].get('description', missing_description_value)
data_type = descriptions[name].get('attribute_data_type', guessed_data_type)
attr_type = descriptions[name].get('attribute_type', UNDEFINED_ERROR)
attr_name = descriptions[name].get('display_name', guessed_attr_name)
escape_html = descriptions[name].get('escape_html', False)
if escape_html:
value = html.escape(value)
augmented_parameters[name] = (
dict(name=attr_name, value=value, attr_type=attr_type,
attr_description=attr_description, data_type=data_type)
return augmented_parameters
[docs] def run_postprocessor(self, **kwargs): # pylint: disable=unused-argument
Executes the post-processing steps for PGE job completion.
Inheritors of this Mixin may override this function to tailor the
order of post-processing steps.
**kwargs : dict
Any keyword arguments needed by the post-processor
print(f'Running postprocessor for {self._post_mixin_name}')
[docs]class PgeExecutor(PreProcessorMixin, PostProcessorMixin):
Main class for execution of a PGE, including the SAS layer.
The PgeExecutor class is primarily responsible for defining the interface
for PGE execution and managing the actual execution of the SAS executable
within a subprocess. PGE's also define pre- and post-processing stages,
which are invoked by PgeExecutor, but whose implementations are defined
by use of Mixin classes.
The use of Mixin classes allows for flexibility of PGE design, where
inheritors of PgeExecutor can compose a custom PGE by providing overloaded
implementations of the Mixin classes to tailor the behavior of the pre-
and post-processing phases, where necessary, while still inheriting any
common functionality from this class.
"""Name of the project associated to this PGE"""
NAME = "BasePge"
"""Short name for the Base PGE"""
LEVEL = "L0"
"""Processing Level for Base PGE Products (dummy value)"""
PGE_VERSION = opera.__version__
"""Version of the PGE (tracks repo version by default)"""
"""Version of the SAS wrapped by this PGE (dummy value)"""
def __init__(self, pge_name, runconfig_path, **kwargs):
Creates a new instance of PgeExecutor
pge_name : str
Name to associate with this PGE.
runconfig_path : str
Path to the RunConfig to be used with this PGE.
kwargs : dict
Any additional keyword arguments needed by the PGE.
Supported kwargs include:
- logger : An existing instance of PgeLogger for this PgeExecutor
to use, rather than creating its own.
self.name = self.NAME
self.pge_name = pge_name
self.runconfig_path = runconfig_path
self.runconfig = None
self.logger = kwargs.get('logger')
self.production_datetime = datetime.now()
# Mapping of unix-style file name patterns to function pointers
# used to rename said file
self.rename_by_pattern_map = OrderedDict(
'*.tif*': self._geotiff_filename
# Keeps track of the files that were renamed by the PGE
self.renamed_files = OrderedDict()
def _isolate_sas_runconfig(self):
Isolates the SAS-specific portion of the RunConfig into its own
YAML file, so it may be fed into the SAS executable without unneeded
PGE configuration settings.
sas_config = self.runconfig.sas_config
pge_runconfig_filename = basename(self.runconfig.filename)
pge_runconfig_fileparts = splitext(pge_runconfig_filename)
sas_runconfig_filename = f'{pge_runconfig_fileparts[0]}_sas{pge_runconfig_fileparts[1]}'
sas_runconfig_filepath = join(self.runconfig.scratch_path, sas_runconfig_filename)
with open(sas_runconfig_filepath, 'w', encoding='utf-8') as outfile:
yaml.safe_dump(sas_config, outfile, sort_keys=False)
except OSError as err:
self.logger.critical(self.name, ErrorCode.SAS_CONFIG_CREATION_FAILED,
f'Failed to create SAS config file {sas_runconfig_filepath}, '
f'reason: {str(err)}')
self.logger.info(self.name, ErrorCode.CREATED_SAS_CONFIG,
f'SAS RunConfig created at {sas_runconfig_filepath}')
return sas_runconfig_filepath
[docs] def run_sas_executable(self, **kwargs): # pylint: disable=unused-argument
Kicks off a SAS executable as defined by the RunConfig provided to
the PGE.
Execution time for the SAS is collected and logged by this method.
**kwargs : dict
Any keyword arguments needed for SAS execution.
sas_program_path = self.runconfig.sas_program_path
sas_program_options = self.runconfig.sas_program_options
sas_runconfig_filepath = self._isolate_sas_runconfig()
command_line = create_sas_command_line(
sas_program_path, sas_runconfig_filepath, sas_program_options
except OSError as err:
self.logger.critical(self.name, ErrorCode.SAS_PROGRAM_FAILED,
f'Failed to create SAS command line, reason: {str(err)}')
self.logger.debug(self.name, ErrorCode.SAS_EXE_COMMAND_LINE,
f'SAS EXE command line: {" ".join(command_line)}')
self.logger.info(self.name, ErrorCode.SAS_PROGRAM_STARTING,
'Starting SAS executable')
elapsed_time = time_and_execute(
command_line, self.logger, self.runconfig.execute_via_shell
self.logger.info(self.name, ErrorCode.SAS_PROGRAM_COMPLETED,
'SAS executable complete')
self.logger.log_one_metric(self.name, 'sas.elapsed_seconds', elapsed_time)
[docs] def run(self, **kwargs):
Main entry point for PGE execution.
The pre-processor stage is run to initialize the PGE, followed by
SAS execution, then completed with the post-processing steps to complete
the job.
print(f'Starting SAS execution for {self.__class__.__name__}')