nlsq.workflow
Memory-based workflow system for automatic optimization strategy selection.
Changed in version 0.5.5: The tier-based workflow system was replaced with a unified memory-based approach.
MemoryBudgetSelector replaces auto_select_workflow(), and strategy selection
is now driven entirely by memory budget computation.
Overview
The workflow module provides:
- MemoryBudget: Dataclass for computing and storing memory estimates
- MemoryBudgetSelector: Automatic strategy selection based on memory analysis
- OptimizationGoal: Optimization objectives (FAST, ROBUST, GLOBAL, MEMORY_EFFICIENT, QUALITY)
- calculate_adaptive_tolerances: Dataset-size-aware tolerance computation
Quick Start
from nlsq import fit, curve_fit
from nlsq.core.workflow import MemoryBudget, MemoryBudgetSelector, OptimizationGoal
import jax.numpy as jnp
import numpy as np
def model(x, a, b, c):
    return a * jnp.exp(-b * x) + c
x = np.linspace(0, 10, 1_000_000)
y = 2.0 * np.exp(-0.5 * x) + 0.3 + np.random.normal(0, 0.05, len(x))
# Automatic selection via fit() (recommended)
result = fit(model, x, y, p0=[1, 1, 0], workflow="auto")
# Automatic selection via curve_fit()
popt, pcov = curve_fit(model, x, y, p0=[1, 1, 0], method="auto")
# Direct use of MemoryBudgetSelector
selector = MemoryBudgetSelector(safety_factor=0.75)
strategy, config = selector.select(
n_points=len(x),
n_params=3,
memory_limit_gb=16.0, # Optional override
)
print(f"Selected strategy: {strategy}")
# Inspect memory budget
budget = MemoryBudget.compute(n_points=len(x), n_params=3)
print(f"Peak memory: {budget.peak_gb:.2f} GB")
print(f"Fits in memory: {budget.fits_in_memory}")
Memory Budget Classes
MemoryBudget
- class nlsq.core.workflow.MemoryBudget(available_gb, threshold_gb, data_gb, jacobian_gb, peak_gb)
Bases: object
Computed memory budget for optimizer selection.
This immutable dataclass represents the computed memory requirements and available resources for automatic optimizer strategy selection. Use the compute() factory method to create instances.
- available_gb
Available system memory in GB (CPU or GPU depending on target).
- Type:
float
- threshold_gb
Safe memory threshold = available_gb × safety_factor.
- Type:
float
- data_gb
Memory required for data arrays (x_data, y_data).
- Type:
float
- jacobian_gb
Memory required for full Jacobian matrix.
- Type:
float
- peak_gb
Estimated peak memory = data_gb + 1.3 × jacobian_gb + solver overhead.
- Type:
float
Examples
>>> budget = MemoryBudget.compute(n_points=10_000_000, n_params=10)
>>> print(f"Available: {budget.available_gb:.1f} GB")
>>> print(f"Peak estimate: {budget.peak_gb:.2f} GB")
>>> print(f"Fits in memory: {budget.fits_in_memory}")
- available_gb: float
- threshold_gb: float
- data_gb: float
- jacobian_gb: float
- peak_gb: float
- property fits_in_memory: bool
Check if estimated peak memory fits within safe threshold.
- Returns:
True if peak_gb <= threshold_gb.
- Return type:
bool
- property data_fits: bool
Check if data arrays alone fit within safe threshold.
- Returns:
True if data_gb <= threshold_gb.
- Return type:
bool
- classmethod compute(n_points, n_params, n_features=1, dtype_bytes=8, safety_factor=0.75, memory_limit_gb=None, use_gpu=False)
Compute memory budget for a given dataset size.
- Parameters:
n_points (int) – Number of data points.
n_params (int) – Number of fit parameters.
n_features (int, default=1) – Number of features in x_data (dimensions).
dtype_bytes (int, default=8) – Bytes per element (8 for float64, 4 for float32).
safety_factor (float, default=0.75) – Memory safety factor (0.75 means use 75% of available).
memory_limit_gb (float | None, default=None) – Override memory limit in GB. If None, auto-detect.
use_gpu (bool, default=False) – If True, use GPU memory instead of CPU memory.
- Returns:
Computed memory budget with all fields populated.
- Return type:
MemoryBudget
- Raises:
ValueError – If n_points <= 0, n_params <= 0, or safety_factor not in (0, 1].
Examples
>>> budget = MemoryBudget.compute(n_points=1_000_000, n_params=5)
>>> budget.fits_in_memory
True
- __init__(available_gb, threshold_gb, data_gb, jacobian_gb, peak_gb)
Fields:
| Field | Description |
|---|---|
| available_gb | Total available memory (CPU or GPU) in GB |
| threshold_gb | Safe threshold (available × safety_factor) |
| data_gb | Estimated memory for data arrays (x, y) |
| jacobian_gb | Estimated memory for Jacobian matrix |
| peak_gb | Total peak memory estimate |
Computed Properties:
- fits_in_memory: True if peak_gb <= threshold_gb
- data_fits: True if data_gb <= threshold_gb
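Both computed properties reduce to single comparisons against threshold_gb. The sketch below mirrors the documented fields in a hypothetical frozen dataclass, `BudgetSketch` (not part of NLSQ), assuming the real class is a frozen dataclass as the "immutable dataclass" description suggests; the numbers are illustrative, not output from MemoryBudget.compute().

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BudgetSketch:
    """Hypothetical stand-in mirroring MemoryBudget's documented fields."""

    available_gb: float
    threshold_gb: float
    data_gb: float
    jacobian_gb: float
    peak_gb: float

    @property
    def fits_in_memory(self) -> bool:
        # Whole problem (data + Jacobian + solver overhead) fits under the threshold.
        return self.peak_gb <= self.threshold_gb

    @property
    def data_fits(self) -> bool:
        # Only the raw data arrays are compared against the threshold.
        return self.data_gb <= self.threshold_gb


# Illustrative values: 16 GB available with safety_factor=0.75 gives a 12 GB threshold.
budget = BudgetSketch(available_gb=16.0, threshold_gb=12.0,
                      data_gb=0.16, jacobian_gb=0.8, peak_gb=1.3)
print(budget.fits_in_memory, budget.data_fits)  # True True
```

A budget can have data_fits True while fits_in_memory is False; that combination is exactly the case the selector routes to the chunked strategy.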
MemoryBudgetSelector
- class nlsq.core.workflow.MemoryBudgetSelector(safety_factor=0.75)
Bases: object
Selects optimal optimizer strategy based on memory budget.
This class computes memory requirements and selects among the STREAMING, CHUNKED, and STANDARD strategies using two sequential memory comparisons, with STANDARD as the fallback.
- Decision Tree:
data_gb > threshold_gb → STREAMING (data doesn’t fit)
peak_gb > threshold_gb → CHUNKED (Jacobian doesn’t fit)
else → STANDARD (everything fits)
- Parameters:
safety_factor (float, default=0.75) – Memory safety factor (0.75 means use 75% of available memory).
Examples
>>> selector = MemoryBudgetSelector(safety_factor=0.75)
>>> strategy, config = selector.select(n_points=5_000_000, n_params=10)
>>> if strategy == "streaming":
...     # Use HybridStreamingOptimizer with config
...     pass
... elif strategy == "chunked":
...     # Use LargeDatasetFitter with config
...     pass
... else:
...     # Use standard curve_fit()
...     pass
- __init__(safety_factor=0.75)
Initialize selector with safety factor.
- Parameters:
safety_factor (float, default=0.75) – Memory safety factor (0.75 means use 75% of available memory).
- select(n_points, n_params, n_features=1, memory_limit_gb=None, goal=None, use_gpu=False, verbose=False)
Select optimal optimizer strategy based on memory budget.
- Parameters:
n_points (int) – Number of data points.
n_params (int) – Number of fit parameters.
n_features (int, default=1) – Number of features in x_data.
memory_limit_gb (float | None, default=None) – Override memory limit in GB. If None, auto-detect.
goal (OptimizationGoal | None, default=None) – Optimization goal (affects tolerances, not strategy selection).
use_gpu (bool, default=False) – If True, use GPU memory instead of CPU memory.
verbose (bool, default=False) – If True, log memory budget details and strategy selection reason.
- Returns:
strategy: "streaming", "chunked", or "standard"
config: HybridStreamingConfig, LDMemoryConfig, or None
- Return type:
tuple[str, HybridStreamingConfig | LDMemoryConfig | None]
- Raises:
ValueError – If n_points <= 0 or n_params <= 0.
Strategy Selection Logic:
if data_gb > threshold_gb:
    return "streaming"  # Data too large for memory
elif peak_gb > threshold_gb:
    return "chunked"  # Jacobian too large, chunk the computation
else:
    return "standard"  # Everything fits, use direct curve_fit()
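The selection logic above can be exercised on its own as a pure function of three numbers. This is a hypothetical restatement, `select_strategy`, not an NLSQ function; it takes already-computed budget values so the branch order is easy to check.

```python
def select_strategy(data_gb: float, peak_gb: float, threshold_gb: float) -> str:
    """Hypothetical restatement of the documented decision tree."""
    if data_gb > threshold_gb:
        return "streaming"  # raw data alone exceeds the safe threshold
    if peak_gb > threshold_gb:
        return "chunked"  # data fits, but data + Jacobian + overhead does not
    return "standard"  # everything fits; a direct curve_fit() is fine


threshold = 16.0 * 0.75  # 16 GB available, safety_factor=0.75 -> 12 GB
print(select_strategy(data_gb=0.16, peak_gb=1.3, threshold_gb=threshold))   # standard
print(select_strategy(data_gb=0.16, peak_gb=13.0, threshold_gb=threshold))  # chunked
print(select_strategy(data_gb=14.0, peak_gb=30.0, threshold_gb=threshold))  # streaming
```

Because the data check runs first, a dataset whose raw arrays exceed the threshold is always streamed, regardless of how small the Jacobian would be.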
Enumerations
OptimizationGoal
- class nlsq.core.workflow.OptimizationGoal(*values)
Bases: Enum
Optimization goals that influence workflow selection and tolerances.
Each goal represents a different optimization priority, affecting:
- Convergence tolerances (gtol, ftol, xtol)
- Multi-start enablement
- Memory/speed tradeoffs
- FAST
Prioritize speed with local optimization only. Uses tolerances one tier looser and skips multi-start. Best for: quick exploration, well-conditioned problems.
- ROBUST
Standard tolerances with multi-start for a better chance of reaching the global optimum. Uses dataset-appropriate tolerances and enables multi-start via MultiStartOrchestrator. Best for: production use, problems of unknown conditioning.
- GLOBAL
Synonym for ROBUST that emphasizes global optimization. Behaves the same as ROBUST; provided for semantic clarity.
- MEMORY_EFFICIENT
Minimize memory usage with standard tolerances. Prioritizes streaming/chunking with smaller chunk sizes. Best for: memory-constrained environments, very large datasets.
- QUALITY
Makes precision and accuracy the top priority. Uses tolerances one tier tighter, enables multi-start, and runs validation passes. Best for: publication-quality results, critical applications.
- FAST = 1
- ROBUST = 2
- GLOBAL = 3
- MEMORY_EFFICIENT = 4
- QUALITY = 5
- classmethod normalize(goal)
Normalize GLOBAL to ROBUST, since they have the same behavior.
- Parameters:
goal (OptimizationGoal) – The goal to normalize.
- Returns:
ROBUST if goal was GLOBAL, otherwise the original goal.
- Return type:
OptimizationGoal
| Goal | Description |
|---|---|
| FAST | Prioritize speed. Uses one tier looser tolerances, skips multi-start. |
| ROBUST | Standard tolerances with multi-start for better global optimum. |
| GLOBAL | Synonym for ROBUST. Emphasizes global optimization. |
| MEMORY_EFFICIENT | Minimize memory usage with standard tolerances. |
| QUALITY | Highest precision. Uses one tier tighter tolerances, enables multi-start. |
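GLOBAL has its own value (3) rather than being an Enum alias of ROBUST, so code that compares goals needs an explicit normalization step. A minimal sketch using a hypothetical mirror enum, `GoalSketch` (not NLSQ's class), with the member values listed above:

```python
from enum import Enum


class GoalSketch(Enum):
    """Hypothetical mirror of OptimizationGoal's documented members and values."""

    FAST = 1
    ROBUST = 2
    GLOBAL = 3
    MEMORY_EFFICIENT = 4
    QUALITY = 5

    @classmethod
    def normalize(cls, goal: "GoalSketch") -> "GoalSketch":
        # GLOBAL is documented as a synonym for ROBUST, so fold it into ROBUST.
        return cls.ROBUST if goal is cls.GLOBAL else goal


print(GoalSketch.normalize(GoalSketch.GLOBAL))   # GoalSketch.ROBUST
print(GoalSketch.normalize(GoalSketch.QUALITY))  # GoalSketch.QUALITY
```

Had GLOBAL been declared as `GLOBAL = 2`, Python would make it an alias and `GoalSketch.GLOBAL is GoalSketch.ROBUST` would already hold; keeping a distinct value preserves the name in reprs and logs at the cost of the extra normalize() call.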
Named Workflow Presets
The fit() function accepts named presets via the workflow parameter:
| Preset | Strategy | Tolerance | Description |
|---|---|---|---|
| auto | Memory-based | Adaptive | Automatic selection based on memory budget |
| | standard | 1e-8 | Default curve_fit() behavior, no multi-start |
| | standard | 1e-10 | Highest precision with 20-point multi-start |
| | standard | 1e-6 | Speed-optimized, no multi-start |
| | chunked | 1e-8 | Chunked processing with 10-point multi-start |
| | streaming | 1e-7 | AdaptiveHybridStreamingOptimizer for huge datasets |
| | streaming | 1e-6 | Multi-GPU/node HPC configuration with checkpointing |
Usage:
from nlsq import fit
# Use automatic memory-based selection
result = fit(model, x, y, p0=[1, 1, 0], workflow="auto")
# Use a named preset
result = fit(model, x, y, p0=[1, 1, 0], workflow="quality")
# Override memory detection
result = fit(model, x, y, p0=[1, 1, 0], workflow="auto", memory_limit_gb=8.0)
Adaptive Tolerances
The workflow system uses adaptive tolerances based on dataset size:
| Dataset Size | Points | Default Tolerance | Notes |
|---|---|---|---|
| TINY | < 1,000 | 1e-12 | Maximum precision |
| SMALL | 1,000 - 10,000 | 1e-10 | High precision |
| MEDIUM | 10,000 - 100,000 | 1e-9 | Balanced |
| LARGE | 100,000 - 1,000,000 | 1e-8 | Standard (NLSQ default) |
| VERY_LARGE | 1M - 10M | 1e-7 | Reduced precision |
| HUGE | 10M - 100M | 1e-6 | Streaming mode |
| MASSIVE | > 100M | 1e-5 | Streaming with checkpoints |
Goal-Based Adjustments:
- QUALITY: Uses one tier tighter tolerances
- FAST: Uses one tier looser tolerances
- ROBUST/GLOBAL/MEMORY_EFFICIENT: Use standard tolerances
from nlsq.core.workflow import calculate_adaptive_tolerances, OptimizationGoal
# 5M points with QUALITY goal
tols = calculate_adaptive_tolerances(5_000_000, goal=OptimizationGoal.QUALITY)
print(tols) # {'gtol': 1e-08, 'ftol': 1e-08, 'xtol': 1e-08}
# 5M points with FAST goal
tols = calculate_adaptive_tolerances(5_000_000, goal=OptimizationGoal.FAST)
print(tols) # {'gtol': 1e-06, 'ftol': 1e-06, 'xtol': 1e-06}
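The tier table and the one-tier goal shifts can be sketched as a lookup. `adaptive_tolerances_sketch` is hypothetical, not NLSQ's implementation, and it makes three labeled assumptions: each tier's lower bound is inclusive (exactly 1,000 points counts as SMALL), shifts past either end of the table are clamped, and goals are passed as plain strings instead of OptimizationGoal members.

```python
from bisect import bisect_right
from typing import Dict, Optional

# Upper bounds of the TINY..HUGE tiers; anything at or above 100M is MASSIVE.
# Assumption: lower bounds are inclusive.
_TIER_BOUNDS = [1_000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000]
_TIER_TOLS = [1e-12, 1e-10, 1e-9, 1e-8, 1e-7, 1e-6, 1e-5]


def adaptive_tolerances_sketch(n_points: int, goal: Optional[str] = None) -> Dict[str, float]:
    """Hypothetical re-implementation of the tier table plus goal shifts."""
    tier = bisect_right(_TIER_BOUNDS, n_points)  # index into _TIER_TOLS
    if goal == "quality":
        tier -= 1  # one tier tighter
    elif goal == "fast":
        tier += 1  # one tier looser
    tier = min(max(tier, 0), len(_TIER_TOLS) - 1)  # assumption: clamp at the ends
    tol = _TIER_TOLS[tier]
    return {"gtol": tol, "ftol": tol, "xtol": tol}


print(adaptive_tolerances_sketch(5_000_000))             # {'gtol': 1e-07, 'ftol': 1e-07, 'xtol': 1e-07}
print(adaptive_tolerances_sketch(5_000_000, "quality"))  # {'gtol': 1e-08, 'ftol': 1e-08, 'xtol': 1e-08}
```

The 5M-point results match the VERY_LARGE row and the QUALITY/FAST examples above; how the real function treats a QUALITY goal on a TINY dataset is not documented here.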
Memory Estimation Details
The system estimates memory requirements for each component:
| Component | Formula (bytes) | Example (10M pts, 10 params) |
|---|---|---|
| Data (x, y) | n × (features + 1) × 8 | 160 MB |
| Jacobian | n × p × 8 | 800 MB |
| JTJ | p² × 8 | 0.8 KB |
| SVD working | ~0.3 × jacobian | 240 MB |
| Peak | data + 1.3×J + solver | ~1.3 GB |
The Jacobian matrix dominates memory usage for most problems.
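The table's arithmetic can be reproduced directly. `estimate_memory_gb` is a hypothetical helper that uses decimal units (1 GB = 1e9 bytes) to match the MB figures in the table; it omits the solver-overhead term, whose size is not documented, and NLSQ's own MemoryBudget.compute() may use different units.

```python
from typing import Dict


def estimate_memory_gb(n_points: int, n_params: int, n_features: int = 1,
                       dtype_bytes: int = 8) -> Dict[str, float]:
    """Hypothetical sketch of the estimation table (decimal GB, no solver overhead)."""
    gb = 1e9  # decimal gigabytes, matching the table's MB figures
    data_gb = n_points * (n_features + 1) * dtype_bytes / gb  # x and y arrays
    jacobian_gb = n_points * n_params * dtype_bytes / gb      # full n x p Jacobian
    jtj_gb = n_params**2 * dtype_bytes / gb                   # p x p normal matrix
    svd_gb = 0.3 * jacobian_gb                                # SVD working space
    peak_gb = data_gb + 1.3 * jacobian_gb                     # solver overhead not included
    return {"data": data_gb, "jacobian": jacobian_gb, "jtj": jtj_gb,
            "svd": svd_gb, "peak_without_solver": peak_gb}


est = estimate_memory_gb(10_000_000, 10)
print({k: round(v, 6) for k, v in est.items()})
```

For 10M points and 10 parameters this gives 0.16 GB of data, 0.8 GB of Jacobian, 0.24 GB of SVD workspace, and about 1.2 GB before solver overhead; the table's ~1.3 GB presumably includes that overhead. The JTJ term (p² × 8) is negligible unless p is in the thousands.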
Utility Functions
calculate_adaptive_tolerances
- nlsq.core.workflow.calculate_adaptive_tolerances(n_points, goal=None)
Calculate adaptive tolerances based on dataset size and optimization goal.
This function determines appropriate convergence tolerances (gtol, ftol, xtol) for the given dataset size, then applies goal-based adjustments:
- "quality" goal: Use one tier tighter (lower) tolerances
- "fast" goal: Use one tier looser (higher) tolerances
- "robust"/"global"/"memory_efficient": Use standard tolerances for dataset size
- Parameters:
n_points (int) – Number of data points in the dataset.
goal (OptimizationGoal, optional) – Optimization goal to adjust tolerances. Default: None (use dataset-appropriate).
- Returns:
Dictionary with ‘gtol’, ‘ftol’, ‘xtol’ keys and corresponding tolerance values.
- Return type:
dict
Examples
>>> tols = calculate_adaptive_tolerances(5_000_000)
>>> tols['gtol']
1e-07
>>> tols = calculate_adaptive_tolerances(5_000_000, goal=OptimizationGoal.QUALITY)
>>> tols['gtol']  # One tier tighter
1e-08
>>> tols = calculate_adaptive_tolerances(5_000_000, goal=OptimizationGoal.FAST)
>>> tols['gtol']  # One tier looser
1e-06
create_checkpoint_directory
- nlsq.core.workflow.create_checkpoint_directory(base_dir=None)
Create a checkpoint directory with timestamp.
Creates a directory at ./nlsq_checkpoints/YYYYMMDD_HHMMSS/ for storing optimization checkpoints. Integrates with HybridStreamingConfig.enable_checkpoints.
- Parameters:
base_dir (str or Path, optional) – Base directory for checkpoints. Default: ./nlsq_checkpoints
- Returns:
Absolute path to the created checkpoint directory.
- Return type:
Examples
>>> checkpoint_dir = create_checkpoint_directory()
>>> # Returns the absolute form of a path like './nlsq_checkpoints/20251219_143052/'
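The timestamped layout can be sketched with pathlib and datetime. `make_checkpoint_dir` is a hypothetical helper, not NLSQ's function; it assumes a `YYYYMMDD_HHMMSS` name (strftime `%Y%m%d_%H%M%S`) and returns a resolved `Path`, whereas the real function's return type is not stated here.

```python
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Optional, Union


def make_checkpoint_dir(base_dir: Optional[Union[str, Path]] = None) -> Path:
    """Hypothetical sketch: create <base_dir>/YYYYMMDD_HHMMSS and return its absolute path."""
    base = Path(base_dir) if base_dir is not None else Path("./nlsq_checkpoints")
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    path = (base / stamp).resolve()
    path.mkdir(parents=True, exist_ok=True)  # also creates base_dir if missing
    return path


# Usage inside a throwaway directory so nothing is left behind.
with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp) / "nlsq_checkpoints"
    ckpt = make_checkpoint_dir(base)
    created_ok = ckpt.is_dir()
    parent_ok = ckpt.parent == base.resolve()
    print(ckpt.name)
```

With second-level timestamps and `exist_ok=True`, two calls within the same second share a directory; a real implementation might append a suffix instead.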
Module Contents
Workflow Configuration and Selection Module.
This module provides memory-based optimizer selection and adaptive tolerance calculation for NLSQ curve fitting operations.
Key Components
- OptimizationGoal enum: Defines optimization priorities (FAST, ROBUST, QUALITY, etc.)
- MemoryBudget dataclass: Computes memory requirements for optimizer selection
- MemoryBudgetSelector class: Selects optimal optimizer strategy based on memory
- calculate_adaptive_tolerances(): Returns size-appropriate convergence tolerances
- ClusterDetector class: Detects HPC cluster environments (PBS Pro)
Examples
Memory-based optimizer selection:
>>> from nlsq.core.workflow import MemoryBudgetSelector
>>> selector = MemoryBudgetSelector(safety_factor=0.75)
>>> strategy, config = selector.select(n_points=5_000_000, n_params=10)
>>> if strategy == "streaming":
...     pass  # Use HybridStreamingOptimizer
... elif strategy == "chunked":
...     pass  # Use LargeDatasetFitter
... else:
...     pass  # Use standard curve_fit()
Adaptive tolerance calculation:
>>> from nlsq.core.workflow import calculate_adaptive_tolerances, OptimizationGoal
>>> tols = calculate_adaptive_tolerances(n_points=5_000_000, goal=OptimizationGoal.QUALITY)
>>> tols['gtol'] # Returns tighter tolerance for QUALITY goal
1e-08
Cluster detection for HPC environments:
>>> from nlsq.core.workflow import ClusterDetector
>>> detector = ClusterDetector()
>>> cluster_info = detector.detect()
>>> if cluster_info:
... print(f"Running on cluster: {cluster_info.total_gpus} GPUs")
- class nlsq.core.workflow.ClusterDetector(default_gpus_per_node=8)
Bases: object
Detector for cluster environments and GPU configurations.
This class auto-detects PBS cluster environments via $PBS_NODEFILE and single-node multi-GPU configurations via JAX's device API.
Supports:
- PBS Pro cluster manager
- Single-node multi-GPU (2-8 GPUs)
- Multi-node HPC clusters (10-100 nodes, 8x A100 GPUs per node)
Examples
>>> detector = ClusterDetector()
>>> cluster_info = detector.detect()
>>> if cluster_info is not None:
...     print(f"Cluster detected: {cluster_info.node_count} nodes")
...     print(f"Total GPUs: {cluster_info.total_gpus}")
... else:
...     print("Not in cluster environment")
Check for PBS specifically:
>>> if detector.is_pbs_environment():
...     cluster_info = detector.detect_pbs()
...     print(f"PBS Job ID: {cluster_info.job_id}")
- DEFAULT_GPUS_PER_NODE = 8
- __init__(default_gpus_per_node=8)
Initialize ClusterDetector.
- Parameters:
default_gpus_per_node (int, optional) – Default number of GPUs per node when not auto-detectable. Default: 8 (for A100 HPC nodes).
- detect()
Auto-detect cluster environment.
Tries PBS first, then falls back to local multi-GPU detection. Returns None if not in a cluster environment (single CPU-only machine).
- Returns:
ClusterInfo if cluster detected, None otherwise.
- Return type:
ClusterInfo or None
Examples
>>> detector = ClusterDetector()
>>> info = detector.detect()
>>> if info:
...     print(f"Running on {info.scheduler} with {info.total_gpus} GPUs")
- is_pbs_environment()
Check if running in PBS cluster environment.
- Returns:
True if PBS_NODEFILE environment variable is set.
- Return type:
bool
- detect_pbs()
Detect PBS Pro cluster configuration.
Parses PBS_NODEFILE to determine node count and list. GPU count per node is either auto-detected via JAX or uses default.
- Returns:
ClusterInfo with PBS configuration, or None if not in PBS environment.
- Return type:
ClusterInfo or None
Notes
PBS_NODEFILE contains one line per allocated processor slot. For GPU jobs, each node typically appears once per allocated GPU, so hostnames repeat in the file.
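Because hostnames repeat once per slot, the node count comes from the unique hostnames and the repeat count gives slots per node. `parse_pbs_nodefile` is a hypothetical helper illustrating that parsing step only; it does not reproduce how NLSQ's detect_pbs() derives GPU counts (which may instead use JAX or default_gpus_per_node).

```python
import os
import tempfile
from collections import Counter
from typing import Optional


def parse_pbs_nodefile(path: Optional[str] = None) -> Optional[dict]:
    """Hypothetical sketch: summarize a PBS_NODEFILE (one hostname per allocated slot)."""
    path = path or os.environ.get("PBS_NODEFILE")
    if not path:
        return None  # not running under PBS
    with open(path) as fh:
        hosts = [line.strip() for line in fh if line.strip()]
    slots = Counter(hosts)                  # repeats = slots allocated on that node
    node_list = list(dict.fromkeys(hosts))  # unique hostnames, first-seen order
    return {"node_count": len(node_list), "node_list": node_list,
            "slots_per_node": dict(slots)}


# Usage with a fake node file: 2 nodes, 2 slots each.
with tempfile.NamedTemporaryFile("w", suffix=".nodes", delete=False) as fh:
    fh.write("node01\nnode01\nnode02\nnode02\n")
summary = parse_pbs_nodefile(fh.name)
os.remove(fh.name)
print(summary)
```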
- detect_local_gpus()
Detect local multi-GPU configuration.
Uses JAX’s device API to enumerate available GPUs on the local node.
- Returns:
ClusterInfo with local GPU configuration, or None if detection fails.
- Return type:
ClusterInfo or None
- class nlsq.core.workflow.ClusterInfo(node_count, gpus_per_node, total_gpus, node_list, scheduler='unknown', job_id=None, interconnect=None)
Bases: object
Information about detected cluster environment.
This dataclass contains information about the cluster configuration, including node count, GPUs per node, and total resources available.
- Parameters:
node_count (int) – Number of nodes in the cluster.
gpus_per_node (int) – Number of GPUs per node.
total_gpus (int) – Total number of GPUs across all nodes.
node_list (list[str]) – Hostnames of the nodes in the cluster.
scheduler (str) – Cluster scheduler type (‘pbs’, ‘local’, or ‘unknown’).
job_id (str | None) – PBS job ID if available.
interconnect (str | None) – Interconnect type if detectable (e.g., ‘infiniband’).
Examples
>>> cluster_info = ClusterInfo(
...     node_count=6,
...     gpus_per_node=8,
...     total_gpus=48,
...     node_list=["node01", "node02", "node03", "node04", "node05", "node06"],
...     scheduler="pbs",
...     job_id="12345.pbs_server",
... )
>>> cluster_info.total_gpus
48
- node_count: int
- gpus_per_node: int
- total_gpus: int
- scheduler: str
- to_dict()
Serialize cluster info to dictionary.
- Returns:
Dictionary representation of cluster info.
- Return type:
dict
- classmethod from_dict(d)
Create ClusterInfo from dictionary.
- Parameters:
d (dict) – Dictionary with cluster info fields.
- Returns:
ClusterInfo instance.
- Return type:
ClusterInfo
- __init__(node_count, gpus_per_node, total_gpus, node_list, scheduler='unknown', job_id=None, interconnect=None)
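The to_dict()/from_dict() pair implies a dictionary round trip. A minimal sketch with `dataclasses.asdict`, using a hypothetical mirror class `ClusterInfoSketch` and assuming the real methods serialize every constructor field (the exact keys NLSQ emits are not documented here):

```python
import json
from dataclasses import asdict, dataclass
from typing import List, Optional


@dataclass
class ClusterInfoSketch:
    """Hypothetical mirror of ClusterInfo's documented constructor arguments."""

    node_count: int
    gpus_per_node: int
    total_gpus: int
    node_list: List[str]
    scheduler: str = "unknown"
    job_id: Optional[str] = None
    interconnect: Optional[str] = None

    def to_dict(self) -> dict:
        return asdict(self)  # plain dict of all fields (lists are copied)

    @classmethod
    def from_dict(cls, d: dict) -> "ClusterInfoSketch":
        return cls(**d)  # keys must match field names exactly


info = ClusterInfoSketch(node_count=2, gpus_per_node=8, total_gpus=16,
                         node_list=["node01", "node02"], scheduler="pbs",
                         job_id="12345.pbs_server")
payload = json.dumps(info.to_dict())  # the dict is JSON-serializable
restored = ClusterInfoSketch.from_dict(json.loads(payload))
print(restored == info)  # True
```

Serializing through plain dicts keeps the structure JSON-friendly, which is convenient for writing cluster metadata next to checkpoints.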
- class nlsq.core.workflow.MultiGPUConfig(n_devices, shard_axis=0, use_pmap=True, use_pjit=False, per_device_batch_size=10000)
Bases: object
Configuration for multi-GPU data parallelism.
This class holds configuration for distributing data across multiple GPUs using JAX’s pmap/pjit primitives.
- Parameters:
n_devices (int) – Number of GPU devices to use.
shard_axis (int) – Axis along which to shard data. Default: 0 (batch dimension).
use_pmap (bool) – Use pmap for data parallelism. Default: True.
use_pjit (bool) – Use pjit for more flexible sharding. Default: False.
per_device_batch_size (int) – Batch size per device. Default: 10000.
Examples
>>> config = MultiGPUConfig(n_devices=4, per_device_batch_size=5000)
>>> config.total_batch_size
20000
- n_devices: int
- shard_axis: int
- use_pmap: bool
- use_pjit: bool
- per_device_batch_size: int
- property total_batch_size: int
Total batch size across all devices.
- to_dict()
Serialize to dictionary.
- __init__(n_devices, shard_axis=0, use_pmap=True, use_pjit=False, per_device_batch_size=10000)
- nlsq.core.workflow.create_distributed_config(cluster_info)
Create distributed processing configuration for HPC clusters.
Generates configuration suitable for PBS Pro multi-node setup with appropriate chunk sizes, checkpointing, and memory settings.
- Parameters:
cluster_info (ClusterInfo) – Cluster information from ClusterDetector.
- Returns:
Configuration dictionary for distributed processing.
- Return type:
dict
Examples
>>> detector = ClusterDetector()
>>> cluster_info = detector.detect()
>>> if cluster_info:
...     dist_config = create_distributed_config(cluster_info)
...     print(f"Chunk size: {dist_config['chunk_size']}")
- nlsq.core.workflow.get_multi_gpu_config(cluster_info=None)
Generate multi-GPU sharding configuration.
Creates a MultiGPUConfig based on detected cluster or local GPU setup.
- Parameters:
cluster_info (ClusterInfo, optional) – Cluster information from ClusterDetector. If None, auto-detects.
- Returns:
Configuration for multi-GPU processing, or None if no GPUs available.
- Return type:
MultiGPUConfig or None
Examples
>>> config = get_multi_gpu_config()
>>> if config:
...     print(f"Using {config.n_devices} GPUs with batch size {config.total_batch_size}")
See Also
Workflow System Overview - Workflow system overview
Common Workflows - Common workflow patterns
Configuration Reference - Configuration reference