Source code for pysips.regressor

"""
PySIPS: Python package for Symbolic Inference via Posterior Sampling

This module provides a scikit-learn compatible interface for symbolic regression
using Sequential Monte Carlo (SMC) sampling with Bayesian model selection. It
combines symbolic expression generation, probabilistic proposal mechanisms, and
Laplace approximation for normalized marginal likelihood estimation to discover
mathematical expressions that best explain observed data.

The approach uses SMC to sample from a posterior distribution over symbolic
expressions, allowing for principled uncertainty quantification and model
selection in symbolic regression tasks. Unlike traditional genetic programming
approaches, this method provides probabilistic estimates of model quality and
can naturally handle model uncertainty.

Methodology
-----------
The algorithm works through the following steps:

1. **Expression Generation**: Creates initial symbolic expressions using
   configurable operators and complexity constraints

2. **Proposal Mechanisms**: Uses probabilistic combination of:
   - Mutation operations (structural changes to expressions)
   - Crossover operations (combining expressions from gene pool)

3. **Likelihood Evaluation**: Employs Laplace approximation to estimate
   normalized marginal likelihood for Bayesian model comparison

4. **SMC Sampling**: Uses Sequential Monte Carlo to sample from the
   posterior distribution over symbolic expressions

5. **Model Selection**: Chooses final model using either:
   - Mode selection (most frequently sampled expression)
   - Maximum likelihood selection (highest scoring expression)

Parameters Overview
-------------------
Expression Generation:
    - operators: Mathematical operators to include
    - max_complexity: Maximum expression graph size
    - terminal_probability: Probability of terminal node selection
    - constant_probability: Probability of constant vs variable terminals

Mutation Parameters:
    - command_probability: Probability of operation changes
    - node_probability: Probability of node replacement
    - parameter_probability: Probability of constant modification
    - prune_probability: Probability of expression pruning
    - fork_probability: Probability of expression expansion

Sampling Parameters:
    - num_particles: Population size for SMC
    - num_mcmc_samples: MCMC steps per SMC iteration
    - target_ess: Target effective sample size
    - crossover_pool_size: Size of crossover gene pool
    - max_time: Maximum runtime for sampling process

Checkpointing:
    - checkpoint_file: File path for saving/loading sampling progress
      If the checkpoint file exists, fitting will attempt to resume from
      the saved state and continue updating the checkpoint as it proceeds

Usage Example
-------------
>>> from pysips import PysipsRegressor
>>> import numpy as np
>>>
>>> # Generate sample data
>>> X = np.random.randn(100, 2)
>>> y = X[:, 0]**2 + 2*X[:, 1] + np.random.normal(0, 0.1, 100)
>>>
>>> # Create and fit regressor
>>> regressor = PysipsRegressor(
...     operators=['+', '*', 'pow'],
...     max_complexity=20,
...     num_particles=100,
...     model_selection='mode',
...     checkpoint_file='my_run.checkpoint',
...     random_state=42
... )
>>> regressor.fit(X, y)
>>>
>>> # Make predictions
>>> y_pred = regressor.predict(X)
>>>
>>> # Get discovered expression
>>> expression = regressor.get_expression()
>>> print(f"Discovered expression: {expression}")
>>>
>>> # Get all sampled models
>>> models, likelihoods = regressor.get_models()
>>>
>>> # For hyperparameter tuning, disable progress bar
>>> regressor_quiet = PysipsRegressor(
...     operators=['+', '*'],
...     num_particles=50,
...     show_progress_bar=False,  # No progress output
...     random_state=42
... )
>>> regressor_quiet.fit(X, y)  # Silent fitting

Applications
------------
This approach is particularly well-suited for:
- Scientific discovery where interpretability is crucial
- Problems requiring uncertainty quantification in model selection
- Cases where multiple plausible models exist and need to be ranked
- Regression tasks where symbolic relationships are preferred over black-box models
- Applications requiring principled model complexity control

Notes
-----
The method balances exploration and exploitation through:
- Probabilistic proposal selection between mutation and crossover
- Adaptive sampling that focuses on promising regions of expression space
- Multiple model selection criteria to handle different use cases

For best results, consider:
- Adjusting complexity limits based on problem difficulty
- Tuning mutation/crossover probabilities for your domain
- Using sufficient particles for good posterior approximation
- Setting appropriate number of MCMC samples for mixing

Checkpointing allows for:
- Resuming interrupted long-running fits
- Incremental progress saving during extended sampling runs
- Recovery from system failures or resource limitations
"""

from collections import Counter
import numpy as np
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted

from bingo.symbolic_regression import ComponentGenerator, AGraphGenerator

from .laplace_nmll import LaplaceNmll
from .mutation_proposal import MutationProposal
from .crossover_proposal import CrossoverProposal
from .random_choice_proposal import RandomChoiceProposal
from .sampler import sample

USE_PYTHON = True
USE_SIMPLIFICATION = True
DEFAULT_OPERATORS = ["+", "*"]
DEFALT_PARAMETER_INITIALIZATION_BOUNDS = [-5, 5]


# pylint: disable=R0902,R0913,R0917,R0914
[docs] class PysipsRegressor(BaseEstimator, RegressorMixin): """ A scikit-learn compatible wrapper for PySIPS symbolic regression. This regressor uses Sequential Monte Carlo (SMC) sampling to explore the space of symbolic expressions and find mathematical models that best explain the observed data. The approach provides principled uncertainty quantification and supports checkpointing for long-running fits. Parameters ---------- operators : list, default=['+', '*'] List of operators to use in symbolic expressions. max_complexity : int, default=24 Maximum complexity of symbolic expressions. terminal_probability : float, default=0.1 Probability of selecting a terminal during expression generation. constant_probability : float or None, default=None Probability of selecting a constant terminal. If None, will be set to 1/(x_dim + 1). command_probability : float, default=0.2 Probability of command mutation. node_probability : float, default=0.2 Probability of node mutation. parameter_probability : float, default=0.2 Probability of parameter mutation. prune_probability : float, default=0.2 Probability of pruning mutation. fork_probability : float, default=0.2 Probability of fork mutation. repeat_mutation_probability : float, default=0.05 Probability of repeating a mutation. crossover_pool_size : int, default=num_particles Size of the crossover pool. mutation_prob : float, default=0.75 Probability of mutation (vs crossover). crossover_prob : float, default=0.25 Probability of crossover (vs mutation). exclusive : bool, default=True Whether mutation and crossover are exclusive. num_particles : int, default=50 Number of particles for sampling. num_mcmc_samples : int, default=5 Number of MCMC samples. target_ess : float, default=0.8 Target effective sample size. param_init_bounds : list, default=[-5, 5] Bounds for parameter initialization. opt_restarts : int, default=1 Number of optimization restarts. model_selection : str, default="mode" The way to choose a best model from the produced distribution of models. Current options are "mode" for the most frequently occuring model and "max_nml" for the model with maximum normalized marginal likelihood. checkpoint_file : str or None, default=None File path for saving and loading sampling progress. If the checkpoint file exists, fitting will attempt to resume from the saved state and continue updating the checkpoint as sampling proceeds. If None, no checkpointing is performed. random_state : int or None, default=None Random seed for reproducibility. max_time : float or None, default=None Maximum time in seconds to run the sampling process. If None, the sampling will run until completion without time constraints. Cannot be used together with max_equation_evals. max_equation_evals : int or None, default=None Maximum number of evaluations during the sampling process. If None, the sampling will run until completion without time constraints. Cannot be used together with max_time. show_progress_bar : bool, default=True Whether to display a progress bar during fitting. When False, the progress bar will be hidden, which is useful for hyperparameter tuning or when running multiple fits in parallel. """
[docs] def __init__( self, operators=None, max_complexity=24, terminal_probability=0.1, constant_probability=None, command_probability=0.2, node_probability=0.2, parameter_probability=0.2, prune_probability=0.2, fork_probability=0.2, repeat_mutation_probability=0.05, crossover_pool_size=None, mutation_prob=0.75, crossover_prob=0.25, exclusive=True, num_particles=50, num_mcmc_samples=5, target_ess=0.8, param_init_bounds=None, opt_restarts=1, model_selection="mode", checkpoint_file=None, random_state=None, max_time=None, max_equation_evals=None, show_progress_bar=True, ): # Validate that max_time and max_equation_evals are not both specified if max_time is not None and max_equation_evals is not None: raise ValueError( "max_time and max_equation_evals cannot both be specified. " "Please choose one constraint method." ) self.operators = operators if operators is not None else DEFAULT_OPERATORS self.max_complexity = max_complexity self.terminal_probability = terminal_probability self.constant_probability = constant_probability self.command_probability = command_probability self.node_probability = node_probability self.parameter_probability = parameter_probability self.prune_probability = prune_probability self.fork_probability = fork_probability self.repeat_mutation_probability = repeat_mutation_probability self.crossover_pool_size = ( crossover_pool_size if crossover_pool_size is not None else num_particles ) self.mutation_prob = mutation_prob self.crossover_prob = crossover_prob self.exclusive = exclusive self.num_particles = num_particles self.num_mcmc_samples = num_mcmc_samples self.target_ess = target_ess self.param_init_bounds = ( param_init_bounds if param_init_bounds is not None else DEFALT_PARAMETER_INITIALIZATION_BOUNDS ) self.opt_restarts = opt_restarts self.model_selection = model_selection self.checkpoint_file = checkpoint_file self.random_state = random_state self.max_time = max_time self.max_equation_evals = max_equation_evals self.show_progress_bar = show_progress_bar # attributes set after fitting self.n_features_in_ = None self.models_ = None self.likelihoods_ = None self.phis_ = None self.best_model_ = None self.best_likelihood_ = None
def _get_generator(self, x_dim): """Create expression generator.""" constant_prob = self.constant_probability if constant_prob is None: constant_prob = 1 / (x_dim + 1) component_generator = ComponentGenerator( input_x_dimension=x_dim, terminal_probability=self.terminal_probability, constant_probability=constant_prob, ) for comp in self.operators: component_generator.add_operator(comp) return AGraphGenerator( self.max_complexity, component_generator, use_python=USE_PYTHON, use_simplification=USE_SIMPLIFICATION, ) def _get_proposal(self, x_dim, generator): """Create proposal operator.""" constant_prob = self.constant_probability if constant_prob is None: constant_prob = 1 / (x_dim + 1) mutation = MutationProposal( x_dim, operators=self.operators, terminal_probability=self.terminal_probability, constant_probability=constant_prob, command_probability=self.command_probability, node_probability=self.node_probability, parameter_probability=self.parameter_probability, prune_probability=self.prune_probability, fork_probability=self.fork_probability, repeat_mutation_probability=self.repeat_mutation_probability, ) # Generate crossover pool pool = set() while len(pool) < self.crossover_pool_size: pool.add(generator()) crossover = CrossoverProposal(list(pool)) # Create combined proposal return RandomChoiceProposal( [mutation, crossover], [self.mutation_prob, self.crossover_prob], self.exclusive, )
[docs] def fit(self, X, y): """ Fit the symbolic regression model to training data. Parameters ---------- X : array-like of shape (n_samples, n_features) Training input samples. y : array-like of shape (n_samples,) Target values. Returns ------- self : object Returns self. """ # Check and validate input data X, y = check_X_y(X, y, y_numeric=True) self.n_features_in_ = X.shape[1] # Set up the sampling config x_dim = X.shape[1] # Create generator, proposal, and likelihood generator = self._get_generator(x_dim) proposal = self._get_proposal(x_dim, generator) likelihood = LaplaceNmll(X, y) # Run sampling models, likelihoods, phis = sample( likelihood, proposal, generator, max_time=self.max_time, max_equation_evals=self.max_equation_evals, seed=self.random_state, checkpoint_file=self.checkpoint_file, show_progress_bar=self.show_progress_bar, kwargs={ "num_particles": self.num_particles, "num_mcmc_samples": self.num_mcmc_samples, "target_ess": self.target_ess, }, ) # Save the models and their likelihoods self.models_ = models self.likelihoods_ = likelihoods self.phis_ = phis # Select the best model if self.model_selection == "max_nml": best_idx = np.argmax(likelihoods) elif self.model_selection == "mode": model_indices = {model: i for i, model in enumerate(models)} model_counts = Counter(model for model in self.models_) most_common_model = model_counts.most_common(1)[0][0] best_idx = model_indices[most_common_model] else: raise KeyError( f"model_selection method {self.model_selection} not recognized." ) self.best_model_ = models[best_idx] self.best_likelihood_ = likelihoods[best_idx] return self
[docs] def predict(self, X): """ Predict using the best symbolic regression model. Parameters ---------- X : array-like of shape (n_samples, n_features) Samples to predict. Returns ------- y_pred : array-like of shape (n_samples,) Returns predicted values. """ check_is_fitted(self, ["best_model_", "models_"]) X = check_array(X) # Ensure consistent feature count if X.shape[1] != self.n_features_in_: raise ValueError( f"X has {X.shape[1]} features, but BayesRRegressor was " f"trained with {self.n_features_in_} features." ) # Use the best model for prediction prediction = self.best_model_.evaluate_equation_at(X).flatten() return prediction
[docs] def score(self, X, y, sample_weight=None): """ Return the coefficient of determination R^2 of the prediction. Parameters ---------- X : array-like of shape (n_samples, n_features) Test samples. y : array-like of shape (n_samples,) True values for X. sample_weight : array-like of shape (n_samples,), default=None Sample weights. Returns ------- score : float R^2 of self.predict(X) with respect to y. """ # Use default implementation from scikit-learn try: score = super().score(X, y, sample_weight=sample_weight) except ValueError as e: # catch error cause by NaN or inf values in prediction e.g. log(0) if "Input contains NaN" in str(e) or "Input contains infinity" in str(e): return -np.inf raise return score
[docs] def get_expression(self): """ Get the symbolic expression of the best model. Returns ------- expression : str String representation of the best model. """ check_is_fitted(self, ["best_model_"]) return str(self.best_model_)
[docs] def get_models(self): """ Get all sampled models and their likelihoods. Returns ------- models : list List of all sampled models. likelihoods : numpy.ndarray Array of corresponding likelihoods. """ check_is_fitted(self, ["models_", "likelihoods_"]) return self.models_, self.likelihoods_