Source code for nlsq.stability.fallback

"""
Automatic Fallback Strategies for Robust Optimization
======================================================

This module provides automatic fallback mechanisms that attempt to recover
from optimization failures by trying alternative approaches.

Key Features:
- Automatic method selection (trf → alternative approaches)
- Initial guess perturbation for escaping local minima
- Tolerance adjustment for difficult problems
- Parameter bound inference when needed
- Robust loss function application
- Problem rescaling for numerical stability

Example:
    >>> from nlsq.stability.fallback import FallbackOrchestrator
    >>>
    >>> orchestrator = FallbackOrchestrator(verbose=True)
    >>> result = orchestrator.fit_with_fallback(
    ...     model, xdata, ydata, p0=[1, 0.5],
    ...     max_attempts=5
    ... )
    >>> print(f"Success with strategy: {result.fallback_strategy_used}")
"""

from collections.abc import Callable
from typing import Any, ClassVar, Protocol

import numpy as np

from nlsq.utils.logging import get_logger

__all__ = [
    "FallbackOrchestrator",
    "FallbackResult",
    "FallbackStrategy",
]


class StrategyFactory(Protocol):
    """Protocol for strategy classes that can be instantiated with no arguments."""

    def __call__(self) -> "FallbackStrategy": ...


[docs] class FallbackStrategy: """Base class for fallback strategies."""
[docs] def __init__(self, name: str, description: str, priority: int = 0): """ Initialize fallback strategy. Parameters ---------- name : str Strategy name description : str Human-readable description priority : int, optional Execution priority (higher = earlier). Default: 0 """ self.name = name self.description = description self.priority = priority self.attempts = 0 self.successes = 0
[docs] def apply(self, kwargs: dict[str, Any]) -> dict[str, Any]: """ Apply strategy by modifying fit parameters. Parameters ---------- kwargs : dict Original curve_fit keyword arguments Returns ------- modified_kwargs : dict Modified keyword arguments """ raise NotImplementedError("Subclasses must implement apply()")
def __repr__(self): return ( f"{self.__class__.__name__}(name='{self.name}', priority={self.priority})" )
[docs] class AlternativeMethodStrategy(FallbackStrategy): """Try alternative optimization methods."""
[docs] def __init__(self): super().__init__( name="alternative_method", description="Try alternative optimization method", priority=10, # High priority - cheap to try ) # Placeholder: only "trf" is currently supported, so this strategy is a no-op # when TRF is already in use. Extend method_sequence to add alternatives. self.method_sequence = ["trf"] self.current_index = 0
[docs] def apply(self, kwargs: dict[str, Any]) -> dict[str, Any]: """Try next method in sequence.""" modified = kwargs.copy() if self.current_index < len(self.method_sequence): modified["method"] = self.method_sequence[self.current_index] self.current_index += 1 return modified
[docs] class PerturbInitialGuessStrategy(FallbackStrategy): """Perturb initial guess to escape local minima."""
[docs] def __init__(self, perturbation_scale: float = 0.1, max_perturbations: int = 3): super().__init__( name="perturb_p0", description=f"Perturb initial guess by {perturbation_scale * 100:.0f}%", priority=8, ) self.perturbation_scale = perturbation_scale self.max_perturbations = max_perturbations self.perturbation_count = 0
[docs] def apply(self, kwargs: dict[str, Any]) -> dict[str, Any]: """Add random perturbation to p0.""" modified = kwargs.copy() p0 = np.array(modified.get("p0", [1.0])) if self.perturbation_count < self.max_perturbations: # Add random noise scaled by parameter magnitude (additive for p0=0) noise = np.random.randn(*p0.shape) * self.perturbation_scale perturbed_p0 = p0 + noise * np.maximum(np.abs(p0), 1e-3) # Ensure p0 stays within bounds if provided bounds = modified.get("bounds", (-np.inf, np.inf)) lower, upper = bounds lower = np.atleast_1d(lower) upper = np.atleast_1d(upper) perturbed_p0 = np.clip(perturbed_p0, lower, upper) modified["p0"] = perturbed_p0 self.perturbation_count += 1 return modified
[docs] class AdjustTolerancesStrategy(FallbackStrategy): """Relax optimization tolerances."""
[docs] def __init__(self, relaxation_factor: float = 10.0): super().__init__( name="adjust_tolerances", description=f"Relax tolerances by {relaxation_factor}x", priority=7, ) self.relaxation_factor = relaxation_factor self.current_factor = 1.0
[docs] def apply(self, kwargs: dict[str, Any]) -> dict[str, Any]: """Relax ftol, xtol, gtol.""" modified = kwargs.copy() # Default tolerances ftol = modified.get("ftol", 1e-8) xtol = modified.get("xtol", 1e-8) gtol = modified.get("gtol", 1e-8) # Relax by factor self.current_factor *= self.relaxation_factor modified["ftol"] = ftol * self.current_factor modified["xtol"] = xtol * self.current_factor modified["gtol"] = gtol * self.current_factor return modified
[docs] class AddParameterBoundsStrategy(FallbackStrategy): """Infer and add parameter bounds if not provided."""
[docs] def __init__(self): super().__init__( name="add_bounds", description="Add inferred parameter bounds", priority=6, )
[docs] def apply(self, kwargs: dict[str, Any]) -> dict[str, Any]: """Infer reasonable bounds from data and p0.""" modified = kwargs.copy() # Check if bounds already provided if "bounds" in modified and modified["bounds"] != (-np.inf, np.inf): return modified # Already has bounds p0 = np.array(modified.get("p0", [1.0])) xdata = modified.get("_xdata") # Internal use ydata = modified.get("_ydata") # Internal use if xdata is None or ydata is None: return modified # Can't infer without data # Heuristic bounds based on data range y_range = np.ptp(ydata) y_min, y_max = np.min(ydata), np.max(ydata) # Conservative bounds: p0 ± 10x, but reasonable for data scale lower = np.maximum(p0 / 10, y_min - y_range) upper = np.minimum(p0 * 10, y_max + y_range) # Ensure positive for amplitude-like parameters (heuristic) if np.all(p0 > 0): lower = np.maximum(lower, 0) modified["bounds"] = (lower, upper) return modified
[docs] class UseRobustLossStrategy(FallbackStrategy): """Apply robust loss function to handle outliers."""
[docs] def __init__(self): super().__init__( name="robust_loss", description="Apply Huber loss for outliers", priority=5 ) self.loss_sequence = ["soft_l1", "huber", "cauchy"] self.current_index = 0
[docs] def apply(self, kwargs: dict[str, Any]) -> dict[str, Any]: """Try next robust loss function.""" modified = kwargs.copy() if self.current_index < len(self.loss_sequence): modified["loss"] = self.loss_sequence[self.current_index] modified["f_scale"] = modified.get("f_scale", 1.0) self.current_index += 1 return modified
[docs] class RescaleProblemStrategy(FallbackStrategy): """Rescale data for numerical stability."""
[docs] def __init__(self): super().__init__( name="rescale_problem", description="Rescale parameters and data", priority=4, )
[docs] def apply(self, kwargs: dict[str, Any]) -> dict[str, Any]: """Normalize data to [0, 1] range.""" modified = kwargs.copy() xdata = modified.get("_xdata") ydata = modified.get("_ydata") p0 = modified.get("p0") if xdata is None or ydata is None or p0 is None: return modified # Store scaling factors for later inverse transform x_scale = np.ptp(xdata) y_scale = np.ptp(ydata) x_offset = np.min(xdata) y_offset = np.min(ydata) if x_scale > 0 and y_scale > 0: modified["_x_scale"] = x_scale modified["_y_scale"] = y_scale modified["_x_offset"] = x_offset modified["_y_offset"] = y_offset modified["_scaled"] = True return modified
[docs] class FallbackResult: """Enhanced optimization result with fallback information."""
[docs] def __init__(self, result, strategy_used: str | None = None, attempts: int = 1): """ Initialize fallback result. Parameters ---------- result : OptimizeResult Underlying optimization result strategy_used : str, optional Name of fallback strategy that succeeded attempts : int, optional Number of attempts before success """ self.result = result self.fallback_strategy_used = strategy_used self.fallback_attempts = attempts
[docs] def __getattr__(self, name): """Delegate attribute access to underlying result.""" return getattr(self.result, name)
[docs] class FallbackOrchestrator: """ Orchestrates automatic fallback strategies for robust optimization. The orchestrator tries multiple recovery strategies when optimization fails, including alternative methods, perturbed initial guesses, relaxed tolerances, and robust loss functions. Parameters ---------- strategies : list of FallbackStrategy, optional List of strategies to try. If None, uses default strategies. max_attempts : int, optional Maximum total attempts across all strategies. Default: 10 verbose : bool, optional Print progress messages. Default: False Attributes ---------- strategies : list of FallbackStrategy Active fallback strategies, sorted by priority total_attempts : int Total number of fit attempts made successful_strategies : dict Count of successes per strategy Examples -------- >>> from nlsq.stability.fallback import FallbackOrchestrator >>> import numpy as np >>> >>> def model(x, a, b): ... return a * np.exp(-b * x) >>> >>> x = np.linspace(0, 10, 100) >>> y = 2.5 * np.exp(-0.5 * x) + 0.1 * np.random.randn(100) >>> >>> orchestrator = FallbackOrchestrator(verbose=True) >>> result = orchestrator.fit_with_fallback( ... model, x, y, p0=[1, 0.1] # Deliberately poor p0 ... ) >>> >>> if result.fallback_strategy_used: ... print(f"Recovered using: {result.fallback_strategy_used}") """ DEFAULT_STRATEGIES: ClassVar[list[StrategyFactory]] = [ AlternativeMethodStrategy, PerturbInitialGuessStrategy, AdjustTolerancesStrategy, AddParameterBoundsStrategy, UseRobustLossStrategy, RescaleProblemStrategy, ]
[docs] def __init__( self, strategies: list[FallbackStrategy] | None = None, max_attempts: int = 10, verbose: bool = False, ): """Initialize fallback orchestrator.""" self.logger = get_logger(__name__) self.verbose = verbose # Initialize strategies if strategies is None: self.strategies = [strategy() for strategy in self.DEFAULT_STRATEGIES] else: self.strategies = strategies # Sort by priority (highest first) self.strategies.sort(key=lambda s: s.priority, reverse=True) self.max_attempts = max_attempts self.total_attempts = 0 self.successful_strategies: dict[str, int] = {}
[docs] def fit_with_fallback(self, f: Callable, xdata, ydata, **kwargs) -> FallbackResult: """ Attempt curve fit with automatic fallback on failure. Parameters ---------- f : callable Model function xdata : array_like Independent variable data ydata : array_like Dependent variable data **kwargs Additional arguments passed to curve_fit Returns ------- result : FallbackResult Optimization result with fallback metadata Raises ------ RuntimeError If all fallback strategies fail """ from nlsq.core.minpack import ( curve_fit, # Deferred import to avoid circular dependency ) # Inject xdata/ydata for strategies that need it kwargs["_xdata"] = xdata kwargs["_ydata"] = ydata # Try original parameters first self.total_attempts += 1 if self.verbose: print(f"Attempt 1/{self.max_attempts}: Original parameters") try: result = curve_fit(f, xdata, ydata, **kwargs) if self.verbose: print("Success with original parameters!") return FallbackResult(result, strategy_used=None, attempts=1) except Exception as e: if self.verbose: print(f"[FAIL] Failed: {type(e).__name__}: {e}") last_exception = e # Try fallback strategies for strategy in self.strategies: if self.total_attempts >= self.max_attempts: break self.total_attempts += 1 strategy.attempts += 1 if self.verbose: print( f"\nAttempt {self.total_attempts}/{self.max_attempts}: " f"{strategy.description}" ) try: # Apply strategy modifications modified_kwargs = strategy.apply(kwargs) # Remove internal markers modified_kwargs.pop("_xdata", None) modified_kwargs.pop("_ydata", None) # Try fit with modified parameters result = curve_fit(f, xdata, ydata, **modified_kwargs) # Success! strategy.successes += 1 self.successful_strategies[strategy.name] = ( self.successful_strategies.get(strategy.name, 0) + 1 ) if self.verbose: print(f"[OK] Success with {strategy.name}!") return FallbackResult( result, strategy_used=strategy.name, attempts=self.total_attempts ) except Exception as e: if self.verbose: print(f"[FAIL] Failed: {type(e).__name__}") last_exception = e continue # All strategies failed error_msg = ( f"All {self.total_attempts} fallback attempts failed. " f"Last error: {type(last_exception).__name__}: {last_exception}" ) self.logger.error(error_msg) raise RuntimeError(error_msg) from last_exception
[docs] def get_statistics(self) -> dict[str, Any]: """ Get statistics on fallback strategy performance. Returns ------- stats : dict Dictionary with success rates and attempt counts """ stats: dict[str, Any] = { "total_attempts": self.total_attempts, "strategies": [], } for strategy in self.strategies: strategy_stats = { "name": strategy.name, "description": strategy.description, "attempts": strategy.attempts, "successes": strategy.successes, "success_rate": ( strategy.successes / strategy.attempts if strategy.attempts > 0 else 0.0 ), } stats["strategies"].append(strategy_stats) return stats
[docs] def print_statistics(self): """Print human-readable statistics.""" stats = self.get_statistics() print("=" * 60) print("FALLBACK ORCHESTRATOR STATISTICS") print("=" * 60) print(f"Total attempts: {stats['total_attempts']}") print("\nStrategy Performance:") print("-" * 60) for s in stats["strategies"]: if s["attempts"] > 0: print( f" {s['name']:20s}: {s['successes']:3d}/{s['attempts']:3d} " f"({s['success_rate'] * 100:5.1f}%)" ) print(f" └─ {s['description']}") print("=" * 60)