Synthetic Mode Generation

This notebook shows how modes can be elaborated from health states in an fmdtools model.

from fmdtools.sim.sample import SampleApproach, FaultDomain, FaultSample
import fmdtools.sim.propagate as prop
import matplotlib.pyplot as plt
plt.rcParams['pdf.fonttype'] = 42
import multiprocessing as mp
import pandas as pd
import time

The Rover model is in defined, along with a few analysis methods.

from examples.rover.rover_model import Rover, RoverParam

Below we compare the space of hazards revealed by querying the model with: - identifed modes (modes that we identify up-front) - elaborated modes (modes generated by lists of forseeable parameter values) - randomly-generated modes (modes generated by randomly sampling ranges of parameter values)

from fmdtools.analyze.graph import FunctionArchitectureFxnGraph
mdl_illust = Rover()
mg = FunctionArchitectureFxnGraph(mdl_illust)
fig, ax = mg.draw()
colors = ["goldenrod", "magenta", "blue"]

fig.savefig("rover_structure.pdf", format="pdf", bbox_inches = 'tight', pad_inches = 0)
fig.savefig("rover_structure.eps", format="eps", bbox_inches = 'tight', pad_inches = 0)
from mpl_toolkits.mplot3d import proj3d
def plot_hstate_modes(mdl, el_size=10, id_size=30, nom_size=50, el_alpha=1.0, mdl_range=[],
                      range_alpha=1.0, range_size=10, title = "Space of Fault Modes", colors=["yellow", "orange", "red", "blue"]):
    x= [hstates.get('friction', 0.0) for mode, hstates in mdl.fxns['drive'].m.mode_state_dict.items()  if mode[:5]=='hmode']
    y= [hstates.get('transfer', 1.0) for mode, hstates in mdl.fxns['drive'].m.mode_state_dict.items()  if mode[:5]=='hmode']
    z= [hstates.get('drift', 1.0) for mode, hstates in mdl.fxns['drive'].m.mode_state_dict.items()  if mode[:5]=='hmode']

    x_id= [hstates.get('friction', 1.0) for mode, hstates in mdl.fxns['drive'].m.mode_state_dict.items() if mode[:5]!='hmode']
    y_id= [hstates.get('transfer', 1.0) for mode, hstates in mdl.fxns['drive'].m.mode_state_dict.items() if mode[:5]!='hmode']
    z_id= [hstates.get('drift', 0.0) for mode, hstates in mdl.fxns['drive'].m.mode_state_dict.items() if mode[:5]!='hmode']
    id_labels= [mode for mode, hstates in mdl.fxns['drive'].m.mode_state_dict.items() if mode[:5]!='hmode']

    fig = plt.figure()

    ax = fig.add_subplot(projection='3d')

    if mdl_range:
        x_r= [hstates.get('friction', 0.0) for mode, hstates in mdl_range.fxns['drive'].m.mode_state_dict.items()  if mode[:5]=='hmode']
        y_r= [hstates.get('transfer', 1.0) for mode, hstates in mdl_range.fxns['drive'].m.mode_state_dict.items()  if mode[:5]=='hmode']
        z_r= [hstates.get('drift', 1.0) for mode, hstates in mdl_range.fxns['drive'].m.mode_state_dict.items()  if mode[:5]=='hmode']
        ax.scatter(x_r, y_r, z_r, label='range', color=colors[0], s=range_size, alpha=range_alpha)

    ax.scatter(x, y, z, label='set', color=colors[1], s=el_size, alpha=el_alpha)
    ax.scatter(x_id, y_id, z_id, label='identified', marker='X', color=colors[2], s=id_size, alpha=0.8)
    ax.scatter([1.0], [1.0], [0.0], label='nominal', marker='*', color=colors[3], s=nom_size)


    ax2 = fig.add_subplot()
    xmin, _, _ = proj3d.proj_transform(ax.get_xlim()[0], ax.get_ylim()[0], 0,  ax.get_proj())
    xmax, _, _ = proj3d.proj_transform(ax.get_xlim()[1], ax.get_ylim()[1], 0,  ax.get_proj())
    _, ymin, _ = proj3d.proj_transform(ax.get_xlim()[1], ax.get_ylim()[0], 0,  ax.get_proj())
    _, ymax, _ = proj3d.proj_transform(ax.get_xlim()[0], ax.get_ylim()[1], 0,  ax.get_proj())
    ylims = proj3d.proj_transform(ax.get_xlim()[0], ax.get_ylim()[1], 1,  ax.get_proj())
    ax2.set_xlim(xmin, xmax)
    ax2.set_ylim(ymin-0.01, ymax+0.01)
    ax2.set_position((0.30, 0.3, 0.42, 0.4))
    for i, label in enumerate(id_labels):
        #ax.text(x_id[i], y_id[i], z_id[i], label, ha='left', va='bottom')
        xlab, ylab, _ = proj3d.proj_transform(x_id[i], y_id[i], z_id[i],  ax.get_proj())
        if label=='stuck_left': xyt=(-20,-30)
        elif label=='stuck': xyt=(20,-30)
        else: xyt=(20,30)
        plt.annotate(label, xy=(xlab, ylab), xytext=xyt,
            textcoords='offset points', ha='center', va='bottom',
            bbox=dict(boxstyle='round,pad=0.2', fc='white', alpha=1.0),
            arrowprops=dict(arrowstyle='-|>', color=colors[2]))
        #ax2.text(xlab, ylab, label, ha='left', va='bottom')

    handles, labels = ax.get_legend_handles_labels()
    ax2.legend(handles, labels, loc=(0.3,-0.2), framealpha=0.55, facecolor='white')
    ax2.get_legend().legendHandles[0]._sizes = [20]
    return fig

Below we compare the space of fault modes generated by our identified sets:

mdl_set = Rover(p={"drive_modes": {"mode_args": 'set-manual'}, 'ground':{'linetype': 'turn'},
                   'path_param': {'buffer_on': 1.0, 'buffer_poor': 1.5, 'buffer_near': 2.0}})
mdl_range = Rover(p={"drive_modes": {"mode_args": 'range-manual-all'},'ground':{'linetype': 'turn'},
                    'path_param': {'buffer_on': 1.0, 'buffer_poor': 1.5, 'buffer_near': 2.0}})
fig =plot_hstate_modes(mdl_set, mdl_range= mdl_range,range_alpha=0.8, range_size=1, title='', colors=colors+["black"])
fig.savefig("hazard_space.pdf", format="pdf", bbox_inches = 'tight', pad_inches = 0)
fig.savefig("hazard_space.eps", format="eps", bbox_inches = 'tight', pad_inches = 0)
Below we look at how this results in different potential analyses…

This function plots the end distribution of a given attribue from the model endclasses.

def plot_enddist(endclasses, att='end_dist', range=(0,3), endclasses_range=[], density=False, yscale="log",
                 fail_thresh=1.0, thresh_label="Off Course", x_lab="Line Distance", n_bins = 8,
                 title="Results Distributions Given From Approaches", colors=["yellow", "orange", "red"],
                hatching = ['xx', '//', '..']):
    if endclasses_range:
        x_range_class = [endclass for scen, endclass in endclasses_range.get_values(att).items() if scen[6:11]=='hmode']
        n,bins,patches = plt.hist(x_range_class, density=density, label='range', range=range, color=colors[0],
                                  hatch=hatching[0], bins=n_bins, edgecolor="white")
        x_elaborated_class = [endclass for scen, endclass in endclasses.get_values(att).items() if scen[6:11]=='hmode']
        n,bins,patches = plt.hist(x_elaborated_class, density=density, alpha=0.8, label='set', range=range, bins=bins,
                                  color=colors[1], hatch=hatching[1], edgecolor="white")
        x_elaborated_class = [endclass for scen, endclass in endclasses.get_values(att).items() if scen[6:11]=='hmode']
        n,bins,patches = plt.hist(x_elaborated_class, density=density, label='set', range=range,
                                  color=colors[1], hatch=hatching[1], bins=n_bins, edgecolor="white")

    if fail_thresh: plt.axvline(fail_thresh, label=thresh_label, color="black")

    x_identified_class = [endclass for scen, endclass in endclasses.get_values(att).items() if scen[6:11]!='hmode' and scen[0:6]!='nominal']
    label_identified_class = [scen[6:] for scen, endclass in endclasses.get_values(att).items() if scen[6:11]!='hmode'and scen[0:6]!='nominal']

    _,_,_ = plt.hist(x_identified_class, density=density, alpha=0.8, bins=bins, label='identified', color=colors[2], hatch=hatching[2], edgecolor="white")
    plt.grid(axis='y', which='both', zorder=0)
    if yscale=="log": plt.ylabel("# of Scens (log)")
    else:             plt.ylabel("Distribution")
    return plt.gcf()

Finding phases so the Approach can inject the fault halfway through the simulation.

import fmdtools.analyze.phases as ph
endresults, mdlhist = prop.nominal(mdl_set)
phasemap = ph.from_hist(mdlhist)
--rate:                              1.0
--cost:                                0
--prob:                              1.0
--expected_cost:                       0
--in_bound:                         True
--at_finish:                        True
--line_dist:                           1
--num_modes:                           0
--end_dist:                          0.0
--tot_deviation:    0.005246344989065292
--faults:                             {}
--classification:        nominal mission
--end_x:              29.813614084369863
--end_y:               17.26588133276667
--endpt:                        array(2)

Simulating the set and range approach. Note that the identified modes are present in the range and mode approaches, so we end up taking this data from these results afterwards.

We simulate it here to get some idea of the simulation time.

t_id_0 = time.time()
mdl_id = Rover(p={"drive_modes": {"mode_args": 'manual'},
                  'ground':{'linetype': 'turn', 'buffer_on': 0.5, 'buffer_poor': 0.8, 'buffer_near': 1.0}},
               sp={'end_condition': ''})
fault_domain_id = FaultDomain(mdl_id)
fault_sample_id = FaultSample(fault_domain_id, phasemap['plan_path'])
results_id, mdlhists_id = prop.fault_sample(mdl_id, fault_sample_id, staged=True, pool=mp.Pool(5))
t_id = time.time()-t_id_0
t_set_0 = time.time()
mdl_set = Rover(p={"drive_modes": {"mode_args": 'set-manual'},
                   'ground':{'linetype': 'turn', 'buffer_on': 0.5, 'buffer_poor': 0.8, 'buffer_near': 1.0}},
                   sp={'end_condition': ''})
fd_set = FaultDomain(mdl_set)
fs_set = FaultSample(fd_set, phasemap['plan_path'])
results_set, mdlhists_set = prop.fault_sample(mdl_set, fs_set, staged=True, pool=mp.Pool(5))
t_set = time.time() - t_set_0
t_range_0 = time.time()
mdl_range = Rover(p={"drive_modes": {"mode_args": 'range-manual-all'},
                     'ground':{'linetype': 'turn', 'buffer_on': 0.5, 'buffer_poor': 0.8, 'buffer_near': 1.0}},
                  sp={'end_condition': ''})
fd_range = FaultDomain(mdl_range)
fs_range = FaultSample(fd_range, phasemap['plan_path'])
results_range, mdlhists_range = prop.fault_sample(mdl_range, fs_range, staged=True, pool=mp.Pool(5))
t_range = time.time() - t_range_0
sim_times = [t_id, t_set, t_range]
[1.3298428058624268, 1.972724437713623, 97.14148330688477]
fig = plot_enddist(results_set, endclasses_range=results_range, range=(0,2), title='', colors=colors) #, colors = ['red','green','blue']
fig.savefig("line_dist.pdf", format="pdf", bbox_inches = 'tight', pad_inches = 0)
fig.savefig("line_dist.eps", format="eps", bbox_inches = 'tight', pad_inches = 0)
as well as the discovery of fault trajectories…

geoms = {'start': {'shapes': {'shape': {'color': 'black'}, 'near': {'color': 'grey', 'linestyle': 'dotted'}}},
        'end': {'shapes': {'shape': {'color': 'black'}, 'near': {'color': 'grey', 'linestyle': 'dotted'}}},
        'line': {'shapes': {'shape': {'color': 'black'}, 'near': {'color': 'grey', 'linestyle': 'dotted'}}}}

fig, ax = mdl_set.flows['ground'], figsize = (8,8))

# comment out for tests/speed sake
# fig, ax = mdlhists_range.plot_trajectories("flows.pos.s.x", "flows.pos.s.y",
#                                         comp_groups = {'range': list(mdlhists_range.nest().keys())},
#                                         fig=fig, ax=ax, color = colors[2])
fig, ax = mdlhists_set.plot_trajectories("flows.pos.s.x", "flows.pos.s.y",
                                         comp_groups = {'set': list(mdlhists_set.nest().keys())},
                                         color = colors[0], fig=fig, ax=ax)
fig, ax = mdlhists_id.plot_trajectories("flows.pos.s.x", "flows.pos.s.y",
                                        comp_groups = {'id': list(mdlhists_id.nest().keys())},
                                        fig=fig, ax=ax, color = colors[1])
fig.savefig("fault_map.pdf", format="pdf", bbox_inches = 'tight', pad_inches = 0)
fig.savefig("fault_map.eps", format="eps", bbox_inches = 'tight', pad_inches = 0)
Cluster Analysis

Below, we use clustering to identify similar sets of synthetic modes.

# import pip
# pip install scikit-learn
from sklearn.cluster import DBSCAN as cluster #DBSCAN
import numpy as np
def get_X (mdlhists, hmode = True):
    if hmode:
        x = np.array([[value[-1]] for scen, value in mdlhists.get_values('flows.pos.s.x').items() if 'hmode' in scen])
        y = np.array([[value[-1]] for scen, value in mdlhists.get_values('flows.pos.s.y').items() if 'hmode' in scen])
        x = np.array([[value[-1]] for scen, value in mdlhists.get_values('flows.pos.s.x').items() if 'hmode' not in scen])
        y = np.array([[value[-1]] for scen, value in mdlhists.get_values('flows.pos.s.y').items() if 'hmode' not in scen])
    return np.concatenate((x, y), axis=1)

To find similar trajectories, we will cluster on the x-y coordinates of the end position of the Rover accross these scenarios:

X_range = get_X (mdlhists_range)
ec_range = [endclass for scen, endclass in results_range.get_values('end_dist').items() if scen[6:11]=='hmode']
X_set = get_X (mdlhists_set)
ec_set = [endclass for scen, endclass in results_set.get_values('end_dist').items() if scen[6:11]=='hmode']
X_id = get_X (mdlhists_set, hmode = False)
ec_id = [endclass for scen, endclass in results_set.get_values('end_dist').items() if scen[6:11]!='hmode' and scen[0:6]!='nominal']

To identify where all the clusters are accross all three approaches, we first have to combine the three sets of data into one (otherwise it might cluster differently).

X = np.concatenate((X_range, X_set, X_id))

This runs the clustering algorithm:

cl = cluster().fit(X)
labels = cl.labels_

The number of clusters is:

unique_labels = set(labels)
len(unique_labels)- (1 if -1 in labels else 0)

Here we plot the clusters on the course map to see what the algorithm identified.

mdl_set.flows['ground'], figsize = (8,8))
#clust_colors = [ for each in np.linspace(0, 1, len(unique_labels))]
clust_colors ='viridis', len(unique_labels)+1)
markers = ["+","*","x","^","."]

for i, k in enumerate(unique_labels):
    x_label = [x[0] for i,x in enumerate(X) if labels[i]==k]
    y_label = [x[1] for i,x in enumerate(X) if labels[i]==k]
    if k == -1: clust_lab = 'unclustered'
    else:       clust_lab = 'cluster '+str(k)
    plt.scatter(x_label, y_label, zorder=i, label=clust_lab, color=clust_colors.colors[i], marker=markers[i])

#plt.scatter(X_set[:,0],X_set[:,1], label='set', color='black', marker='+')

#plt.scatter(X_id[:,0],X_id[:,1], label='identified', color='gray', marker='X')

plt.xlabel('x-distance (m)')
plt.ylabel('y-distance (m)')

fig = plt.gcf()
fig.savefig("cluster_map.pdf", format="pdf", bbox_inches = 'tight', pad_inches = 0)
fig.savefig("cluster_map.eps", format="eps", bbox_inches = 'tight', pad_inches = 0)
Next, we would like to see how these approaches span the different clusters. If an approach has more coverage, we might say that it better explored the space.

categorizations = {tuple(x):labels[i] for i,x in enumerate(X)}
cat_values = {lab:{tuple(x) for x in X if lab==categorizations[tuple(x)]} for lab in set(labels)}

These are the categories found by each approach:

cat_range = [categorizations[tuple(x)] for x in X_range]
{-1, 0, 1}
cat_id = [categorizations[tuple(x)] for x in X_id]
{0, 1}
cat_set = [categorizations[tuple(x)] for x in X_set]
{0, 1}

This function categorizes the results of each approach into the given clusters

def get_cats(categorizations, X, ec, unique_categorizations):
    for cat in unique_categorizations:
        cats[cat] = [ec[i] for i,x in enumerate(X) if categorizations[tuple(x)]==cat]+[0]
    return cats
cats = get_cats(categorizations,X_range, ec_range, unique_labels)
def worst_cat(categorizations, X, ec, unique_categorizations):
    for cat in unique_categorizations:
        cats = [ec[i] for i,x in enumerate(X) if categorizations[tuple(x)]==cat]+[0]
        worst_cats[cat] = np.max(cats)
    return worst_cats
worst_cat_id = worst_cat(categorizations,X_id, ec_id, unique_labels)
worst_cat_range = worst_cat(categorizations,X_range, ec_range, unique_labels)
worst_cat_set = worst_cat(categorizations,X_set, ec_set, unique_labels)

This table shows the worst-case identified in each category

cat_tab = pd.DataFrame([[*worst_cat_id.values()], [*worst_cat_set.values()], [*worst_cat_range.values()]], index = ["Identified", "Set", "Range"], columns = worst_cat_id.keys())
0 1 -1
Identified 18.536162 0.000000 0.000000
Set 18.536162 0.556991 0.000000
Range 18.621225 0.000000 13.954836

To view the coverage of each approach, here we plot the frequency of scenarios accross the clusters.

hatching = ['xx', '//', '..']
fig, ax = plt.subplots()
ax.hist(cat_range, bins, label='elaborated-range', color=colors[0], hatch=hatching[0], edgecolor="white")
ax.hist(cat_set, bins, label='elaborated-set', color=colors[1], hatch=hatching[1], edgecolor="white")
ax.hist(cat_id, bins, label='identified', color=colors[2], hatch=hatching[2], edgecolor="white")
ax.set_xticks(bins + 0.5)
ax.grid(axis='y', which='both')
ax.set_ylabel("frequency (log scale)")
fig.savefig("cluster_dist.pdf", format="pdf", bbox_inches = 'tight', pad_inches = 0)
fig.savefig("cluster_dist.eps", format="eps", bbox_inches = 'tight', pad_inches = 0)
Next, summarizing the overall stats from the approaches:

uncat_range = sum([i==-1 for i in cat_range])
uncat_set = sum([i==-1 for i in cat_set])
uncat_id = sum([i==-1 for i in cat_id])
uncat = [uncat_id, uncat_set, uncat_range]
lens = [len(X_id), len(X_set), len(X_range)]
num_cat = 100*np.array([len(set(cat_id))/len(unique_labels),len(set(cat_set))/len(unique_labels),len(set(cat_range))/len(unique_labels)])
res_tab = pd.DataFrame([lens, sim_times,num_cat, uncat])
res_tab.columns=['Identified', 'Elaborated-Set', 'Elaborated-Range']
res_tab.index=["Scenarios","Comp. Time (s)", "% Clusters", "Unclustered"]
Identified Elaborated-Set Elaborated-Range
Scenarios 5.000000 35.000000 1099.000000
Comp. Time (s) 1.329843 1.972724 97.141483
% Clusters 66.666667 66.666667 100.000000
Unclustered 0.000000 0.000000 20.000000
 & Identified & Elaborated-Set & Elaborated-Range \\
Scenarios & 5.00 & 35.00 & 1099.00 \\
Comp. Time (s) & 1.33 & 1.97 & 97.14 \\
% Clusters & 66.67 & 66.67 & 100.00 \\
Unclustered & 0.00 & 0.00 & 20.00 \\

Finally, we use the following to calculate more detailed statistics about the cluster coverage.

def calc_2d_cov_loss(xmin, xmax, ymin, ymax, xs, ys):
    return calc_coverage_loss(xmin, xmax, xs)*calc_coverage_loss(ymin, ymax, ys)
def calc_coverage_loss(xmin, xmax, xs):
    if xs:
        return (xmax-np.max(xs)+np.min(xs)-xmin)/(xmax-xmin)
        return  (xmax-xmin)/(xmax-xmin)
def get_xs_ys(values, X_type):
    return [v[0] for v in values if v in X_type], [v[1] for v in values if v in X_type]
cov_loss_id,cov_loss_set,cov_loss_range = [],[],[]
num_scens_id,num_scens_set,num_scens_range = [],[],[]
for cat, values in cat_values.items():
    xs = [v[0] for v in values]
    ys = [v[1] for v in values]
    xs_id, ys_id = get_xs_ys(values, X_id)
    xs_set, ys_set = get_xs_ys(values, X_set)
    xs_range, ys_range = get_xs_ys(values, X_range)
    xmin,xmax,ymin,ymax = np.min(xs), np.max(xs), np.min(ys), np.max(ys)

    cov_loss_id.append(calc_2d_cov_loss(xmin,xmax,ymin,ymax ,xs_id,ys_id))
    cov_loss_set.append(calc_2d_cov_loss(xmin,xmax,ymin,ymax ,xs_set,ys_set))
    cov_loss_range.append(calc_2d_cov_loss(xmin,xmax,ymin,ymax, xs_range,ys_range))


Coverage Compared to the range approach:

cov_tab = pd.DataFrame([cov_loss_id,cov_loss_set,cov_loss_range], columns=list(cat_values.keys()), index = ["Identified", "Set", "Range"])

0 1 -1
Identified 0.322338 0.704326 1.0
Set 0.101203 0.077128 1.0
Range 0.000000 0.000000 0.0

Number of scenarios per approach for each cluster:

scens_tab = pd.DataFrame([num_scens_id,num_scens_set,num_scens_range], columns=list(cat_values.keys()), index = ["Identified", "Set", "Range"], dtype=int)

0 1 -1
Identified 3 2 0
Set 18 6 0
Range 932 38 20

Combined Table:

comb = pd.concat({"# Scenarios":scens_tab, "Coverage Loss":cov_tab, "Worst-Case":cat_tab}, axis="columns",join='inner')
comb = comb.swaplevel(0, axis="columns")
#index = pd.MultiIndex.from_tuples(comb.columns)
#comb = comb.reindex(index, axis="columns")
comb = comb.sort_index(axis="columns")
comb = comb.T
#comb_2.index = comb.index
Identified Set Range
-1 # Scenarios 0.000000 0.000000 20.000000
Coverage Loss 1.000000 1.000000 0.000000
Worst-Case 0.000000 0.000000 13.954836
0 # Scenarios 3.000000 18.000000 932.000000
Coverage Loss 0.322338 0.101203 0.000000
Worst-Case 18.536162 18.536162 18.621225
1 # Scenarios 2.000000 6.000000 38.000000
Coverage Loss 0.704326 0.077128 0.000000
Worst-Case 0.000000 0.556991 0.000000
print(comb.to_latex( ))
 &  & Identified & Set & Range \\
\multirow[t]{3}{*}{-1} & # Scenarios & 0.000000 & 0.000000 & 20.000000 \\
 & Coverage Loss & 1.000000 & 1.000000 & 0.000000 \\
 & Worst-Case & 0.000000 & 0.000000 & 13.954836 \\
\multirow[t]{3}{*}{0} & # Scenarios & 3.000000 & 18.000000 & 932.000000 \\
 & Coverage Loss & 0.322338 & 0.101203 & 0.000000 \\
 & Worst-Case & 18.536162 & 18.536162 & 18.621225 \\
\multirow[t]{3}{*}{1} & # Scenarios & 2.000000 & 6.000000 & 38.000000 \\
 & Coverage Loss & 0.704326 & 0.077128 & 0.000000 \\
 & Worst-Case & 0.000000 & 0.556991 & 0.000000 \\

[ ]: