"""Automatic algorithm selection for NLSQ optimization.
This module analyzes problem characteristics and automatically selects
the best optimization algorithm and parameters.
"""
from collections.abc import Callable
from inspect import signature
import numpy as np
from nlsq.config import JAXConfig
_jax_config = JAXConfig()
[docs]
class AlgorithmSelector:
"""Automatically select best algorithm based on problem characteristics.
This class analyzes the optimization problem and recommends the best
algorithm, loss function, and parameters based on:
- Problem size and dimensionality
- Data characteristics (outliers, noise, conditioning)
- Memory constraints
- Convergence requirements
"""
[docs]
def __init__(self):
"""Initialize algorithm selector."""
# Algorithm characteristics
self.algorithm_properties = {
"trf": {
"memory_factor": 2.5, # Relative memory usage
"speed": "fast",
"robust": True,
"handles_bounds": True,
"good_for_large": True,
"good_for_ill_conditioned": False,
},
"lm": {
"memory_factor": 1.5,
"speed": "medium",
"robust": False,
"handles_bounds": False,
"good_for_large": False,
"good_for_ill_conditioned": True,
},
"dogbox": {
"memory_factor": 2.0,
"speed": "medium",
"robust": False,
"handles_bounds": True,
"good_for_large": False,
"good_for_ill_conditioned": False,
},
}
# Loss function characteristics
self.loss_properties = {
"linear": {"robust": False, "smooth": True},
"huber": {"robust": True, "smooth": True},
"soft_l1": {"robust": True, "smooth": True},
"cauchy": {"robust": True, "smooth": True},
"arctan": {"robust": True, "smooth": True},
}
[docs]
def analyze_problem(
self,
f: Callable,
xdata: np.ndarray,
ydata: np.ndarray,
p0: np.ndarray | None = None,
bounds: tuple | None = None,
memory_limit_gb: float | None = None,
) -> dict:
"""Analyze problem characteristics.
Parameters
----------
f : callable
Model function to fit
xdata : np.ndarray
Independent variable data
ydata : np.ndarray
Dependent variable data
p0 : np.ndarray, optional
Initial parameter guess
bounds : tuple, optional
Parameter bounds
memory_limit_gb : float, optional
Memory constraint in GB
Returns
-------
analysis : dict
Problem characteristics and statistics
"""
# Convert to numpy for analysis
xdata = np.asarray(xdata)
ydata = np.asarray(ydata)
n_points = len(xdata) if xdata.ndim == 1 else xdata.shape[0]
# Estimate number of parameters
n_params = self._estimate_n_params(f, p0)
# Basic statistics
analysis = {
"n_points": n_points,
"n_params": n_params,
"overdetermination_ratio": n_points / n_params if n_params > 0 else np.inf,
}
# Data characteristics
analysis.update(self._analyze_data(xdata, ydata))
# Problem size classification
if n_points <= 100:
analysis["size_class"] = "small"
elif n_points < 10000:
analysis["size_class"] = "medium"
elif n_points < 1000000:
analysis["size_class"] = "large"
else:
analysis["size_class"] = "very_large"
# Conditioning estimate
analysis["condition_estimate"] = self._estimate_conditioning(xdata, n_params)
# Memory requirements
if memory_limit_gb is not None:
analysis["memory_constrained"] = self._check_memory_constraints(
n_points, n_params, memory_limit_gb
)
else:
analysis["memory_constrained"] = False
# Bounds analysis
analysis["has_bounds"] = bounds is not None and bounds != (-np.inf, np.inf)
# Parameter scale analysis
if p0 is not None:
analysis["param_scale_range"] = np.ptp(np.log10(np.abs(p0) + 1e-10))
else:
analysis["param_scale_range"] = 0
return analysis
def _estimate_n_params(self, f: Callable, p0: np.ndarray | None) -> int:
"""Estimate number of parameters.
Parameters
----------
f : callable
Model function
p0 : np.ndarray, optional
Initial guess
Returns
-------
n_params : int
Estimated number of parameters
"""
if p0 is not None:
return len(p0)
try:
sig = signature(f)
# Count parameters excluding x
return len(sig.parameters) - 1
except (TypeError, ValueError):
# Default guess
return 3
def _analyze_data(self, xdata: np.ndarray, ydata: np.ndarray) -> dict:
"""Analyze data characteristics.
Parameters
----------
xdata : np.ndarray
Independent variable
ydata : np.ndarray
Dependent variable
Returns
-------
characteristics : dict
Data characteristics
"""
results = {}
if len(ydata) == 0:
return {"outlier_fraction": 0.0, "has_outliers": False}
# Check for outliers using IQR method
q1, q3 = np.percentile(ydata, [25, 75])
iqr = q3 - q1
outlier_bounds = (q1 - 3 * iqr, q3 + 3 * iqr)
n_outliers = np.sum((ydata < outlier_bounds[0]) | (ydata > outlier_bounds[1]))
results["outlier_fraction"] = n_outliers / len(ydata)
results["has_outliers"] = results["outlier_fraction"] > 0.01
# Check for noise level (using local variation)
if len(ydata) > 10:
# Estimate noise from differences
diff = np.diff(ydata)
noise_estimate = np.median(np.abs(diff))
signal_range = np.ptp(ydata)
results["snr_estimate"] = signal_range / (noise_estimate + 1e-10)
results["is_noisy"] = results["snr_estimate"] < 10
else:
results["snr_estimate"] = np.inf
results["is_noisy"] = False
# Data range
if xdata.ndim == 1:
results["x_range"] = np.ptp(xdata)
results["x_scale"] = np.log10(results["x_range"] + 1e-10)
# Check for uniform spacing
if len(xdata) > 2:
spacing = np.diff(xdata)
results["x_uniform"] = np.std(spacing) / np.mean(spacing) < 0.01
else:
results["x_uniform"] = True
else:
results["x_range"] = 0
results["x_scale"] = 0
results["x_uniform"] = False
results["y_range"] = np.ptp(ydata)
results["y_scale"] = np.log10(results["y_range"] + 1e-10)
# Check for zeros or near-zeros
results["has_zeros"] = np.any(np.abs(ydata) < 1e-10)
return results
def _estimate_conditioning(self, xdata: np.ndarray, n_params: int) -> float:
"""Estimate problem conditioning.
Parameters
----------
xdata : np.ndarray
Independent variable
n_params : int
Number of parameters
Returns
-------
condition_estimate : float
Estimated condition number
"""
if xdata.ndim > 1:
# Multi-dimensional x
return 1.0
# Build vandermonde-like matrix for polynomial features
n_features = min(n_params, 5)
n_samples = min(len(xdata), 1000) # Sample for large datasets
if n_samples < n_features:
return np.inf
# Sample data
indices = np.linspace(0, len(xdata) - 1, n_samples, dtype=int)
x_sample = xdata[indices]
# Normalize to [0, 1]
x_min, x_max = x_sample.min(), x_sample.max()
if x_max - x_min < 1e-10:
return np.inf
x_norm = (x_sample - x_min) / (x_max - x_min)
# Build feature matrix
X = np.vstack([x_norm**i for i in range(n_features)]).T
try:
cond = np.linalg.cond(X)
return cond
except Exception:
return np.inf
def _check_memory_constraints(
self, n_points: int, n_params: int, memory_limit_gb: float
) -> bool:
"""Check if problem fits in memory limit.
Parameters
----------
n_points : int
Number of data points
n_params : int
Number of parameters
memory_limit_gb : float
Memory limit in GB
Returns
-------
constrained : bool
Whether memory is constrained
"""
# Estimate memory for Jacobian and working arrays
memory_needed_gb = (8 * n_points * n_params * 3) / 1e9 # Factor of 3 for safety
return memory_needed_gb > memory_limit_gb
def _apply_user_preferences(
self, recommendations: dict, user_preferences: dict | None
) -> None:
"""Apply user preferences to recommendations.
Parameters
----------
recommendations : dict
Algorithm recommendations (modified in place)
user_preferences : dict or None
User preferences
"""
if not user_preferences:
return
if "prioritize" in user_preferences:
priority = user_preferences["prioritize"]
if priority == "speed":
recommendations["ftol"] = 1e-6
recommendations["xtol"] = 1e-6
recommendations["max_nfev"] = 100
elif priority == "accuracy":
recommendations["ftol"] = 1e-10
recommendations["xtol"] = 1e-10
recommendations["gtol"] = 1e-10
def _adjust_for_condition_number(
self, recommendations: dict, condition_estimate: float
) -> None:
"""Adjust parameters for ill-conditioned problems.
Parameters
----------
recommendations : dict
Algorithm recommendations (modified in place)
condition_estimate : float
Condition number estimate
"""
if condition_estimate > 1e10:
# Ill-conditioned: use more conservative tolerances
recommendations["ftol"] = 1e-6
recommendations["xtol"] = 1e-6
def _select_trust_region_solver(
self, recommendations: dict, n_points: int, n_params: int
) -> None:
"""Select appropriate trust region solver.
Parameters
----------
recommendations : dict
Algorithm recommendations (modified in place)
n_points : int
Number of data points
n_params : int
Number of parameters
"""
if n_points > 100000:
# Large problem: use iterative solver
if n_points > 1000000:
recommendations["tr_solver"] = "lsmr" # Iterative solver
elif n_params > 100:
# Many parameters: use iterative solver
recommendations["tr_solver"] = "lsmr"
def _select_loss_function(
self, recommendations: dict, has_outliers: bool, outlier_fraction: float
) -> None:
"""Select appropriate loss function based on outliers.
Parameters
----------
recommendations : dict
Algorithm recommendations (modified in place)
has_outliers : bool
Whether outliers were detected
outlier_fraction : float
Fraction of outliers (0-1)
"""
if not has_outliers:
return
if outlier_fraction > 0.1:
recommendations["loss"] = "cauchy" # Very robust
elif outlier_fraction > 0.05:
recommendations["loss"] = "huber" # Moderately robust
elif outlier_fraction > 0.01:
recommendations["loss"] = "soft_l1" # Slightly robust
else:
recommendations["loss"] = "linear"
def _adjust_for_noise(self, recommendations: dict, is_noisy: bool) -> None:
"""Adjust tolerances for noisy data.
Parameters
----------
recommendations : dict
Algorithm recommendations (modified in place)
is_noisy : bool
Whether data appears noisy
"""
if is_noisy:
# Relax tolerances for noisy data
recommendations["ftol"] = max(recommendations["ftol"], 1e-6)
recommendations["xtol"] = max(recommendations["xtol"], 1e-6)
def _adjust_for_memory_constraints(
self, recommendations: dict, memory_constrained: bool
) -> None:
"""Adjust for memory-constrained environments.
Parameters
----------
recommendations : dict
Algorithm recommendations (modified in place)
memory_constrained : bool
Whether memory is constrained
"""
if memory_constrained:
recommendations["algorithm"] = "trf"
recommendations["tr_solver"] = "lsmr" # Memory-efficient iterative solver
def _adjust_max_iterations(self, recommendations: dict, n_points: int) -> None:
"""Adjust maximum iterations based on problem size.
Parameters
----------
recommendations : dict
Algorithm recommendations (modified in place)
n_points : int
Number of data points
"""
if recommendations["max_nfev"] is None:
if n_points > 1000000:
recommendations["max_nfev"] = 50
elif n_points > 100000:
recommendations["max_nfev"] = 100
elif n_points > 10000:
recommendations["max_nfev"] = 200
# else: keep None (no limit)
def _select_x_scale(self, recommendations: dict, param_scale_range: float) -> None:
"""Select parameter scaling strategy.
Parameters
----------
recommendations : dict
Algorithm recommendations (modified in place)
param_scale_range : float
Range of parameter scales (in orders of magnitude)
"""
if param_scale_range > 3:
# Parameters vary over many orders of magnitude
recommendations["x_scale"] = "jac"
else:
recommendations["x_scale"] = 1.0
[docs]
def select_algorithm(
self, problem_analysis: dict, user_preferences: dict | None = None
) -> dict:
"""Select best algorithm based on problem analysis.
This method orchestrates algorithm selection by calling focused
helper methods for each decision.
Parameters
----------
problem_analysis : dict
Results from analyze_problem
user_preferences : dict, optional
User preferences (e.g., prioritize speed vs accuracy)
Returns
-------
recommendations : dict
Recommended algorithm and parameters
"""
# Initialize default recommendations
recommendations = {
"algorithm": "trf", # Default (only algorithm currently implemented)
"loss": "linear",
"use_bounds": problem_analysis.get("has_bounds", False),
"max_nfev": None,
"ftol": 1e-8,
"xtol": 1e-8,
"gtol": 1e-8,
"x_scale": "jac",
"tr_solver": None,
"verbose": 0,
}
# Extract problem characteristics
n_points = problem_analysis["n_points"]
n_params = problem_analysis["n_params"]
# Step 1: Apply user preferences
self._apply_user_preferences(recommendations, user_preferences)
# Step 2: Adjust for condition number
condition_estimate = problem_analysis.get("condition_estimate", 1)
self._adjust_for_condition_number(recommendations, condition_estimate)
# Step 3: Select trust region solver based on problem size
self._select_trust_region_solver(recommendations, n_points, n_params)
# Step 4: Select loss function based on outliers
has_outliers = problem_analysis.get("has_outliers", False)
outlier_fraction = problem_analysis.get("outlier_fraction", 0)
self._select_loss_function(recommendations, has_outliers, outlier_fraction)
# Step 5: Adjust for noisy data
is_noisy = problem_analysis.get("is_noisy", False)
self._adjust_for_noise(recommendations, is_noisy)
# Step 6: Adjust for memory constraints
memory_constrained = problem_analysis.get("memory_constrained", False)
self._adjust_for_memory_constraints(recommendations, memory_constrained)
# Step 7: Adjust maximum iterations
self._adjust_max_iterations(recommendations, n_points)
# Step 8: Select parameter scaling
param_scale_range = problem_analysis.get("param_scale_range", 0)
self._select_x_scale(recommendations, param_scale_range)
return recommendations
[docs]
def get_algorithm_explanation(self, recommendations: dict) -> str:
"""Get human-readable explanation of algorithm choice.
Parameters
----------
recommendations : dict
Algorithm recommendations
Returns
-------
explanation : str
Explanation of the choices
"""
explanation = []
# Algorithm choice
alg = recommendations["algorithm"]
if alg == "trf":
explanation.append(
"Trust Region Reflective (TRF) algorithm selected (currently the only supported algorithm in NLSQ)"
)
# Future support for other algorithms
# elif alg == 'lm':
# explanation.append("Levenberg-Marquardt (LM) algorithm selected for good convergence properties")
# elif alg == 'dogbox':
# explanation.append("Dogbox algorithm selected for bounded optimization")
# Loss function
loss = recommendations["loss"]
if loss != "linear":
explanation.append(f"Using {loss} loss function for outlier robustness")
# Tolerances
if recommendations["ftol"] >= 1e-6:
explanation.append("Relaxed tolerances for faster convergence")
elif recommendations["ftol"] <= 1e-10:
explanation.append("Tight tolerances for high accuracy")
# Solver
if recommendations.get("tr_solver") == "lsmr":
explanation.append("Using iterative solver for memory efficiency")
# Iterations
if recommendations["max_nfev"] is not None:
explanation.append(f"Limited to {recommendations['max_nfev']} iterations")
return "\n".join(explanation)
# Global selector instance
_algorithm_selector = AlgorithmSelector()
[docs]
def auto_select_algorithm(
f: Callable,
xdata: np.ndarray,
ydata: np.ndarray,
p0: np.ndarray | None = None,
bounds: tuple | None = None,
memory_limit_gb: float | None = None,
user_preferences: dict | None = None,
) -> dict:
"""Automatically select best optimization algorithm.
Parameters
----------
f : callable
Model function
xdata : np.ndarray
Independent variable
ydata : np.ndarray
Dependent variable
p0 : np.ndarray, optional
Initial guess
bounds : tuple, optional
Parameter bounds
memory_limit_gb : float, optional
Memory limit
user_preferences : dict, optional
User preferences
Returns
-------
recommendations : dict
Algorithm recommendations
"""
analysis = _algorithm_selector.analyze_problem(
f, xdata, ydata, p0, bounds, memory_limit_gb
)
recommendations = _algorithm_selector.select_algorithm(analysis, user_preferences)
return recommendations