Source code for nlsq.core.optimizer_base

"""Base class for optimization algorithms in NLSQ."""

from abc import ABC, abstractmethod
from collections.abc import Callable

import numpy as np

# Initialize JAX configuration through central config
from nlsq.config import JAXConfig

_jax_config = JAXConfig()

from nlsq.result import OptimizeResult
from nlsq.utils.logging import get_logger


[docs] class OptimizerBase(ABC): """Abstract base class for optimization algorithms. This class provides a common interface for optimization algorithms used in NLSQ. It includes common functionality like logging, result creation, and defines the interface that subclasses must implement. """
[docs] def __init__(self, name: str = "optimizer"): """Initialize the optimizer base class. Parameters ---------- name : str Name of the optimizer for logging purposes """ self.name = name self.logger = get_logger(f"optimizer.{name}") self._nfev = 0 self._njev = 0
[docs] @abstractmethod def optimize( self, fun: Callable, x0: np.ndarray, jac: Callable | None = None, bounds: tuple[np.ndarray, np.ndarray] | tuple[float, float] = (-np.inf, np.inf), **kwargs, ) -> OptimizeResult: """Perform optimization. This is the main optimization method that must be implemented by subclasses. Parameters ---------- fun : callable The objective function to minimize x0 : np.ndarray Initial guess for parameters jac : callable, optional Jacobian function bounds : tuple of arrays Lower and upper bounds for parameters **kwargs Additional optimization parameters Returns ------- OptimizeResult The optimization result """
[docs] def reset_counters(self): """Reset function evaluation counters.""" self._nfev = 0 self._njev = 0 self.logger.debug("Counters reset", nfev=0, njev=0)
[docs] def increment_nfev(self, count: int = 1): """Increment function evaluation counter.""" self._nfev += count
[docs] def increment_njev(self, count: int = 1): """Increment Jacobian evaluation counter.""" self._njev += count
@property def nfev(self) -> int: """Number of function evaluations.""" return self._nfev @property def njev(self) -> int: """Number of Jacobian evaluations.""" return self._njev
[docs] def create_result( self, x: np.ndarray, fun: np.ndarray, jac: np.ndarray | None = None, cost: float | None = None, status: int = 0, message: str = "", optimality: float | None = None, active_mask: np.ndarray | None = None, **kwargs, ) -> OptimizeResult: """Create a standardized optimization result. Parameters ---------- x : np.ndarray Optimized parameters fun : np.ndarray Function values at optimized parameters jac : np.ndarray, optional Jacobian at optimized parameters cost : float, optional Final cost value status : int Termination status code message : str Termination message optimality : float, optional First-order optimality measure active_mask : np.ndarray, optional Active constraints mask **kwargs Additional result attributes Returns ------- OptimizeResult Standardized optimization result """ result = OptimizeResult( x=x, fun=fun, jac=jac, cost=cost, nfev=self.nfev, njev=self.njev, status=status, message=message, optimality=optimality, active_mask=active_mask, **kwargs, ) self.logger.debug( "Result created", final_cost=cost, nfev=self.nfev, njev=self.njev, status=status, ) return result
[docs] def check_convergence( self, actual_reduction: float, cost: float, step_norm: float, x_norm: float, ratio: float, ftol: float, xtol: float, ) -> int | None: """Check convergence criteria. Parameters ---------- actual_reduction : float Actual reduction in cost function cost : float Current cost value step_norm : float Norm of the optimization step x_norm : float Norm of the parameter vector ratio : float Ratio of actual to predicted reduction ftol : float Function tolerance xtol : float Parameter tolerance Returns ------- int or None Termination status code if converged, None otherwise """ ftol_satisfied = actual_reduction < ftol * cost xtol_satisfied = step_norm < xtol * (xtol + x_norm) # Check combined criteria first (status 4 takes priority over 2 or 3) if ftol_satisfied and xtol_satisfied: self.logger.debug("Convergence: both tolerances satisfied") return 4 # Check function tolerance if ftol_satisfied: self.logger.debug( "Convergence: function tolerance satisfied", actual_reduction=actual_reduction, ftol=ftol, cost=cost, ) return 2 # Check parameter tolerance if xtol_satisfied: self.logger.debug( "Convergence: parameter tolerance satisfied", step_norm=step_norm, xtol=xtol, x_norm=x_norm, ) return 3 return None
[docs] def log_iteration( self, iteration: int, cost: float, gradient_norm: float | None = None, step_size: float | None = None, **kwargs, ): """Log optimization iteration details. Parameters ---------- iteration : int Iteration number cost : float Current cost value gradient_norm : float, optional Gradient norm step_size : float, optional Step size **kwargs Additional logging parameters """ self.logger.optimization_step( iteration=iteration, cost=cost, gradient_norm=gradient_norm, step_size=step_size, nfev=self.nfev, **kwargs, )
[docs] def log_convergence( self, reason: str, iterations: int, final_cost: float, time_elapsed: float | None = None, **kwargs, ): """Log convergence information. Parameters ---------- reason : str Convergence reason iterations : int Number of iterations final_cost : float Final cost value time_elapsed : float, optional Total optimization time **kwargs Additional logging parameters """ self.logger.convergence( reason=reason, iterations=iterations, final_cost=final_cost, time_elapsed=time_elapsed, **kwargs, )
[docs] class TrustRegionOptimizerBase(OptimizerBase): """Base class for trust region optimization algorithms. This class extends OptimizerBase with trust region specific functionality like trust region radius management and step acceptance criteria. """
[docs] def __init__(self, name: str = "trust_region"): """Initialize trust region optimizer.""" super().__init__(name) self._trust_radius = 1.0
@property def trust_radius(self) -> float: """Current trust region radius.""" return self._trust_radius @trust_radius.setter def trust_radius(self, value: float): """Set trust region radius.""" self._trust_radius = max(0.0, value)
[docs] def update_trust_radius( self, Delta: float, actual_reduction: float, predicted_reduction: float, step_norm: float, step_at_boundary: bool = False, ) -> tuple[float, float]: """Update trust region radius based on step quality. Parameters ---------- Delta : float Current trust region radius actual_reduction : float Actual reduction in objective predicted_reduction : float Predicted reduction from model step_norm : float Norm of the step taken step_at_boundary : bool Whether step reached trust region boundary Returns ------- tuple New trust region radius and reduction ratio """ if predicted_reduction <= 0: ratio = 0.0 else: ratio = actual_reduction / predicted_reduction if ratio < 0.25: Delta_new = 0.25 * step_norm elif ratio > 0.75 and step_at_boundary: Delta_new = min(2.0 * Delta, 1000.0) else: Delta_new = Delta self.trust_radius = Delta_new self.logger.debug( "Trust radius updated", old_radius=Delta, new_radius=Delta_new, ratio=ratio, step_norm=step_norm, at_boundary=step_at_boundary, ) return Delta_new, ratio
[docs] def step_accepted(self, ratio: float, threshold: float = 1e-4) -> bool: """Check if optimization step should be accepted. Parameters ---------- ratio : float Ratio of actual to predicted reduction threshold : float Minimum acceptable ratio Returns ------- bool True if step should be accepted """ accepted = ratio > threshold self.logger.debug( "Step acceptance check", ratio=ratio, threshold=threshold, accepted=accepted ) return accepted