Source code for fprime_gds.common.testing_fw.api

"""
gds_test_api.py:

This file contains basic asserts that can support integration tests on an FPrime
deployment. This API uses the standard pipeline to get access to commands, events,
telemetry and dictionaries.

:author: koran
"""
import signal
import time

from fprime.common.models.serialize.time_type import TimeType
from fprime_gds.common.handlers import DataHandler
from fprime_gds.common.history.chrono import ChronologicalHistory
from fprime_gds.common.history.test import TestHistory
from fprime_gds.common.logger.test_logger import TestLogger
from fprime_gds.common.testing_fw import predicates
from fprime_gds.common.utils.event_severity import EventSeverity


class IntegrationTestAPI(DataHandler):
    """
    Integration test API built on a GDS pipeline.

    The API keeps its own command, telemetry and event test histories, registers them with
    the pipeline's coders, and offers helpers to send commands and to await or assert on
    telemetry updates and event messages. It also registers itself as an event consumer.
    """

    # A value used to begin searches after the current contents in a history and only search
    # future items
    NOW = "NOW"
def __init__(self, pipeline, logpath=None, fsw_order=True):
    """
    Initializes API: constructs and registers test histories.

    Args:
        pipeline: a pipeline object providing access to basic GDS functionality
        logpath: an optional output destination for the api test log
        fsw_order: a flag to determine whether the API histories will maintain FSW time
            order.
    """
    self.pipeline = pipeline

    # these are owned by the GDS and will not be modified by the test API.
    self.aggregate_command_history = pipeline.histories.commands
    self.aggregate_telemetry_history = pipeline.histories.channels
    self.aggregate_event_history = pipeline.histories.events

    # these histories are owned by the TestAPI and are modified by the API.
    self.fsw_ordered = fsw_order
    if fsw_order:
        self.command_history = ChronologicalHistory()
        self.telemetry_history = ChronologicalHistory()
        self.event_history = ChronologicalHistory()
    else:
        self.command_history = TestHistory()
        self.telemetry_history = TestHistory()
        self.event_history = TestHistory()

    self.pipeline.coders.register_command_consumer(self.command_history)
    self.pipeline.coders.register_event_consumer(self.event_history)
    self.pipeline.coders.register_channel_consumer(self.telemetry_history)

    # Initialize latest time. Will be updated whenever a time query is made.
    self.latest_time = TimeType()

    # Initialize the logger
    if logpath is not None:
        self.logger = TestLogger(logpath)
    else:
        self.logger = None

    # A predicate used as a filter to choose which events to log automatically
    self.event_log_filter = self.get_event_pred()

    # Used by the data_callback method to detect if events have been received out of order.
    # This must exist BEFORE the API is registered as an event consumer: once registered,
    # an event may be delivered at any moment and the callback would otherwise find the
    # attribute missing.
    self.last_evr = None

    # Register the API itself last, once every attribute is in place.
    self.pipeline.coders.register_event_consumer(self)
[docs] def teardown(self): """ To be called once at the end of the API's use. Closes the test log and clears histories. """ self.clear_histories() if self.logger is not None: self.logger.close_log() self.logger = None
###################################################################################### # API Functions ######################################################################################
def start_test_case(self, case_name, case_id):
    """
    Marks the beginning of a test case.

    Writes a "[STARTING CASE]" entry to the test log, refreshes the latest known time
    stamp (in case the user clears the aggregate histories), and then clears the API's
    own histories.

    Args:
        case_name: the name of the test case (str)
        case_id: a short identifier to denote the test case (str or number)
    """
    self.__log(
        "[STARTING CASE] {}".format(case_name),
        TestLogger.GRAY,
        TestLogger.BOLD,
        case_id=case_id,
    )
    # Record the latest time now: the aggregate histories may be cleared by the user.
    self.get_latest_time()
    self.clear_histories()
def log(self, msg, color=None):
    """
    Adds a user message to the test log, attributed to the sender "API user".

    Args:
        msg: a user-provided message to add to the test log. (str)
        color: a string containing a color hex code "######" (str)
    """
    sender = "API user"
    self.__log(msg, color, sender=sender)
def get_latest_time(self):
    """
    Finds the latest flight software time seen so far.

    The candidates are the time of the last item in the aggregate event history, the time
    of the last item in the aggregate telemetry history, and the previously recorded
    latest time. The maximum is stored in self.latest_time.

    Returns:
        a flight software timestamp (TimeType)
    """
    events = self.aggregate_event_history.retrieve()
    e_time = events[-1].get_time() if len(events) > 0 else TimeType()

    channels = self.aggregate_telemetry_history.retrieve()
    t_time = channels[-1].get_time() if len(channels) > 0 else TimeType()

    newest = max(e_time, t_time, self.latest_time)
    self.latest_time = newest
    return newest
def test_assert(self, value, msg="", expect=False):
    """
    Logs a formatted user assertion (or expectation) and raises on a failed assertion.

    Args:
        value: a boolean value that determines if the assert is successful.
        msg: a string describing what is checked by the assert.
        expect: when True, the call will behave as an expect, and will skip the assert
            (boolean)

    Returns:
        the given value: truthy if the check was successful, falsy otherwise

    Raises:
        AssertionError: if value is falsy and expect is False
    """
    if expect:
        ast_msg = "User expectation"
        fail_color = TestLogger.ORANGE
    else:
        ast_msg = "User assertion"
        fail_color = TestLogger.RED

    if value:
        ast_msg = ast_msg + " succeeded: " + msg
        self.__log(ast_msg, TestLogger.GREEN)
        return value

    ast_msg = ast_msg + " failed: " + msg
    self.__log(ast_msg, fail_color)
    if not expect:
        # Raise explicitly instead of using an `assert` statement: assert statements are
        # removed when Python runs with -O, which would let failed checks pass silently.
        raise AssertionError(ast_msg)
    return value
def predicate_assert(self, predicate, value, msg="", expect=False):
    """
    Evaluates a value with a predicate as a user-level assert, logged under the name "User".

    Args:
        predicate: an instance of predicate that will decide if the test passes (predicate)
        value: the value to be evaluated by the predicate. (object)
        msg: a string describing what is checked by the assert. (str)
        expect: when True, the call will behave as an expect, and will skip the assert
            (boolean)

    Returns:
        True if the assert was successful, False otherwise
    """
    assert_name = "User"
    outcome = self.__assert_pred(assert_name, predicate, value, msg, expect)
    return outcome
def clear_histories(self, time_stamp=None):
    """
    Clears the IntegrationTestAPI's histories. Because the command history is not
    correlated to a flight software timestamp, it will be cleared entirely. This function
    can be used to set up test cases so that the IntegrationTestAPI's histories only
    contain objects received during that test.

    Note: this will not clear user-created sub-histories nor the aggregate histories
    (histories owned by the GDS)

    Args:
        time_stamp: If specified, histories are only cleared before the timestamp.
    """
    if time_stamp is None:
        self.event_history.clear()
        self.telemetry_history.clear()
        msg = "Clearing Test Histories"
    else:
        # The "time >= time_stamp" predicate is handed to clear() as its start marker.
        # NOTE(review): presumably clear() drops items up to the first one satisfying the
        # predicate, i.e. everything before time_stamp -- confirm against History.clear.
        time_pred = predicates.greater_than_or_equal_to(time_stamp)
        e_pred = predicates.event_predicate(time_pred=time_pred)
        self.event_history.clear(e_pred)
        t_pred = predicates.telemetry_predicate(time_pred=time_pred)
        self.telemetry_history.clear(t_pred)
        # The log text now matches the documented behavior ("before", not "after").
        msg = "Clearing Test Histories before {}".format(time_stamp)

    self.__log(msg, TestLogger.WHITE)
    # Commands carry no flight software timestamp, so they are always cleared entirely.
    self.command_history.clear()
def set_event_log_filter(self, event=None, args=None, severity=None, time_pred=None):
    """
    Replaces the filter that chooses which events are interlaced in the test logs.

    The new filter is an event predicate built from the given constraints. Calling this
    method with no arguments will effectively reset the filter.

    Args:
        event: an optional mnemonic (str), id (int), or predicate to specify the event type
        args: an optional list of arguments (list of values, predicates, or None to ignore)
        severity: an EventSeverity enum or a predicate to specify the event severity
        time_pred: an optional predicate to specify the flight software timestamp
    """
    new_filter = self.get_event_pred(
        event=event, args=args, severity=severity, time_pred=time_pred
    )
    self.event_log_filter = new_filter
###################################################################################### # History Functions ######################################################################################
def get_command_test_history(self):
    """
    Gives access to the command history owned by the IntegrationTestAPI.

    Returns:
        a history of CmdData objects
    """
    history = self.command_history
    return history
def get_telemetry_test_history(self):
    """
    Gives access to the telemetry history owned by the IntegrationTestAPI.

    Returns:
        a history of ChData objects
    """
    history = self.telemetry_history
    return history
def get_event_test_history(self):
    """
    Gives access to the event history owned by the IntegrationTestAPI.

    Returns:
        a history of EventData objects
    """
    history = self.event_history
    return history
def get_telemetry_subhistory(self, telemetry_filter=None, fsw_order=True):
    """
    Creates a new history and registers it with the pipeline as a channel consumer.

    Specifying a filter will only enqueue updates that satisfy the filter in this new
    sub-history. The returned history can be substituted into the await and assert
    methods of this API.

    Args:
        telemetry_filter: an optional predicate used to filter a subhistory.
        fsw_order: a flag to determine whether this subhistory will maintain FSW time
            order.

    Returns:
        a ChronologicalHistory when fsw_order is set, otherwise a TestHistory
    """
    history_cls = ChronologicalHistory if fsw_order else TestHistory
    subhist = history_cls(telemetry_filter)
    self.pipeline.coders.register_channel_consumer(subhist)
    return subhist
def remove_telemetry_subhistory(self, subhist):
    """
    De-registers the subhistory from the GDS. Once called, the given subhistory will stop
    receiving telemetry updates.

    Args:
        subhist: a history instance that is subscribed to telemetry updates

    Returns:
        True if the subhistory was removed, False otherwise
    """
    coders = self.pipeline.coders
    return coders.remove_channel_consumer(subhist)
def get_event_subhistory(self, event_filter=None, fsw_order=True):
    """
    Creates a new history and registers it with the pipeline as an event consumer.

    Specifying a filter will only enqueue events that satisfy the filter in this new
    sub-history. The returned history can be substituted into the await and assert
    methods of this API.

    Args:
        event_filter: an optional predicate to filter a subhistory.
        fsw_order: a flag to determine whether this subhistory will maintain FSW time
            order.

    Returns:
        a ChronologicalHistory when fsw_order is set, otherwise a TestHistory
    """
    history_cls = ChronologicalHistory if fsw_order else TestHistory
    subhist = history_cls(event_filter)
    self.pipeline.coders.register_event_consumer(subhist)
    return subhist
def remove_event_subhistory(self, subhist):
    """
    De-registers the subhistory from the GDS. Once called, the given subhistory will stop
    receiving event messages.

    Args:
        subhist: a history instance that is subscribed to event messages

    Returns:
        True if the subhistory was removed, False otherwise
    """
    coders = self.pipeline.coders
    return coders.remove_event_consumer(subhist)
###################################################################################### # Command Functions ######################################################################################
def translate_command_name(self, command):
    """
    Resolves a command specifier to a command ID using the pipeline's dictionaries.

    A str is looked up as a mnemonic in the name dictionary; anything else is checked
    against the ID dictionary and returned unchanged when present.

    Args:
        command: Either the command id (int) or the command mnemonic (str)

    Returns:
        The command ID (int)

    Raises:
        KeyError: if the command is not in the corresponding dictionary
    """
    dictionaries = self.pipeline.dictionaries

    if isinstance(command, str):
        by_name = dictionaries.command_name
        if command not in by_name:
            raise KeyError(
                "The command mnemonic, {}, wasn't in the dictionary".format(command)
            )
        return by_name[command].get_id()

    if command not in dictionaries.command_id:
        raise KeyError("The command id, {}, wasn't in the dictionary".format(command))
    return command
def send_command(self, command, args=None):
    """
    Logs and sends the specified command through the pipeline.

    The command is logged as given, before it is translated to an ID, so an unknown
    command is still recorded in the test log before the translation raises.

    Args:
        command: the mnemonic (str) or ID (int) of the command to send
        args: a list of command arguments.
    """
    arguments = [] if args is None else args
    self.__log("Sending Command: {} {}".format(command, arguments), TestLogger.PURPLE)
    command_id = self.translate_command_name(command)
    self.pipeline.send_command(command_id, arguments)
def send_and_await_telemetry(self, command, args=None, channels=None, timeout=5):
    """
    Sends the specified command and awaits the specified channel update or sequence of
    updates. See await_telemetry and await_telemetry_sequence for full details.

    The search starts at the size the telemetry test history had just before the command
    was sent. A list of channels is awaited as a sequence; anything else is awaited as a
    single update. When channels is omitted, an empty sequence is awaited.

    Note: If awaiting a sequence avoid specifying timestamps.

    Args:
        command: the mnemonic (str) or ID (int) of the command to send
        args: a list of command arguments.
        channels: a single or a sequence of channel specs (telemetry predicates,
            mnemonics, or IDs)
        timeout: the number of seconds to wait before terminating the search (int)

    Returns:
        The channel update or updates found by the search
    """
    cmd_args = [] if args is None else args
    targets = [] if channels is None else channels

    # Capture the search start before sending so that earlier updates are ignored.
    start = self.telemetry_history.size()
    self.send_command(command, cmd_args)

    if not isinstance(targets, list):
        return self.await_telemetry(targets, start=start, timeout=timeout)
    return self.await_telemetry_sequence(targets, start=start, timeout=timeout)
def send_and_await_event(self, command, args=None, events=None, timeout=5):
    """
    Sends the specified command and awaits the specified event message or sequence of
    messages. See await_event and await_event_sequence for full details.

    The search starts at the size the event test history had just before the command was
    sent. A list of events is awaited as a sequence; anything else is awaited as a single
    message. When events is omitted, an empty sequence is awaited.

    Note: If awaiting a sequence avoid specifying timestamps.

    Args:
        command: the mnemonic (str) or ID (int) of the command to send
        args: a list of command arguments.
        events: a single or a sequence of event specifiers (event_predicates, mnemonics,
            or IDs)
        timeout: the number of seconds to wait before terminating the search (int)

    Returns:
        The event or events found by the search
    """
    cmd_args = [] if args is None else args
    targets = [] if events is None else events

    # Capture the search start before sending so that earlier events are ignored.
    start = self.event_history.size()
    self.send_command(command, cmd_args)

    if not isinstance(targets, list):
        return self.await_event(targets, start=start, timeout=timeout)
    return self.await_event_sequence(targets, start=start, timeout=timeout)
###################################################################################### # Command Asserts ######################################################################################
def send_and_assert_telemetry(self, command, args=None, channels=None, timeout=5):
    """
    Sends the specified command and asserts on the specified channel update or sequence
    of updates. See assert_telemetry and assert_telemetry_sequence for full details.

    The search starts at the size the telemetry test history had just before the command
    was sent. A list of channels is asserted as a sequence; anything else is asserted as
    a single update. When channels is omitted, an empty sequence is asserted.

    Note: If asserting on a sequence avoid specifying timestamps.

    Args:
        command: the mnemonic (str) or ID (int) of the command to send
        args: a list of command arguments.
        channels: a single or a sequence of channel specs (telemetry predicates,
            mnemonics, or IDs)
        timeout: the number of seconds to wait before terminating the search (int)

    Returns:
        The channel update or updates found by the search
    """
    cmd_args = [] if args is None else args
    targets = [] if channels is None else channels

    # Capture the search start before sending so that earlier updates are ignored.
    start = self.telemetry_history.size()
    self.send_command(command, cmd_args)

    if not isinstance(targets, list):
        return self.assert_telemetry(targets, start=start, timeout=timeout)
    return self.assert_telemetry_sequence(targets, start=start, timeout=timeout)
def send_and_assert_event(self, command, args=None, events=None, timeout=5):
    """
    Sends the specified command and asserts on the specified event message or sequence of
    messages. See assert_event and assert_event_sequence for full details.

    The search starts at the size the event test history had just before the command was
    sent. A list of events is asserted as a sequence; anything else is asserted as a
    single message. When events is omitted, an empty sequence is asserted.

    Args:
        command: the mnemonic (str) or ID (int) of the command to send
        args: a list of command arguments.
        events: a single or a sequence of event specifiers (event_predicates, mnemonics,
            or IDs)
        timeout: the number of seconds to wait before terminating the search (int)

    Returns:
        The event or events found by the search
    """
    cmd_args = [] if args is None else args
    targets = [] if events is None else events

    # Capture the search start before sending so that earlier events are ignored.
    start = self.event_history.size()
    self.send_command(command, cmd_args)

    if not isinstance(targets, list):
        return self.assert_event(targets, start=start, timeout=timeout)
    return self.assert_event_sequence(targets, start=start, timeout=timeout)
###################################################################################### # Telemetry Functions ######################################################################################
def translate_telemetry_name(self, channel):
    """
    Resolves a channel specifier to a channel ID using the pipeline's dictionaries.

    A str is looked up as a mnemonic in the name dictionary; anything else is checked
    against the ID dictionary and returned unchanged when present.

    Args:
        channel: a channel mnemonic (str) or id (int)

    Returns:
        the channel ID (int)

    Raises:
        KeyError: if the channel is not in the corresponding dictionary
    """
    if isinstance(channel, str):
        ch_dict = self.pipeline.dictionaries.channel_name
        if channel in ch_dict:
            return ch_dict[channel].get_id()
        msg = "The telemetry mnemonic, {}, wasn't in the dictionary".format(channel)
        raise KeyError(msg)

    ch_dict = self.pipeline.dictionaries.channel_id
    if channel in ch_dict:
        return channel
    # This branch handles IDs, so the message says "id" (it previously repeated the
    # mnemonic wording), matching the command and event translators.
    msg = "The telemetry id, {}, wasn't in the dictionary".format(channel)
    raise KeyError(msg)
def get_telemetry_pred(self, channel=None, value=None, time_pred=None):
    """
    Builds a telemetry_predicate from a channel specifier, a value and a time predicate.

    If channel is already an instance of telemetry_predicate, it is returned immediately
    and the other arguments are ignored. Otherwise a channel that is neither None nor a
    predicate is translated to its ID and wrapped in an equal_to predicate, and a value
    that is neither None nor a predicate is wrapped in an equal_to predicate.

    The provided implementation of telemetry_predicate evaluates true if and only if all
    specified constraints are satisfied. A constraint that isn't specified does not
    affect the outcome, so all arguments are optional; with no constraints the predicate
    always returns true.

    Args:
        channel: an optional mnemonic (str), id (int), or predicate to specify the channel
            type
        value: an optional value (object/number) or predicate to specify the value field
        time_pred: an optional predicate to specify the flight software timestamp

    Returns:
        an instance of telemetry_predicate
    """
    if isinstance(channel, predicates.telemetry_predicate):
        return channel

    if not (predicates.is_predicate(channel) or channel is None):
        channel_id = self.translate_telemetry_name(channel)
        channel = predicates.equal_to(channel_id)

    if not (predicates.is_predicate(value) or value is None):
        value = predicates.equal_to(value)

    return predicates.telemetry_predicate(channel, value, time_pred)
def await_telemetry(
    self, channel, value=None, time_pred=None, history=None, start="NOW", timeout=5
):
    """
    A search for a single telemetry update received. By default, the call will only await
    until a correct update is found. The user can specify that await also searches the
    current history by specifying a value for start. On timeout, the search will return
    None.

    Args:
        channel: a channel specifier (mnemonic, id, or predicate)
        value: optional value (object/number) or predicate to specify the value field
        time_pred: an optional predicate to specify the flight software timestamp
        history: if given, a substitute history that the function will search and await;
            otherwise the API's telemetry test history is used
        start: an optional index or predicate to specify the earliest item to search
        timeout: the number of seconds to wait before terminating the search (int)

    Returns:
        the ChData object found during the search, otherwise, None
    """
    t_pred = self.get_telemetry_pred(channel, value, time_pred)
    target = self.get_telemetry_test_history() if history is None else history
    return self.find_history_item(t_pred, target, start, timeout)
def await_telemetry_sequence(self, channels, history=None, start="NOW", timeout=5):
    """
    A search for a sequence of telemetry updates. By default, the call will only await
    until the sequence is completed. The user can specify that await also searches the
    history by specifying a value for start. On timeout, the search will return the list
    of found channel updates regardless of whether the sequence is complete.

    Note: It is recommended (but not enforced) not to specify timestamps for this search.
    Note: This function will always return a list of updates. The user should check if
    the sequence was completed.

    Args:
        channels: an ordered list of channel specifiers (mnemonic, id, or predicate)
        history: if given, a substitute history that the function will search and await;
            otherwise the API's telemetry test history is used
        start: an optional index or predicate to specify the earliest item to search
        timeout: the number of seconds to wait before terminating the search (int)

    Returns:
        an ordered list of ChData objects that satisfies the sequence
    """
    seq_preds = [self.get_telemetry_pred(channel) for channel in channels]
    target = self.get_telemetry_test_history() if history is None else history
    return self.find_history_sequence(seq_preds, target, start, timeout)
def await_telemetry_count(
    self, count, channels=None, history=None, start="NOW", timeout=5
):
    """
    A search on the number of telemetry updates received. By default, the call will only
    await until a correct count is achieved. The user can specify that await also
    searches the current history by specifying a value for start. On timeout, the search
    will return the list of found channel updates regardless of whether a correct count
    is achieved.

    Note: this search will always return a list of objects. The user should check if the
    search was completed.

    Args:
        count: either an exact amount (int) or a predicate to specify how many objects to
            find
        channels: a channel specifier or list of channel specifiers (mnemonic, ID, or
            predicate); a list matches an update satisfying any of its specifiers, and
            None applies no channel filter
        history: if given, a substitute history that the function will search and await;
            otherwise the API's telemetry test history is used
        start: an optional index or predicate to specify the earliest item to search
        timeout: the number of seconds to wait before terminating the search (int)

    Returns:
        a list of the ChData objects that were counted
    """
    if isinstance(channels, list):
        search = predicates.satisfies_any(
            [self.get_telemetry_pred(channel=channel) for channel in channels]
        )
    elif channels is not None:
        search = self.get_telemetry_pred(channel=channels)
    else:
        search = None

    target = self.get_telemetry_test_history() if history is None else history
    return self.find_history_count(count, target, search, start, timeout)
###################################################################################### # Telemetry Asserts ######################################################################################
def assert_telemetry(
    self, channel, value=None, time_pred=None, history=None, start=None, timeout=0
):
    """
    An assert on a single telemetry update received. If the history doesn't have the
    correct update, the call will await until a correct update is received or the
    timeout, at which point it will assert failure.

    The search result is checked with the same telemetry predicate that drove the search.

    Args:
        channel: a channel specifier (mnemonic, id, or predicate)
        value: optional value (object/number) or predicate to specify the value field
        time_pred: an optional predicate to specify the flight software timestamp
        history: if given, a substitute history that the function will search and await
        start: an optional index or predicate to specify the earliest item to search
        timeout: the number of seconds to wait before terminating the search (int)

    Returns:
        the ChData object found during the search
    """
    pred = self.get_telemetry_pred(channel, value, time_pred)
    result = self.await_telemetry(
        channel,
        value=value,
        time_pred=time_pred,
        history=history,
        start=start,
        timeout=timeout,
    )
    self.__assert_pred(
        "Telemetry received",
        pred,
        result,
        "checks if item search found a correct update",
    )
    return result
def assert_telemetry_sequence(self, channels, history=None, start=None, timeout=0):
    """
    An assert that a sequence of telemetry updates is received. If the history doesn't
    have the complete sequence, the call will await until the sequence is completed or
    the timeout. The assert checks that the number of updates found equals the number of
    channel specifiers given.

    Note: It is recommended (but not enforced) not to specify timestamps for this assert.

    Args:
        channels: an ordered list of channel specifiers (mnemonic, id, or predicate)
        history: if given, a substitute history that the function will search and await
        start: an optional index or predicate to specify the earliest item to search
        timeout: the number of seconds to wait before terminating the search (int)

    Returns:
        an ordered list of ChData objects that satisfies the sequence
    """
    results = self.await_telemetry_sequence(
        channels, history=history, start=start, timeout=timeout
    )
    expected_length = predicates.equal_to(len(channels))
    self.__assert_pred(
        "Telemetry sequence",
        expected_length,
        len(results),
        "checks if sequence search found every update",
    )
    return results
def assert_telemetry_count(
    self, count, channels=None, history=None, start=None, timeout=0
):
    """
    An assert on the number of channel updates received. If the history doesn't have the
    correct update count, the call will await until a correct count is achieved or the
    timeout, at which point it will assert failure.

    Args:
        count: either an exact amount (int) or a predicate to specify how many objects to
            find
        channels: a channel specifier or list of channel specifiers (mnemonic, ID, or
            predicate)
        history: if given, a substitute history that the function will search and await
        start: an optional index or predicate to specify the earliest item to search
        timeout: the number of seconds to wait before terminating the search (int)

    Returns:
        a list of the ChData objects that were counted
    """
    results = self.await_telemetry_count(
        count, channels=channels, history=history, start=start, timeout=timeout
    )

    # An exact amount is checked with equal_to; a predicate is used as given.
    count_pred = count
    if not predicates.is_predicate(count):
        count_pred = predicates.equal_to(count)

    self.__assert_pred(
        "Telemetry count",
        count_pred,
        len(results),
        "checks if count search found a correct amount of updates",
    )
    return results
###################################################################################### # Event Functions ######################################################################################
def translate_event_name(self, event):
    """
    Resolves an event specifier to an event ID using the pipeline's dictionaries.

    A str is looked up as a mnemonic in the name dictionary; anything else is checked
    against the ID dictionary and returned unchanged when present.

    Args:
        event: an event mnemonic (str) or ID (int)

    Returns:
        the event ID (int)

    Raises:
        KeyError: if the event is not in the corresponding dictionary
    """
    dictionaries = self.pipeline.dictionaries

    if isinstance(event, str):
        by_name = dictionaries.event_name
        if event not in by_name:
            raise KeyError(
                "The event mnemonic, {}, wasn't in the dictionary".format(event)
            )
        return by_name[event].get_id()

    if event not in dictionaries.event_id:
        raise KeyError("The event id, {}, wasn't in the dictionary".format(event))
    return event
def get_event_pred(self, event=None, args=None, severity=None, time_pred=None):
    """
    Builds an event_predicate from an event specifier, arguments, severity and a time
    predicate.

    If event is already an instance of event_predicate, it is returned immediately and
    the other arguments are ignored. Otherwise an event that is neither None nor a
    predicate is translated to its ID and wrapped in an equal_to predicate, args that are
    neither None nor a predicate are wrapped in an args_predicate, and a severity that is
    neither None nor a predicate must be an EventSeverity and is wrapped in an equal_to
    predicate.

    The provided implementation of event_predicate evaluates true if and only if all
    specified constraints are satisfied. A constraint that isn't specified does not
    affect the outcome, so all arguments are optional; with no constraints the predicate
    always returns true.

    Args:
        event: mnemonic (str), id (int), or predicate to specify the event type
        args: list of arguments (list of values, predicates, or None to ignore)
        severity: an EventSeverity enum or a predicate to specify the event severity
        time_pred: predicate to specify the flight software timestamp

    Returns:
        an instance of event_predicate

    Raises:
        TypeError: if severity is neither None, a predicate, nor an EventSeverity
    """
    if isinstance(event, predicates.event_predicate):
        return event

    if not (predicates.is_predicate(event) or event is None):
        event_id = self.translate_event_name(event)
        event = predicates.equal_to(event_id)

    if not (predicates.is_predicate(args) or args is None):
        args = predicates.args_predicate(args)

    if not (predicates.is_predicate(severity) or severity is None):
        if not isinstance(severity, EventSeverity):
            raise TypeError(
                "Given severity was not a valid Severity Enum Value: {} ({})".format(
                    severity, type(severity)
                )
            )
        severity = predicates.equal_to(severity)

    return predicates.event_predicate(event, args, severity, time_pred)
def await_event(
    self,
    event,
    args=None,
    severity=None,
    time_pred=None,
    history=None,
    start="NOW",
    timeout=5,
):
    """
    A search for a single event message received. By default, the call will only await
    until a correct message is found. The user can specify that await also searches the
    current history by specifying a value for start. On timeout, the search will return
    None.

    Args:
        event: an event specifier (mnemonic, id, or predicate)
        args: a list of expected arguments (list of values, predicates, or None for don't
            care)
        severity: an EventSeverity enum or a predicate to specify the event severity
        time_pred: an optional predicate to specify the flight software timestamp
        history: if given, a substitute history that the function will search and await;
            otherwise the API's event test history is used
        start: an optional index or predicate to specify the earliest item to search
        timeout: the number of seconds to wait before terminating the search (int)

    Returns:
        the EventData object found during the search, otherwise, None
    """
    e_pred = self.get_event_pred(event, args, severity, time_pred)
    target = self.get_event_test_history() if history is None else history
    return self.find_history_item(e_pred, target, start, timeout)
def await_event_sequence(self, events, history=None, start="NOW", timeout=5):
    """
    A search for a sequence of event messages. By default, the call will only await until
    the sequence is completed. The user can specify that await also searches the history
    by specifying a value for start. On timeout, the search will return the list of found
    event messages regardless of whether the sequence is complete.

    Note: It is recommended (but not enforced) not to specify timestamps for this search.
    Note: This function will always return a list of events. The user should check if the
    sequence was completed.

    Args:
        events: an ordered list of event specifiers (mnemonic, id, or predicate)
        history: if given, a substitute history that the function will search and await;
            otherwise the API's event test history is used
        start: an optional index or predicate to specify the earliest item to search
        timeout: the number of seconds to wait before terminating the search (int)

    Returns:
        an ordered list of EventData objects that satisfies the sequence
    """
    seq_preds = [self.get_event_pred(event) for event in events]
    target = self.get_event_test_history() if history is None else history
    return self.find_history_sequence(seq_preds, target, start, timeout)
def await_event_count(self, count, events=None, history=None, start="NOW", timeout=5):
    """
    A search on the number of events received. By default, the call will only await until
    a correct count is achieved. The user can specify that this await also searches the
    current history by specifying a value for start. On timeout, the search will return
    the list of found event messages regardless of whether a correct count is achieved.

    Note: this search will always return a list of objects. The user should check if the
    search was completed.

    Args:
        count: either an exact amount (int) or a predicate to specify how many objects to
            find
        events: an event specifier or list of event specifiers (mnemonic, ID, or
            predicate); a list matches an event satisfying any of its specifiers, and
            None applies no event filter
        history: if given, a substitute history that the function will search and await;
            otherwise the API's event test history is used
        start: an optional index or predicate to specify the earliest item to search
        timeout: the number of seconds to wait before terminating the search (int)

    Returns:
        a list of the EventData objects that were counted
    """
    if isinstance(events, list):
        search = predicates.satisfies_any(
            [self.get_event_pred(event=event) for event in events]
        )
    elif events is not None:
        search = self.get_event_pred(event=events)
    else:
        search = None

    target = self.get_event_test_history() if history is None else history
    return self.find_history_count(count, target, search, start, timeout)
###################################################################################### # Event Asserts ######################################################################################
def assert_event(
    self,
    event,
    args=None,
    severity=None,
    time_pred=None,
    history=None,
    start=None,
    timeout=0,
):
    """
    Asserts that a single matching event message is received.

    The search is delegated to await_event with the same arguments; its result is then
    checked against the event predicate built from the specifiers, and the assertion fails
    if the result does not satisfy it.

    Args:
        event: an event specifier (mnemonic, id, or predicate)
        args: a list of expected arguments (list of values, predicates, or None for don't care)
        severity: an EventSeverity enum or a predicate to specify the event severity
        time_pred: an optional predicate to specify the flight software timestamp
        history: if given, a substitute history that the function will search and await
        start: an optional index or predicate to specify the earliest item to search
        timeout: the number of seconds to wait before terminating the search (int)
    Returns:
        the EventData object found during the search
    """
    expected = self.get_event_pred(event, args, severity, time_pred)
    found = self.await_event(
        event, args, severity, time_pred, history, start, timeout
    )
    self.__assert_pred(
        "Event received",
        expected,
        found,
        "checks if item search found a correct event",
    )
    return found
def assert_event_sequence(self, events, history=None, start=None, timeout=0):
    """
    Asserts that a sequence of event messages is received.

    The search is delegated to await_event_sequence; the assertion then checks that the
    number of events found equals the number of events requested.

    Note: It is recommended (but not enforced) not to specify timestamps for this assert.

    Args:
        events: an ordered list of event specifiers (mnemonic, id, or predicate)
        history: if given, a substitute history that the function will search and await
        start: an optional index or predicate to specify the earliest item to search
        timeout: the number of seconds to wait before terminating the search (int)
    Returns:
        an ordered list of EventData objects that satisfied the sequence
    """
    found = self.await_event_sequence(events, history, start, timeout)
    # The sequence is complete only when every requested event was matched.
    complete = predicates.equal_to(len(events))
    self.__assert_pred(
        "Event sequence",
        complete,
        len(found),
        "checks if sequence search found every event",
    )
    return found
def assert_event_count(
    self, count, events=None, history=None, start=None, timeout=0
):
    """
    Asserts on the number of events received.

    The search is delegated to await_event_count; the assertion then checks the number of
    counted events against count (used directly when it is a predicate, otherwise compared
    for equality).

    Args:
        count: either an exact amount (int) or a predicate to specify how many objects to find
        events: optional event specifier or list of specifiers (mnemonic, id, or predicate)
        history: if given, a substitute history that the function will search and await
        start: an optional index or predicate to specify the earliest item to search
        timeout: the number of seconds to wait before terminating the search (int)
    Returns:
        a list of the EventData objects that were counted
    """
    counted = self.await_event_count(count, events, history, start, timeout)
    if predicates.is_predicate(count):
        expected_amount = count
    else:
        expected_amount = predicates.equal_to(count)
    self.__assert_pred(
        "Event count",
        expected_amount,
        len(counted),
        "checks if count search found a correct amount of events",
    )
    return counted
###################################################################################### # History Searches ######################################################################################
class __HistorySearcher:
    """
    Base class for the search-specific logic used by the __search_test_history helper.

    Each type of search provided by the API supplies its own subclass. The helper calls
    search_current_history once on the existing items, then incremental_search on every
    awaited item, and finally get_return_value to obtain the result.
    """

    def __init__(self):
        # Result handed back by get_return_value, whether or not the search succeeded.
        self.ret_val = None
        # Whether the searcher wants repeated data objects (see requires_repeats).
        self.repeats = False

    def search_current_history(self, items):
        """
        Searches the scoped existing history

        Args:
            items: the items currently in the scope of the search
        Return:
            True if the search was satisfied, False otherwise
        """
        raise NotImplementedError()

    def incremental_search(self, item):
        """
        Searches a single item that was received while the search is awaiting

        Args:
            item: the newly received data object
        Return:
            True if the search was satisfied, False otherwise
        """
        raise NotImplementedError()

    def get_return_value(self):
        """
        Returns the result of the search whether the search is successful or not
        """
        return self.ret_val

    def requires_repeats(self):
        """
        Returns a flag to determine if the history searcher needs repeated data objects when
        receive order does not match chronological order.
        """
        return self.repeats
class TimeoutException(Exception):
    """
    Raised by the SIGALRM handler to tell an awaiting history search that its timeout has
    elapsed.
    """
def __timeout_sig_handler(self, signum, frame):
    """
    Signal handler that interrupts an awaiting search by raising TimeoutException.

    Args:
        signum: the signal number passed by the signal module (unused)
        frame: the interrupted stack frame passed by the signal module (unused)
    """
    timeout_error = self.TimeoutException
    raise timeout_error()
def __search_test_history(self, searcher, name, history, start=None, timeout=0):
    """
    This helper method contains the common logic to all search methods in the test API. This
    means searches on both the event and channel histories rely on this helper. Each history
    search is performed on both current history items and then on items that have yet to be
    added to the history. The API defines these two scopes using the variables start and
    timeout. They have several useful behaviors.

    start is used to pick the earliest item to search in the current history. start can be
    specified as either a predicate to search for the first item, an index of the history, the
    API variable NOW, or an instance of the TimeType timestamp object. The behavior of NOW is
    to ignore the current history and only search awaited items until the timeout. The behavior
    of giving a TimeType is to only search items that happened at or after the specified
    timestamp.

    timeout is a specification of how long to await future items in seconds. Specifying a
    timeout of 0 will ignore all future items. The timeout specifies an increment of time
    relative to the local clock, not the embedded application's clock.

    Note: the API does not try to check for edge cases where the final item in a search is
    received as the search times out. The user should ensure that their timeouts are sufficient
    to complete any awaiting searches.

    Note: awaiting is implemented with SIGALRM. The handler that was installed before the
    search is put back once the search ends.

    Finally, the test API supports the ability to substitute a history object for any search.
    This is part of why history must be specified for each search

    Args:
        searcher: an instance of __HistorySearcher to execute search-specific logic
        name: a string name to differentiate the type of search
        history: the TestHistory object to conduct the search on
        start: an index, a predicate, the NOW variable, or a TimeType timestamp to pick the
            first item to search
        timeout: the number of seconds to await future items
    Returns:
        the searcher's return value, whether or not the search was satisfied
    """
    if start == self.NOW:
        # Skip everything already in the history; only awaited items are searched.
        start = history.size()
    elif isinstance(start, TimeType):
        time_pred = predicates.greater_than_or_equal_to(start)
        channel_pred = self.get_telemetry_pred(time_pred=time_pred)
        event_pred = self.get_event_pred(time_pred=time_pred)
        start = predicates.satisfies_any([channel_pred, event_pred])

    current = history.retrieve(start)
    if searcher.search_current_history(current):
        return searcher.get_return_value()

    if not timeout:
        self.__log(name + " ended unsuccessfully.", TestLogger.YELLOW)
        return searcher.get_return_value()

    self.__log(name + " now awaiting for at most {} s.".format(timeout))
    check_repeats = isinstance(history, ChronologicalHistory)
    # signal.signal returns the handler it replaces, so it can be restored afterwards.
    previous_handler = signal.signal(signal.SIGALRM, self.__timeout_sig_handler)
    try:
        signal.alarm(timeout)
        while True:
            if check_repeats:
                new_items = history.retrieve_new(searcher.requires_repeats())
            else:
                new_items = history.retrieve_new()
            for item in new_items:
                if searcher.incremental_search(item):
                    return searcher.get_return_value()
            time.sleep(0.1)
    except self.TimeoutException:
        self.__log(
            name + " timed out and ended unsuccessfully.", TestLogger.YELLOW
        )
    finally:
        # Cancel any pending alarm before handing the signal back.
        signal.alarm(0)
        # None means the previous handler was not installed from Python and cannot be
        # re-installed through signal.signal.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
    return searcher.get_return_value()
def find_history_item(self, search_pred, history, start=None, timeout=0):
    """
    Searches and, if needed, awaits a single element in a history.

    The first object that satisfies the predicate is returned. The search ends when such an
    object is found or when the timeout is reached.

    Args:
        search_pred: a predicate to specify a history item.
        history: the history that the function will search and await
        start: an index or predicate to specify the earliest item from the history to search
        timeout: the number of seconds to wait before terminating the search (int)
    Returns:
        the data object found during the search, otherwise, None
    """

    class __ItemSearcher(self.__HistorySearcher):
        """Searcher that is satisfied by the first item matching the predicate."""

        def __init__(self, log, search_pred):
            super().__init__()
            self.log = log
            self.search_pred = search_pred
            self.repeats = False
            self.ret_val = None
            self.log(
                "Beginning an item search for an item that satisfies:\n {}".format(
                    self.search_pred
                ),
                TestLogger.YELLOW,
            )

        def search_current_history(self, items):
            # any() stops at the first match, exactly like an early return from a loop.
            return any(self.incremental_search(item) for item in items)

        def incremental_search(self, item):
            if not self.search_pred(item):
                return False
            self.log(
                "History search found the specified item: {}".format(item),
                TestLogger.YELLOW,
            )
            self.ret_val = item
            return True

    item_searcher = __ItemSearcher(self.__log, search_pred)
    return self.__search_test_history(
        item_searcher, "Item search", history, start, timeout
    )
def find_history_sequence(self, seq_preds, history, start=None, timeout=0):
    """
    Searches and, if needed, awaits a sequence of elements in a history.

    The history objects that satisfied the sequence so far are returned as a list. The search
    ends when an order of data objects satisfies the entire sequence or when the timeout
    occurs.

    Note: A list is always returned; the caller should check whether the search completed.

    Args:
        seq_preds: an ordered list of predicate objects to specify a sequence
        history: the history that the function will search and await
        start: an index or predicate to specify the earliest item from the history to search
        timeout: the number of seconds to wait before terminating the search (int)
    Returns:
        a list of data objects that satisfied the sequence
    """

    class __SequenceSearcher(self.__HistorySearcher):
        """Searcher that consumes the sequence predicates one match at a time."""

        def __init__(self, log, seq_preds):
            super().__init__()
            self.log = log
            self.ret_val = []
            # Work on a copy: matched predicates are removed from the front.
            self.seq_preds = seq_preds.copy()
            self.repeats = True
            self.log(
                "Beginning a sequence search of {} items.".format(
                    len(self.seq_preds)
                ),
                TestLogger.YELLOW,
            )

        def search_current_history(self, items):
            if not self.seq_preds:
                self.log(
                    "Sequence search finished, as the specified sequence had 0 items.",
                    TestLogger.YELLOW,
                )
                return True
            return any(self.incremental_search(item) for item in items)

        def incremental_search(self, item):
            if not self.seq_preds[0](item):
                return False
            self.log("Sequence search found the next item: {}".format(item))
            self.ret_val.append(item)
            self.seq_preds.pop(0)
            if self.seq_preds:
                # More of the sequence is still outstanding.
                return False
            self.log("Sequence search found the last item.", TestLogger.YELLOW)
            return True

    sequence_searcher = __SequenceSearcher(self.__log, seq_preds)
    return self.__search_test_history(
        sequence_searcher, "Sequence search", history, start, timeout
    )
def find_history_count(
    self, count, history, search_pred=None, start=None, timeout=0
):
    """
    This function first counts all valid items in the current history, then can await until a
    valid number of elements is received by the history. The function will return a list of the
    history objects counted by the search. The search will return when a correct count of data
    objects is found, or the timeout occurs.

    Note: this search will always return a list of objects. The user should check if the search
        was completed.

    Args:
        count: either an exact amount (int) or a predicate to specify how many objects to find
        history: the history that the function will search and await
        search_pred: a predicate to specify which items to count. If left blank, all will count
        start: an index or predicate to specify the earliest item from the history to search
        timeout: the number of seconds to wait before terminating the search (int)
    Returns:
        a list of data objects that were counted during the search
    """

    class __CountSearcher(self.__HistorySearcher):
        """Searcher that collects matching items until the count predicate is satisfied."""

        def __init__(self, log, count, search_pred):
            super().__init__()
            self.log = log
            self.ret_val = []
            if predicates.is_predicate(count):
                self.count_pred = count
            else:
                self.count_pred = predicates.equal_to(count)
            # Resolve the "count everything" default here so that incremental_search does
            # not depend on search_current_history having been called first.
            self.count_all = search_pred is None
            if self.count_all:
                self.search_pred = predicates.always_true()
            else:
                self.search_pred = search_pred
            self.repeats = False
            msg = "Beginning a count search for an amount of items ({}).".format(
                self.count_pred
            )
            self.log(msg, TestLogger.YELLOW)

        def _count_is_correct(self):
            """Logs and returns True when the number of counted items satisfies count_pred."""
            if self.count_pred(len(self.ret_val)):
                msg = "Count search found a correct amount: {}".format(
                    len(self.ret_val)
                )
                self.log(msg, TestLogger.YELLOW)
                return True
            return False

        def search_current_history(self, items):
            if self.count_all:
                # Copy so that items counted later are not appended to the list object
                # that history.retrieve returned.
                self.ret_val = list(items)
            else:
                for item in items:
                    if self.search_pred(item):
                        self.log(
                            "Count search counted another item: {}".format(item)
                        )
                        self.ret_val.append(item)
            return self._count_is_correct()

        def incremental_search(self, item):
            if self.search_pred(item):
                self.log("Count search counted another item: {}".format(item))
                self.ret_val.append(item)
                return self._count_is_correct()
            return False

    searcher = __CountSearcher(self.__log, count, search_pred)
    return self.__search_test_history(
        searcher, "Count search", history, start, timeout
    )

######################################################################################
# API Helper Methods
######################################################################################
def __log(self, message, color=None, style=None, sender="Test API", case_id=None):
    """
    Forwards an API message to the test logger.

    Non-string messages are converted with str() first. When the API has no logger, the
    message is not recorded anywhere.

    Args:
        message: the message to log; converted to a string if it is not one already
        color: a color hex code (str)
        style: a style option from test logger (str) ["BOLD", "ITALICS", "UNDERLINED"]
        sender: a marker for where the message originated
        case_id: a tag for the current test case. Only needs to be set when a new case starts
    """
    text = message if isinstance(message, str) else str(message)
    if not self.logger:
        return
    self.logger.log_message(text, sender, color, style, case_id)
# TODO: Re-add way of printing to console?
def __assert_pred(self, name, predicate, value, msg="", expect=False):
    """
    Helper to assert that a value satisfies a predicate. Includes arguments to provide more
    descriptive messages in the logs.

    Args:
        name: short name describing the check
        predicate: an instance of predicate to determine if the assert is successful
        value: the object evaluated by the predicate
        msg: a string message to describe what the assert is checking
        expect: a boolean value: True will behave as an expect and not raise an assertion.
    Returns:
        True if the assertion was successful, False otherwise
    Raises:
        AssertionError: if the predicate is not satisfied and expect is False
    """
    name = name + (" expectation" if expect else " assertion")
    pred_msg = predicates.get_descriptive_string(value, predicate)

    if predicate(value):
        ast_msg = name + " succeeded: {}\nassert ".format(msg) + pred_msg
        self.__log(ast_msg, TestLogger.GREEN)
        return True

    ast_msg = name + " failed: {}\nassert ".format(msg) + pred_msg
    if expect:
        self.__log(ast_msg, TestLogger.ORANGE)
        return False

    self.__log(ast_msg, TestLogger.RED)
    # Raised explicitly rather than with an assert statement, which is removed when Python
    # runs with optimizations (-O) and would let a failed check pass silently.
    raise AssertionError(pred_msg)
def data_callback(self, data, sender=None):
    """
    Callback invoked with each received event. Logs the event when it passes the event log
    filter and logs a warning when its time is earlier than that of the previously received
    event.

    Args:
        data: the received event data object; remembered as the most recent event
        sender: not used by this callback
    """
    if self.event_log_filter(data):
        received = "Received EVR: {}".format(data.get_str(verbose=True))
        self.__log(received, TestLogger.BLUE, sender="GDS")

    previous = self.last_evr
    if previous is not None and data.get_time() < previous.get_time():
        report = [
            "API detected out of order evrs!",
            "\nReceived First:{}".format(previous.get_str(verbose=True)),
            "\nReceived Second:{}".format(data.get_str(verbose=True)),
        ]
        self.__log("".join(report), TestLogger.ORANGE)

    self.last_evr = data