"""Optimization diagnostics and monitoring for NLSQ.
This module provides real-time monitoring, convergence detection,
and diagnostic reporting for optimization processes.
"""
import threading
import time
import warnings
from collections import deque
from typing import Any
import numpy as np
from nlsq.config import JAXConfig
_jax_config = JAXConfig()
try:
import psutil
HAS_PSUTIL = True
except ImportError:
HAS_PSUTIL = False
warnings.warn(
"psutil not installed, memory monitoring will be limited", UserWarning
)
[docs]
class ConvergenceMonitor:
"""Monitor convergence patterns and detect problems.
Detects:
- Oscillation in parameter or cost values
- Stagnation (no progress)
- Divergence (increasing cost)
- Slow convergence
- Numerical instability
"""
[docs]
def __init__(self, window_size: int = 10, sensitivity: float = 1.0):
"""Initialize convergence monitor.
Parameters
----------
window_size : int
Size of sliding window for pattern detection
sensitivity : float
Sensitivity factor (1.0 = normal, <1 = less sensitive, >1 = more sensitive)
"""
self.window_size = window_size
self.sensitivity = sensitivity
self.cost_history: deque[float] = deque(maxlen=window_size)
self.param_history: deque[np.ndarray] = deque(maxlen=window_size)
self.gradient_history: deque[float] = deque(maxlen=window_size)
self.step_size_history: deque[float] = deque(maxlen=window_size)
# Pattern detection thresholds
self.oscillation_threshold = 0.7 * sensitivity
self.stagnation_threshold = 1e-10 * sensitivity
self.divergence_threshold = 1.5 / sensitivity
[docs]
def update(
self,
cost: float,
params: np.ndarray,
gradient: np.ndarray | None = None,
step_size: float | None = None,
):
"""Update monitor with new iteration data.
Parameters
----------
cost : float
Current cost value
params : np.ndarray
Current parameter values
gradient : np.ndarray, optional
Current gradient
step_size : float, optional
Step size taken
"""
self.cost_history.append(cost)
self.param_history.append(params.copy())
if gradient is not None:
grad_norm = np.linalg.norm(gradient)
self.gradient_history.append(float(grad_norm))
if step_size is not None:
self.step_size_history.append(step_size)
[docs]
def add_iteration(
self,
cost: float,
params: np.ndarray,
gradient: np.ndarray | None = None,
step_size: float | None = None,
):
"""Alias for update() method for backward compatibility.
Parameters
----------
cost : float
Current cost value
params : np.ndarray
Current parameter values
gradient : np.ndarray, optional
Current gradient
step_size : float, optional
Step size taken
"""
self.update(cost, params, gradient, step_size)
[docs]
def detect_oscillation(self) -> tuple[bool, float]:
"""Detect oscillation in optimization.
Returns
-------
is_oscillating : bool
Whether oscillation is detected
oscillation_score : float
Oscillation severity score (0-1)
"""
if len(self.cost_history) < self.window_size:
return False, 0.0
costs = np.array(self.cost_history)
# Check for alternating pattern
differences = np.diff(costs)
sign_changes = np.sum(np.diff(np.sign(differences)) != 0)
oscillation_score = sign_changes / max(1, len(differences) - 1)
# Also check parameter oscillation
if len(self.param_history) >= 3:
params = np.array(self.param_history)
param_diffs = np.diff(params, axis=0)
param_sign_changes = np.mean(
[
np.sum(np.diff(np.sign(param_diffs[:, i])) != 0)
/ max(1, len(param_diffs) - 1)
for i in range(params.shape[1])
]
)
oscillation_score = max(oscillation_score, param_sign_changes)
return oscillation_score > self.oscillation_threshold, oscillation_score
[docs]
def detect_stagnation(self) -> tuple[bool, float]:
"""Detect stagnation in optimization.
Returns
-------
is_stagnant : bool
Whether stagnation is detected
stagnation_score : float
Stagnation severity score
"""
if len(self.cost_history) < min(5, self.window_size):
return False, 0.0
recent_costs = list(self.cost_history)[-5:]
cost_variance = float(np.var(recent_costs))
cost_mean = float(np.mean(recent_costs))
if cost_mean == 0:
relative_variance = cost_variance
else:
relative_variance = cost_variance / (abs(cost_mean) + 1e-10)
# Check gradient stagnation
if len(self.gradient_history) >= 3:
recent_grads = list(self.gradient_history)[-3:]
grad_stagnation = (
float(np.mean(recent_grads)) < self.stagnation_threshold * 10
)
else:
grad_stagnation = False
is_stagnant = relative_variance < self.stagnation_threshold or grad_stagnation
stagnation_score = 1.0 - min(1.0, relative_variance / self.stagnation_threshold)
return is_stagnant, stagnation_score
[docs]
def detect_divergence(self) -> tuple[bool, float]:
"""Detect divergence in optimization.
Returns
-------
is_diverging : bool
Whether divergence is detected
divergence_score : float
Divergence severity score
"""
if len(self.cost_history) < 3:
return False, 0.0
costs = np.array(self.cost_history)
# Check if cost is increasing
if len(costs) >= 5:
recent = costs[-5:]
older = costs[-10:-5] if len(costs) >= 10 else costs[:5]
mean_recent = np.mean(recent)
mean_older = np.mean(older)
if mean_older > 0:
divergence_ratio = mean_recent / mean_older
is_diverging = divergence_ratio > self.divergence_threshold
divergence_score = min(
1.0, (divergence_ratio - 1.0) / self.divergence_threshold
)
else:
is_diverging = mean_recent > mean_older * self.divergence_threshold
divergence_score = 0.5 if is_diverging else 0.0
else:
is_diverging = costs[-1] > costs[0] * self.divergence_threshold
divergence_score = 0.5 if is_diverging else 0.0
return is_diverging, divergence_score
[docs]
def get_convergence_rate(self) -> float | None:
"""Estimate convergence rate.
Returns
-------
rate : float or None
Convergence rate (negative = diverging, positive = converging)
"""
if len(self.cost_history) < 3:
return None
costs = np.array(self.cost_history)
# Fit exponential decay: cost = a * exp(-rate * iteration)
iterations = np.arange(len(costs))
# Use log-linear regression for stability
with np.errstate(divide="ignore", invalid="ignore"):
log_costs = np.log(np.abs(costs) + 1e-10)
if np.all(np.isfinite(log_costs)):
# Simple linear regression
A = np.vstack([iterations, np.ones(len(iterations))]).T
rate, _ = np.linalg.lstsq(A, log_costs, rcond=None)[0]
return -rate # Negative because we want positive rate for convergence
return None
[docs]
class OptimizationDiagnostics:
"""Comprehensive optimization diagnostics and reporting.
Tracks:
- Iteration data (parameters, cost, gradients)
- Convergence metrics
- Memory usage
- Timing information
- Numerical stability indicators
"""
[docs]
def __init__(self, verbosity: int = 1):
"""Initialize diagnostics system.
Parameters
----------
verbosity : int
Verbosity level:
- 0: Minimal diagnostics (no condition number computation)
- 1: Normal (cheap 1-norm condition estimate, O(nm))
- 2: Detailed (full SVD condition number, O(mn²))
"""
self.iteration_data: deque[dict[str, Any]] = deque(maxlen=10_000)
self.convergence_monitor = ConvergenceMonitor()
self.start_time: float | None = None
self.verbosity = verbosity
# Problem detection
self.warnings_issued: list[str] = []
self.numerical_issues: deque[str] = deque(maxlen=1_000)
# Performance metrics
self.function_eval_count = 0
self.jacobian_eval_count = 0
# Memory tracking
self.peak_memory: float = 0.0
self.initial_memory: float = self._get_memory_usage()
# Problem metadata (initialized by start_optimization)
self.n_params: int | None = None
self.n_data: int | None = None
self.method: str | None = None
self.loss: str | None = None
self.initial_params: np.ndarray | None = None
self.problem_name: str | None = None
[docs]
def start_optimization(
self,
x0: np.ndarray | None = None,
problem_name: str = "optimization",
*,
n_params: int | None = None,
n_data: int | None = None,
method: str | None = None,
loss: str | None = None,
):
"""Initialize diagnostics for new optimization.
Parameters
----------
x0 : np.ndarray, optional
Initial parameters (legacy API)
problem_name : str
Name for this optimization problem
n_params : int, optional
Number of parameters (new API from LeastSquares)
n_data : int, optional
Number of data points (new API from LeastSquares)
method : str, optional
Optimization method (new API from LeastSquares)
loss : str, optional
Loss function name (new API from LeastSquares)
"""
self.problem_name = problem_name
self.start_time = time.time()
self.iteration_data = deque(maxlen=10_000)
self.warnings_issued = []
self.numerical_issues = deque(maxlen=1_000)
self.function_eval_count = 0
self.jacobian_eval_count = 0
# Store problem metadata (new API)
self.n_params = n_params
self.n_data = n_data
self.method = method
self.loss = loss
# Handle both legacy (x0) and new (n_params) API
if x0 is not None:
self.initial_params = x0.copy()
elif n_params is not None:
# Create placeholder for initial params when only n_params provided
self.initial_params = np.zeros(n_params)
else:
self.initial_params = None
self.initial_memory = self._get_memory_usage()
[docs]
def record_iteration(
self,
iteration: int,
x: np.ndarray,
cost: float,
gradient: np.ndarray | None = None,
jacobian: np.ndarray | None = None,
step_size: float | None = None,
method_info: dict | None = None,
):
"""Record data for current iteration.
Parameters
----------
iteration : int
Iteration number
x : np.ndarray
Current parameters
cost : float
Current cost value
gradient : np.ndarray, optional
Current gradient
jacobian : np.ndarray, optional
Current Jacobian matrix
step_size : float, optional
Step size taken
method_info : dict, optional
Algorithm-specific information
"""
current_time = time.time()
elapsed = current_time - self.start_time if self.start_time else 0
# Basic data
data = {
"iteration": iteration,
"parameters": x.copy(),
"cost": cost,
"timestamp": current_time,
"elapsed_time": elapsed,
"memory_usage_mb": self._get_memory_usage(),
}
# Gradient information
if gradient is not None:
grad_norm = np.linalg.norm(gradient)
data["gradient_norm"] = grad_norm
data["gradient_max"] = np.max(np.abs(gradient))
# Check for numerical issues
if not np.all(np.isfinite(gradient)):
self.numerical_issues.append(
f"Iteration {iteration}: Non-finite gradient"
)
elif grad_norm > 1e10:
self.numerical_issues.append(
f"Iteration {iteration}: Extremely large gradient"
)
# Jacobian information
if jacobian is not None:
self.jacobian_eval_count += 1
# Only compute condition number if verbosity > 0
if self.verbosity > 0:
try:
if self.verbosity >= 2:
# Full SVD condition number (expensive: O(mn²))
svd_vals = np.linalg.svdvals(jacobian)
condition_number = svd_vals[0] / (svd_vals[-1] + 1e-10)
else:
# Cheap 1-norm condition estimate (O(nm), ~50-90% faster)
# Uses matrix norms instead of full SVD decomposition
condition_number = np.linalg.cond(jacobian, p=1)
data["jacobian_condition"] = condition_number
if condition_number > 1e12:
self.numerical_issues.append(
f"Iteration {iteration}: Ill-conditioned Jacobian (cond={condition_number:.2e})"
)
except (np.linalg.LinAlgError, ValueError):
data["jacobian_condition"] = np.inf
# Step information
if step_size is not None:
data["step_size"] = step_size
# Method-specific info
if method_info:
data["method_info"] = method_info
self.iteration_data.append(data)
self.function_eval_count += 1
# Update convergence monitor
self.convergence_monitor.update(cost, x, gradient, step_size)
# Check for convergence issues
self._check_convergence_health(iteration)
# Update peak memory
current_memory = self._get_memory_usage()
self.peak_memory = max(self.peak_memory, current_memory)
[docs]
def record_event(self, event_type: str, data: dict[str, Any] | None = None):
"""Record a special event during optimization.
Parameters
----------
event_type : str
Type of event (e.g., 'recovery_success', 'recovery_failed')
data : dict, optional
Additional event data
"""
# Store in warnings if it's a warning/error event
if "failed" in event_type or "error" in event_type:
self.warnings_issued.append(f"{event_type}: {data}")
def _check_convergence_health(self, iteration: int):
"""Check for convergence problems and issue warnings.
Parameters
----------
iteration : int
Current iteration number
"""
# Only check every few iterations to avoid spam
if iteration % 5 != 0 or iteration < 10:
return
# Check oscillation
is_oscillating, osc_score = self.convergence_monitor.detect_oscillation()
if is_oscillating and "oscillation" not in self.warnings_issued:
warnings.warn(
f"Optimization may be oscillating (score={osc_score:.2f}). "
"Consider reducing step size or changing algorithm.",
RuntimeWarning,
)
self.warnings_issued.append("oscillation")
# Check stagnation
is_stagnant, stag_score = self.convergence_monitor.detect_stagnation()
if is_stagnant and "stagnation" not in self.warnings_issued:
warnings.warn(
f"Optimization may be stagnant (score={stag_score:.2f}). "
"Consider relaxing tolerances or perturbing parameters.",
RuntimeWarning,
)
self.warnings_issued.append("stagnation")
# Check divergence
is_diverging, div_score = self.convergence_monitor.detect_divergence()
if is_diverging and "divergence" not in self.warnings_issued:
warnings.warn(
f"Optimization may be diverging (score={div_score:.2f}). "
"Consider better initial guess or different algorithm.",
RuntimeWarning,
)
self.warnings_issued.append("divergence")
def _get_memory_usage(self) -> float:
"""Get current memory usage in MB.
Returns
-------
memory_mb : float
Memory usage in megabytes
"""
if HAS_PSUTIL:
try:
process = psutil.Process()
return process.memory_info().rss / 1024 / 1024
except (OSError, AttributeError):
return 0.0
return 0.0
[docs]
def get_summary_statistics(self) -> dict:
"""Get summary statistics for the optimization.
Returns
-------
stats : dict
Summary statistics
"""
if not self.iteration_data:
return {}
costs = [d["cost"] for d in self.iteration_data]
stats = {
"total_iterations": len(self.iteration_data),
"function_evaluations": self.function_eval_count,
"jacobian_evaluations": self.jacobian_eval_count,
"initial_cost": costs[0],
"final_cost": costs[-1],
"cost_reduction": costs[0] - costs[-1],
"relative_cost_reduction": (costs[0] - costs[-1]) / (abs(costs[0]) + 1e-10),
"min_cost": min(costs),
"max_cost": max(costs),
}
# Timing
if self.start_time and self.iteration_data:
total_time = self.iteration_data[-1]["elapsed_time"]
stats["total_time_seconds"] = total_time
stats["time_per_iteration"] = total_time / len(self.iteration_data)
# Memory
stats["peak_memory_mb"] = self.peak_memory
stats["memory_increase_mb"] = self.peak_memory - self.initial_memory
# Convergence rate
conv_rate = self.convergence_monitor.get_convergence_rate()
if conv_rate is not None:
stats["convergence_rate"] = conv_rate
# Gradient info
if any("gradient_norm" in d for d in self.iteration_data):
grad_norms = [
d["gradient_norm"] for d in self.iteration_data if "gradient_norm" in d
]
stats["initial_gradient_norm"] = grad_norms[0]
stats["final_gradient_norm"] = grad_norms[-1]
stats["min_gradient_norm"] = min(grad_norms)
# Condition number
if any("jacobian_condition" in d for d in self.iteration_data):
cond_numbers = [
d["jacobian_condition"]
for d in self.iteration_data
if "jacobian_condition" in d
]
stats["max_condition_number"] = max(cond_numbers)
stats["mean_condition_number"] = np.mean(cond_numbers)
# Problems detected
stats["warnings_issued"] = self.warnings_issued.copy()
stats["numerical_issues"] = len(self.numerical_issues)
return stats
[docs]
def generate_report(self, verbose: bool = True) -> str:
"""Generate human-readable optimization report.
Parameters
----------
verbose : bool
Whether to include detailed information
Returns
-------
report : str
Formatted report
"""
stats = self.get_summary_statistics()
if not stats:
return "No optimization data available"
lines = []
lines.append("=" * 60)
lines.append(f"Optimization Report: {getattr(self, 'problem_name', 'Unknown')}")
lines.append("=" * 60)
# Basic metrics
lines.append("\n--- Performance Metrics ---")
lines.append(f"Total iterations: {stats.get('total_iterations', 0)}")
lines.append(f"Function evaluations: {stats.get('function_evaluations', 0)}")
lines.append(f"Jacobian evaluations: {stats.get('jacobian_evaluations', 0)}")
# Cost reduction
lines.append("\n--- Cost Reduction ---")
lines.append(f"Initial cost: {stats.get('initial_cost', 0):.6e}")
lines.append(f"Final cost: {stats.get('final_cost', 0):.6e}")
lines.append(f"Absolute reduction: {stats.get('cost_reduction', 0):.6e}")
lines.append(
f"Relative reduction: {stats.get('relative_cost_reduction', 0) * 100:.2f}%"
)
# Convergence
lines.append("\n--- Convergence ---")
if "convergence_rate" in stats:
rate = stats["convergence_rate"]
if rate > 0:
lines.append(f"Convergence rate: {rate:.4f} (converging)")
else:
lines.append(f"Convergence rate: {rate:.4f} (diverging)")
if "final_gradient_norm" in stats:
lines.append(f"Initial gradient norm: {stats['initial_gradient_norm']:.6e}")
lines.append(f"Final gradient norm: {stats['final_gradient_norm']:.6e}")
# Numerical stability
if "max_condition_number" in stats:
lines.append("\n--- Numerical Stability ---")
lines.append(f"Max condition number: {stats['max_condition_number']:.2e}")
lines.append(f"Mean condition number: {stats['mean_condition_number']:.2e}")
# Timing
if "total_time_seconds" in stats:
lines.append("\n--- Timing ---")
lines.append(f"Total time: {stats['total_time_seconds']:.2f} seconds")
lines.append(
f"Time per iteration: {stats['time_per_iteration'] * 1000:.1f} ms"
)
# Memory
lines.append("\n--- Memory Usage ---")
lines.append(f"Peak memory: {stats.get('peak_memory_mb', 0):.1f} MB")
lines.append(f"Memory increase: {stats.get('memory_increase_mb', 0):.1f} MB")
# Problems
if verbose:
if stats.get("warnings_issued"):
lines.append("\n--- Warnings ---")
lines.extend(f" • {warning}" for warning in stats["warnings_issued"])
if self.numerical_issues:
lines.append("\n--- Numerical Issues ---")
lines.extend(
f" • {issue}" for issue in list(self.numerical_issues)[:5]
) # Show first 5
if len(self.numerical_issues) > 5:
lines.append(f" ... and {len(self.numerical_issues) - 5} more")
lines.append("\n" + "=" * 60)
return "\n".join(lines)
[docs]
def plot_convergence(self, save_path: str | None = None):
"""Plot convergence history.
Parameters
----------
save_path : str, optional
Path to save plot
"""
try:
import matplotlib.pyplot as plt
except ImportError:
warnings.warn("matplotlib not available, cannot plot convergence")
return
if not self.iteration_data:
return
_fig, axes = plt.subplots(2, 2, figsize=(12, 8))
iterations = [d["iteration"] for d in self.iteration_data]
# Cost history
costs = [d["cost"] for d in self.iteration_data]
axes[0, 0].semilogy(iterations, costs, "b-")
axes[0, 0].set_xlabel("Iteration")
axes[0, 0].set_ylabel("Cost")
axes[0, 0].set_title("Cost History")
axes[0, 0].grid(True)
# Gradient norm
if any("gradient_norm" in d for d in self.iteration_data):
grad_norms = [d.get("gradient_norm", np.nan) for d in self.iteration_data]
axes[0, 1].semilogy(iterations, grad_norms, "r-")
axes[0, 1].set_xlabel("Iteration")
axes[0, 1].set_ylabel("Gradient Norm")
axes[0, 1].set_title("Gradient Norm History")
axes[0, 1].grid(True)
# Step size
if any("step_size" in d for d in self.iteration_data):
step_sizes = [d.get("step_size", np.nan) for d in self.iteration_data]
axes[1, 0].plot(iterations, step_sizes, "g-")
axes[1, 0].set_xlabel("Iteration")
axes[1, 0].set_ylabel("Step Size")
axes[1, 0].set_title("Step Size History")
axes[1, 0].grid(True)
# Memory usage
memory = [d["memory_usage_mb"] for d in self.iteration_data]
axes[1, 1].plot(iterations, memory, "m-")
axes[1, 1].set_xlabel("Iteration")
axes[1, 1].set_ylabel("Memory (MB)")
axes[1, 1].set_title("Memory Usage")
axes[1, 1].grid(True)
plt.suptitle(
f"Optimization Convergence: {getattr(self, 'problem_name', 'Unknown')}"
)
plt.tight_layout()
if save_path:
plt.savefig(save_path)
else:
plt.show()
# Global diagnostics instance
_global_diagnostics: OptimizationDiagnostics | None = None
_global_diagnostics_lock = threading.Lock()
[docs]
def get_diagnostics(verbosity: int | None = None) -> OptimizationDiagnostics:
"""Get global diagnostics instance.
Parameters
----------
verbosity : int, optional
Verbosity level for new instance (0=minimal, 1=normal, 2=detailed).
Only used when creating a new instance.
Returns
-------
diagnostics : OptimizationDiagnostics
Global diagnostics instance
"""
global _global_diagnostics # noqa: PLW0603
if _global_diagnostics is None:
with _global_diagnostics_lock:
if _global_diagnostics is None:
_global_diagnostics = OptimizationDiagnostics(
verbosity=verbosity if verbosity is not None else 1
)
return _global_diagnostics
[docs]
def reset_diagnostics(verbosity: int = 1):
"""Reset global diagnostics.
Parameters
----------
verbosity : int
Verbosity level (0=minimal, 1=normal, 2=detailed)
"""
global _global_diagnostics # noqa: PLW0603
with _global_diagnostics_lock:
_global_diagnostics = OptimizationDiagnostics(verbosity=verbosity)