Source code for upsp.processing.kulite_utilities

"""Kulites class and helpers"""
import numpy as np
import os
import re
import scipy.io
from scipy import signal
from glob import glob
import pandas as pd
import csv
from collections import namedtuple
from collections import OrderedDict
import copy
import logging

from . import p3d_utilities

log = logging.getLogger(__name__)


class Kulites:
    """Load and manage kulite data

    Data is in psi unless the optional ``psf`` argument is True, in which case
    pressures are converted to psf (lb/ft^2).

    Parameters
    ----------
    data_dir : str
        directory with ``*.info``, ``*.slow``, ``*.fast`` files
    run : int
        run number
    seq : int
        sequence number
    kulites : list
        kulites to load
    data_type : numpy.dtype
        data type of array elements
    psf : bool
        if True, convert pressures from psi to psf
    f_type : str
        'slow' or 'fast'

    Attributes
    ----------
    data : dict
        Dictionary with Kulite names as keys and numpy arrays with pressure
        time histories as values.
    raw : dict
        Raw integer counts for each Kulite.
    native : dict
        Native (volts) time histories for each Kulite.
    sample_rate : float
        Sampling rate for the selected ``f_type``.
    """

    def __init__(self, data_dir, run, seq, kulites='all', data_type=np.float32,
                 psf=False, f_type='slow'):
        # Create dictionary to hold data
        self.raw, raw = dict(), dict()
        self.native, native = dict(), dict()
        self.data, data = dict(), dict()

        # Get a list of all info files. There are a couple variations,
        # auto-detect using regexes.
        info_files = self.__autodetect_info_files(data_dir, run, seq)
        log.info('Autodetected info files: %s', str(info_files))

        # Check each info file for the kulites of interest
        for info_file in sorted(info_files):
            # Load the header information
            info = self.__read_info(info_file)

            # Search for kulites of interest, or grab them all
            idcs, kuls = [], []
            for idx, kul in enumerate(info.chanconfig.index):
                if (kulites == 'all' and 'K' in kul) or (kul in kulites):
                    idcs.append(idx)
                    kuls.append(kul)

            # If there is data of interest in this file, load it
            if len(idcs) > 0:
                vraw, vnative, vpsi = self.__read_data(info, idcs, f_type)
                for i in range(0, len(idcs)):
                    if psf:
                        vpsi[i] = vpsi[i] * 144
                    data[kuls[i]] = np.array(vpsi[i], dtype=data_type)
                    raw[kuls[i]] = np.array(vraw[i], dtype=data_type)
                    native[kuls[i]] = np.array(vnative[i], dtype=data_type)

        self.data = OrderedDict(sorted(data.items()))
        self.raw = OrderedDict(sorted(raw.items()))
        self.native = OrderedDict(sorted(native.items()))

        # Get frequency information
        if f_type == 'fast':
            row_idx = 0
        elif f_type == 'slow':
            row_idx = 1
        self.sample_rate = info.statistics['Sampling Rate'][row_idx]

    def __autodetect_info_files(self, data_dir, run: int, seq: int):
        # Find all *.info files corresponding to this datapoint (run, seq).
        # Searches based on filenames in the Kulite data folder.
        #
        # (If there are more Kulites than there are channels for a single PXI
        # unit, then there may be more than one *.info file.)
        fnames = glob(os.path.join(data_dir, "*.info"))
        prefixes = [os.path.splitext(os.path.basename(fn))[0] for fn in fnames]
        patterns = [
            r'T(?P<run>\d+)p(?P<seq>\d+)t\d+$',
            r'r(?P<run>\d{4})s(?P<seq>\d{3})t\d+_S\dC\d$'
        ]

        def match_run_seq(_p, _prefix):
            m = re.search(_p, _prefix)
            if m:
                return int(m.group('run'), base=10), int(m.group('seq'), base=10)
            return None

        # Assume if the first info filename matches the pattern, then ALL of them will.
        for p in patterns:
            first_prefix_match = match_run_seq(p, prefixes[0])
            if first_prefix_match:
                return [
                    fn for fn, pf in zip(fnames, prefixes)
                    if match_run_seq(p, pf) == (run, seq)
                ]
        return []

    def __read_info(self, info_file):
        """ Read the info file and load in the header information

        Args:
            info_file (str): unitary info filename

        Returns:
            (namedtuple) with the full header information

        Raises:
            Exception: unexpected file format
        """
        # Read all of the data into a list
        d = None
        with open(info_file, "r") as fheader:
            reader = csv.reader(fheader, delimiter="\t")
            d = list(reader)

        # Split the header file into its four parts:
        #   filepaths, statistics, daqconfig, channelconfig
        # 1st line, 2nd column is the number of lines in the block (zero indexed)
        n_filepaths = int(d[0][1])
        # pop all lines one at a time, don't keep the line numbers at the beginning
        group_filepaths = [d.pop(0)[2:] for line in range(n_filepaths + 1)]
        n_statistics = int(d[0][1])
        group_statistics = [d.pop(0)[2:] for line in range(n_statistics + 1)]
        n_daqconfig = int(d[0][1])
        group_daqconfig = [d.pop(0)[2:] for line in range(n_daqconfig + 1)]
        n_channelconfig = int(d[0][1])
        group_channelconfig = [d.pop(0)[2:] for line in range(n_channelconfig + 1)]

        # Set up the output named tuple of dataframes
        filepaths_df = pd.DataFrame(group_filepaths, columns=group_filepaths.pop(0))
        statistics_df = pd.DataFrame(group_statistics, columns=group_statistics.pop(0))
        statistics_df = statistics_df.apply(pd.to_numeric, errors='ignore')
        daqconfig_df = pd.DataFrame(group_daqconfig, columns=group_daqconfig.pop(0))
        daqconfig_df = daqconfig_df.apply(pd.to_numeric, errors='ignore')
        chanconfig_df = pd.DataFrame(
            group_channelconfig, columns=group_channelconfig.pop(0)
        )

        # TODO discuss with Nate about this convention. We're trying to
        # bypass having to manually tell our processing code what Kulites
        # were plugged into what channel, and instead infer it from the
        # raw data.
        #
        # - Ideally, the "User Name" column is present, and contains the
        #   name of the Kulite that was plugged into that channel. (As is the
        #   case in the UPWT SLS data.)
        #
        # - In practice (e.g., in the TC3 2022-02 output data), the "User Name"
        #   column may not be present. In this case, we need some other way to
        #   find the Kulite/channel mapping.
        #
        # - For now, we assume that Kulites are named 'K01', 'K02', ..., where
        #   the numbering corresponds to what channel they are plugged into.
        #
        if 'User Name' not in chanconfig_df.columns:
            log.warning(''.join([
                '"User Name" column not found in %s. ',
                'Will infer Kulite-to-channel mapping: ',
                'K01 -> Channel 1, K02 -> Channel 2, etc.',
            ]), info_file)
            chanconfig_df['User Name'] = [
                'K%02d' % int(s) for s in chanconfig_df['User Channel']
            ]
        chanconfig_df = chanconfig_df.set_index('User Name')
        chanconfig_df = chanconfig_df.apply(pd.to_numeric, errors='ignore')

        # There are several iterations on the content contained in the
        # *.info file channel info block. In general, the number of
        # columns in the block indicates the "version."
        #
        # Most importantly, we need to distinguish whether the sensor
        # calibration information is present. When it *is* present, we
        # don't need to load it via the separate *.mat or *.zPXImat file.
        #
        # NOTE: column counts here differ from the actual # columns in
        # the *.info file:
        #   - two leading columns containing line/section numbers are removed
        #   - the 'index' is the "User Name" column, so it is also removed
        if len(chanconfig_df.columns) == 18:
            Version = 4
        elif len(chanconfig_df.columns) == 17:
            Version = 3
        else:
            if chanconfig_df.columns[1] == "User Channel":
                Version = 2
            elif chanconfig_df.columns[1] == "Physical Channel":
                Version = 1
            else:
                raise Exception(
                    "Can't determine file format. Did Kevin change it again?"
                )

        log.info('Detected info file version: %d', Version)

        accepted_versions = [1, 4]
        if Version not in accepted_versions:
            raise Exception(
                "%s is version %d; only versions %s are accepted"
                % (info_file, Version, str(accepted_versions))
            )

        # Set up the output
        HeaderInfo = namedtuple(
            'HeaderInfo',
            ['filename', 'filepaths', 'statistics', 'daqconfig', 'chanconfig']
        )
        info = HeaderInfo(
            info_file, filepaths_df, statistics_df, daqconfig_df, chanconfig_df
        )

        return info

    def __read_data(self, info, channels, f_type):
        """ Read in the unitary data for the channels of interest and
        multiply by coefficients to dimensionalize

        Args:
            info (namedtuple) : info header data
            channels (List) : list of channel numbers (0-based) within file
            f_type (str) : 'fast' or 'slow'

        Returns:
            (List) of numpy arrays with data in dB?

        Raises:
            RuntimeError: invalid channel id
        """
        # Identify data filename based on f_type:
        #   1. Load entry from info file for given f_type.
        #   2. If entry is not a valid path, then infer based on info filename.
        if f_type == 'fast':
            row_idx = 1
        elif f_type == 'slow':
            row_idx = 2
        filename = os.path.join(
            os.path.dirname(info.filename),
            info.filepaths['File Name'][row_idx]
        )
        if not os.path.exists(filename):
            info_prefix, _ = os.path.splitext(info.filename)
            inferred_filename = info_prefix + '.' + f_type
            # log.warning(
            #     '*.info points to non-existent *.%s file (%s); inferring: %s',
            #     f_type, filename, inferred_filename
            # )
            filename = inferred_filename
        print(filename)

        def try_load_cal():
            prefix = os.path.splitext(filename)[0]
            cal_file = prefix + '.zPXImat'
            if not os.path.exists(cal_file):
                return None
            return scipy.io.loadmat(cal_file)

        raw = []
        native = []
        data = []

        # Get the frequency ID for the type of file
        if f_type == 'fast':
            freqID = 0
        elif f_type == 'slow':
            freqID = 1

        # shortcut to remove a bunch of typing
        stats = info.statistics
        chan = info.chanconfig

        nChannels = stats.loc[freqID, 'Number Channels']
        nSamples = stats.loc[freqID, 'Samples Acquired']

        # Read the full binary file
        bin_data = None
        with open(filename, 'rb') as f:
            dtypestr = '(1,)i4'
            dt = np.dtype(dtypestr)
            bin_data = np.fromfile(f, dtype=dt)
            bin_data = np.reshape(bin_data, (nSamples, nChannels))

        cal = try_load_cal()
        if cal:
            print('calibration matrix:', cal['calibrationmatrix'].shape)

        # Perform import via numpy array
        # type is nChannel columns of int32 (i4)
        for channel in channels:
            if channel > nChannels:
                raise RuntimeError('Invalid channel ID, cannot read')
            vraw = bin_data[:, channel]
            # apply native scaling and native offset (counts to volts)
            vnative = vraw * chan['Coeff k1'][channel] + chan['Coeff k0'][channel]
            if cal:
                # Use calibration data from separate file
                # % V/psi with gain sens. Convert to Pa/V
                cal_term = cal['calibrationmatrix'][channel, 2]
                # TODO confirm magic number is conversion from psi to Pa.
                #      1 psi = 6894.757 Pa
                # We don't need the conversion because we want to keep data in psi.
                # micsens = 1 / (cal_term / 6894.757)
                micsens = 1 / cal_term
                vdata = vnative * micsens
            else:
                # Use calibration data from chanconfig
                # Apply instrument sensitivity and offset (volts to engineering units)
                # I think this is in psi
                vdata = vnative * chan['EU C1'][channel] / \
                    (10**(chan['EU Gain (dB)'][channel] / 20)) + \
                    chan['EU C0'][channel]
            print(
                'C[%02d]' % (channel + 1,), vraw.shape,
                'Counts:', f'{np.min(vraw):+10d}', f'{np.max(vraw):+10d}',
                'V:', f'{np.min(vnative):.3f}', f'{np.max(vnative):.3f}',
                'psi:', f'{np.min(vdata):.3f}', f'{np.max(vdata):.3f}',
            )
            raw.append(vraw)
            native.append(vnative)
            data.append(vdata)

        return raw, native, data

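# Illustrative usage sketch (not part of the original module): loading
# fast-sampled data for a few Kulites from a datapoint directory. The data
# directory, run/sequence numbers, and Kulite names below are hypothetical
# placeholders.
def _example_load_kulites():
    kuls = Kulites(
        '/path/to/kulite/data',  # directory containing *.info/*.slow/*.fast files
        run=159, seq=1,          # hypothetical run and sequence numbers
        kulites=['K01', 'K02'],  # or 'all' to grab every channel named 'K*'
        f_type='fast',           # 'fast' or 'slow' sample stream
    )
    # kuls.data maps each Kulite name to its pressure time history (psi),
    # kuls.raw holds the integer counts, and kuls.native holds volts.
    for name, pressure in kuls.data.items():
        print(name, pressure.shape, pressure.mean())
    return kuls
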
###############################################################################
def read_tgts(tgts_file, kulites='all'):
    """Read in the tgts file data and return the coordinates of the kulites

    Parameters
    ----------
    tgts_file : str
        targets data file
    kulites : list, optional
        list of kulite strings. If "all" (default), all targets identified as
        Kulites are returned.

    Returns
    -------
    pos : dict of list
        (x,y,z) positions for each kulite keyed by name
    """
    pos = dict()
    with open(tgts_file, 'r') as f:
        for line in f:
            vals = line.strip().split()
            # Fiducial marks do not have an extra column with the kulite name
            if len(vals) > 11 and 'K' in vals[11] and 'pK' not in vals[11] \
                    and 'mK' not in vals[11]:
                if kulites != 'all':
                    if vals[11] in kulites:
                        pos[vals[11]] = [float(vals[1]), float(vals[2]),
                                         float(vals[3])]
                else:
                    pos[vals[11]] = [float(vals[1]), float(vals[2]),
                                     float(vals[3])]
    return pos

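# Illustrative usage sketch (not part of the original module): reading Kulite
# coordinates from a targets file. The file path and Kulite names are
# hypothetical placeholders.
def _example_read_tgts():
    pos = read_tgts('/path/to/model.tgts', kulites=['K01', 'K02'])
    for name, (x, y, z) in pos.items():
        print(name, x, y, z)
    return pos
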
def compute_delta_rms(kulites, sosfilter=None):
    """Calculate RMS values for each kulite by subtracting off the mean, and
    return a dict.

    If sosfilter is provided, it is applied to the time series before
    calculating the RMS.
    """
    def calc_rms(a):
        return np.sqrt(np.mean(a ** 2))

    rms_dict = dict()
    for name, data in kulites.data.items():
        if sosfilter is not None:
            filtered = signal.sosfilt(sosfilter, data)
        else:
            filtered = data
        rms_dict[name] = calc_rms(filtered - np.mean(filtered))
    return rms_dict

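# Illustrative usage sketch (not part of the original module): computing
# mean-removed RMS after band-pass filtering. The cutoff frequencies and the
# hypothetical _example_load_kulites() loader above are placeholders; any
# Kulites instance works, and the band edges must sit below the Nyquist
# frequency of the actual sample rate.
def _example_delta_rms():
    kuls = _example_load_kulites()
    # 4th-order Butterworth band-pass between 100 Hz and 10 kHz, expressed as
    # second-order sections for numerical stability
    sos = signal.butter(4, [100.0, 10000.0], btype='bandpass',
                        fs=kuls.sample_rate, output='sos')
    rms = compute_delta_rms(kuls, sosfilter=sos)
    for name, value in rms.items():
        print(name, value)
    return rms
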
def compute_rms_from_psd(kulites, startfreq=None):
    """Calculate RMS values for each kulite in the input collection by
    integrating its PSD.

    If startfreq is provided, integrates from startfreq upwards.
    """
    rms_dict = dict()
    psds = compute_psd(kulites)
    freqs = psds['freq']
    for name, data in psds.items():
        if name == 'freq':
            continue
        if startfreq is None:
            rms_dict[name] = np.sum(data)
        else:
            i_desired = np.where(freqs >= startfreq)
            rms_dict[name] = np.sum(data[i_desired])
    return rms_dict

def apply_filter(kulites, sosfilter):
    """Apply an SOS filter to the given kulite collection, and return a new
    Kulites object.

    Creates a deep copy in case you want to use this for comparing filtered
    vs. unfiltered collections, and want to hang on to an unmodified version
    too.
    """
    filtered_kulites = copy.deepcopy(kulites)
    for name, timeseries in kulites.data.items():
        filtered_kulites.data[name] = signal.sosfilt(sosfilter, timeseries)
    return filtered_kulites

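# Illustrative usage sketch (not part of the original module): comparing
# unfiltered and low-pass-filtered RMS levels. The 5 kHz cutoff and the
# hypothetical loader are placeholders.
def _example_compare_filtered():
    kuls = _example_load_kulites()
    sos = signal.butter(4, 5000.0, btype='lowpass',
                        fs=kuls.sample_rate, output='sos')
    filtered = apply_filter(kuls, sos)  # deep copy; the original is untouched
    rms_raw = compute_delta_rms(kuls)
    rms_filtered = compute_delta_rms(filtered)
    for name in rms_raw:
        print(name, rms_raw[name], rms_filtered[name])
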
def compute_psd(kulites, w_len=1024):
    """Compute PSDs for kulite data

    Parameters
    ----------
    kulites : Kulites
        Kulites object
    w_len : int, optional
        window length

    Returns
    -------
    dict
        Dictionary of PSDs for each kulite, with an additional entry "freq"
        for the frequency bins
    """
    data = dict()

    # Compute PSD
    freq = None
    for key, val in kulites.data.items():
        a = val
        if len(val.shape) > 1:
            if val.shape[1] == 1:
                a = np.transpose(val)
        srate = kulites.sample_rate
        freq, data[key] = signal.welch(a, fs=srate, window='hanning',
                                       nperseg=w_len, detrend='linear')
    data['freq'] = freq
    return data

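# Illustrative usage sketch (not part of the original module): computing Welch
# PSDs for every Kulite and integrating them above a start frequency via
# compute_rms_from_psd. The window length and 200 Hz start frequency are
# hypothetical.
def _example_psd():
    kuls = _example_load_kulites()
    psds = compute_psd(kuls, w_len=2048)
    freqs = psds['freq']
    for name, psd in psds.items():
        if name == 'freq':
            continue
        print(name, freqs.shape, psd.shape)
    rms = compute_rms_from_psd(kuls, startfreq=200.0)
    return psds, rms
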
# FIXME this is broken?
def create_kulite_grid(tgts_file, kul_strs):
    """Generate a structured grid of kulite locations

    Parameters
    ----------
    tgts_file : str
        TGTS kulite location file
    kul_strs : list
        kulite names to include in plots

    Returns
    -------
    kul_grid : StructGrid
        kulite positions in cartesian and cylindrical form
    """
    # Load the kulite positions
    kulite_pos = read_tgts(tgts_file, kul_strs)

    # Generate grid
    kul_grid = tgts_file.StructGrid()
    kul_grid.sz.append([len(kul_strs), 1, 1])
    for kul, pos in kulite_pos.items():
        kul_grid.name.append(kul)
        kul_grid.x.append(pos[0])
        kul_grid.y.append(pos[1])
        kul_grid.z.append(pos[2])
    kul_grid.r, kul_grid.theta = p3d_utilities.to_cylindrical(kul_grid.y, kul_grid.z)

    return kul_grid

def write_kulite_positions_p3d(output_file, tgts_file, kul_strs):
    """Generate a 1D plot3d grid with kulite locations

    Parameters
    ----------
    output_file : str
        output p3d file
    tgts_file : str
        TGTS kulite location file
    kul_strs : list
        kulite names to include in plots
    """
    kul_grid = create_kulite_grid(tgts_file, kul_strs)
    kul_grid.write_p3d(output_file)

def read_targets_matrix(tgts_file, kul_strs):
    """Generate a list of kulite data

    Parameters
    ----------
    tgts_file : str
        TGTS kulite location file
    kul_strs : list
        kulite names to include in plots.

    Returns
    -------
    list of list
        Kulite data of the form:
        ``name, x, y, z, r, theta, i, j, k, diam, zone``
    """
    if kul_strs != 'all':
        mat = np.empty([len(kul_strs), 11], dtype=object)
        idx = 0
    else:
        mat = np.empty([0, 11], dtype=object)

    with open(tgts_file, 'r') as f:
        for line in f:
            vals = line.strip().split()
            # Only lines with all entries, and only kulites (not painted or masked)
            if len(vals) > 11 and 'K' in vals[11] and 'pK' not in vals[11] \
                    and 'mK' not in vals[11]:
                # Extract information
                x = vals[1]
                y = vals[2]
                z = vals[3]
                r_val, theta_val = p3d_utilities.to_cylindrical(float(y), float(z))
                r = str(r_val)
                theta = str(theta_val)
                i = vals[4]
                j = vals[5]
                k = vals[6]
                d = vals[7]
                zone = vals[8]

                if kul_strs != 'all':
                    if vals[11] in kul_strs:
                        mat[idx] = [vals[11], x, y, z, r, theta, i, j, k, d, zone]
                        idx = idx + 1
                else:
                    if vals[11] not in mat[:, 0]:
                        newrow = [vals[11], x, y, z, r, theta, i, j, k, d, zone]
                        mat = np.vstack([mat, newrow])

    # sort list of lists by first column
    mat_sort = mat[mat[:, 0].argsort()]
    return mat_sort

def create_pressure_array(kuls, data_type=np.float32):
    """Generate a numpy array with kulite pressure-time histories

    Parameters
    ----------
    kuls : Kulites
        Kulites object whose ``data`` maps kulite names to arrays of
        pressure values
    data_type : numpy.dtype
        data type of array elements

    Returns
    -------
    np.ndarray
        kulite data
    """
    size = len(kuls.data) * len(kuls.data[list(kuls.data.keys())[0]])
    leng = len(kuls.data[list(kuls.data.keys())[0]])
    data_array = np.empty(size, dtype=data_type)
    log.info("Creating kulite pressure file of %d kulites, history of %d,"
             " size %d, and type %s..",
             len(kuls.data), leng, size, data_type)
    idx = 0
    for kul in kuls.data:
        data_array[idx : idx + leng] = kuls.data[kul]
        idx = idx + leng
    return data_array

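# Illustrative usage sketch (not part of the original module): flattening a
# Kulites collection into one contiguous array and writing it to a raw binary
# file. The output filename and hypothetical loader are placeholders.
def _example_write_pressure_array():
    kuls = _example_load_kulites()
    arr = create_pressure_array(kuls, data_type=np.float32)
    # One contiguous block per Kulite, in sorted name order
    arr.tofile('/path/to/kulite_pressures.dat')
    return arr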