nlsq.workflow

Memory-based workflow system for automatic optimization strategy selection.

Changed in version 0.5.5: The tier-based workflow system was replaced with a unified memory-based approach. MemoryBudgetSelector replaces auto_select_workflow(), and strategy selection is now driven entirely by memory budget computation.

Overview

The workflow module provides:

  • MemoryBudget: Dataclass for computing and storing memory estimates

  • MemoryBudgetSelector: Automatic strategy selection based on memory analysis

  • OptimizationGoal: Optimization objectives (FAST, ROBUST, GLOBAL, MEMORY_EFFICIENT, QUALITY)

  • calculate_adaptive_tolerances: Dataset-size-aware tolerance computation

Quick Start

from nlsq import fit, curve_fit
from nlsq.core.workflow import MemoryBudget, MemoryBudgetSelector, OptimizationGoal
import jax.numpy as jnp
import numpy as np


def model(x, a, b, c):
    return a * jnp.exp(-b * x) + c


x = np.linspace(0, 10, 1_000_000)
y = 2.0 * np.exp(-0.5 * x) + 0.3 + np.random.normal(0, 0.05, len(x))

# Automatic selection via fit() (recommended)
result = fit(model, x, y, p0=[1, 1, 0], workflow="auto")

# Automatic selection via curve_fit()
popt, pcov = curve_fit(model, x, y, p0=[1, 1, 0], method="auto")

# Direct use of MemoryBudgetSelector
selector = MemoryBudgetSelector(safety_factor=0.75)
strategy, config = selector.select(
    n_points=len(x),
    n_params=3,
    memory_limit_gb=16.0,  # Optional override
)
print(f"Selected strategy: {strategy}")

# Inspect memory budget
budget = MemoryBudget.compute(n_points=len(x), n_params=3)
print(f"Peak memory: {budget.peak_gb:.2f} GB")
print(f"Fits in memory: {budget.fits_in_memory}")

Memory Budget Classes

MemoryBudget

class nlsq.core.workflow.MemoryBudget(available_gb, threshold_gb, data_gb, jacobian_gb, peak_gb)[source]

Bases: object

Computed memory budget for optimizer selection.

This immutable dataclass represents the computed memory requirements and available resources for automatic optimizer strategy selection. Use the compute() factory method to create instances.

available_gb

Available system memory in GB (CPU or GPU depending on target).

Type:

float

threshold_gb

Safe memory threshold = available_gb × safety_factor.

Type:

float

data_gb

Memory required for data arrays (x_data, y_data).

Type:

float

jacobian_gb

Memory required for full Jacobian matrix.

Type:

float

peak_gb

Estimated peak memory = data_gb + 1.3 × jacobian_gb + solver overhead.

Type:

float

Examples

>>> budget = MemoryBudget.compute(n_points=10_000_000, n_params=10)
>>> print(f"Available: {budget.available_gb:.1f} GB")
>>> print(f"Peak estimate: {budget.peak_gb:.2f} GB")
>>> print(f"Fits in memory: {budget.fits_in_memory}")
available_gb: float
threshold_gb: float
data_gb: float
jacobian_gb: float
peak_gb: float
property fits_in_memory: bool

Check if estimated peak memory fits within safe threshold.

Returns:

True if peak_gb <= threshold_gb.

Return type:

bool

property data_fits: bool

Check if data arrays alone fit within safe threshold.

Returns:

True if data_gb <= threshold_gb.

Return type:

bool

classmethod compute(n_points, n_params, n_features=1, dtype_bytes=8, safety_factor=0.75, memory_limit_gb=None, use_gpu=False)[source]

Compute memory budget for a given dataset size.

Parameters:
  • n_points (int) – Number of data points.

  • n_params (int) – Number of fit parameters.

  • n_features (int, default=1) – Number of features in x_data (dimensions).

  • dtype_bytes (int, default=8) – Bytes per element (8 for float64, 4 for float32).

  • safety_factor (float, default=0.75) – Memory safety factor (0.75 means use 75% of available).

  • memory_limit_gb (float | None, default=None) – Override memory limit in GB. If None, auto-detect.

  • use_gpu (bool, default=False) – If True, use GPU memory instead of CPU memory.

Returns:

Computed memory budget with all fields populated.

Return type:

MemoryBudget

Raises:

ValueError – If n_points <= 0, n_params <= 0, or safety_factor not in (0, 1].

Examples

>>> budget = MemoryBudget.compute(n_points=1_000_000, n_params=5)
>>> budget.fits_in_memory
True
__init__(available_gb, threshold_gb, data_gb, jacobian_gb, peak_gb)

Fields:

Field          Description
-------------  ------------------------------------------
available_gb   Total available memory (CPU or GPU) in GB
threshold_gb   Safe threshold (available × safety_factor)
data_gb        Estimated memory for data arrays (x, y)
jacobian_gb    Estimated memory for Jacobian matrix
peak_gb        Total peak memory estimate

Computed Properties:

  • fits_in_memory: True if peak_gb <= threshold_gb

  • data_fits: True if data_gb <= threshold_gb

MemoryBudgetSelector

class nlsq.core.workflow.MemoryBudgetSelector(safety_factor=0.75)[source]

Bases: object

Selects optimal optimizer strategy based on memory budget.

This class computes memory requirements and selects between the STREAMING, CHUNKED, and STANDARD strategies using two sequential memory comparisons; STANDARD is the fallback when neither threshold is exceeded.

Decision Tree:
  1. data_gb > threshold_gb → STREAMING (data doesn’t fit)

  2. peak_gb > threshold_gb → CHUNKED (Jacobian doesn’t fit)

  3. else → STANDARD (everything fits)

Parameters:

safety_factor (float, default=0.75) – Memory safety factor (0.75 means use 75% of available memory).

Examples

>>> selector = MemoryBudgetSelector(safety_factor=0.75)
>>> strategy, config = selector.select(n_points=5_000_000, n_params=10)
>>> if strategy == "streaming":
...     # Use HybridStreamingOptimizer with config
...     pass
... elif strategy == "chunked":
...     # Use LargeDatasetFitter with config
...     pass
... else:
...     # Use standard curve_fit()
...     pass
__init__(safety_factor=0.75)[source]

Initialize selector with safety factor.

Parameters:

safety_factor (float, default=0.75) – Memory safety factor (0.75 means use 75% of available memory).

select(n_points, n_params, n_features=1, memory_limit_gb=None, goal=None, use_gpu=False, verbose=False)[source]

Select optimal optimizer strategy based on memory budget.

Parameters:
  • n_points (int) – Number of data points.

  • n_params (int) – Number of fit parameters.

  • n_features (int, default=1) – Number of features in x_data.

  • memory_limit_gb (float | None, default=None) – Override memory limit in GB. If None, auto-detect.

  • goal (OptimizationGoal | None, default=None) – Optimization goal (affects tolerances, not strategy selection).

  • use_gpu (bool, default=False) – If True, use GPU memory instead of CPU memory.

  • verbose (bool, default=False) – If True, log memory budget details and strategy selection reason.

Returns:

  • strategy: “streaming”, “chunked”, or “standard”

  • config: HybridStreamingConfig, LDMemoryConfig, or None

Return type:

tuple[str, config]

Raises:

ValueError – If n_points <= 0 or n_params <= 0.

Strategy Selection Logic:

if data_gb > threshold_gb:
    return "streaming"  # Data too large for memory
elif peak_gb > threshold_gb:
    return "chunked"    # Jacobian too large, chunk the computation
else:
    return "standard"   # Everything fits, use direct curve_fit()
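
The decision tree can be tried out without importing nlsq. The sketch below is illustrative only: BudgetSketch and select_strategy are hypothetical names, not part of the library, and the memory figures are example inputs rather than measured values. It mirrors the documented fits_in_memory and data_fits comparisons against threshold_gb.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BudgetSketch:
    """Hypothetical stand-in holding three of the MemoryBudget fields."""

    threshold_gb: float
    data_gb: float
    peak_gb: float

    @property
    def data_fits(self) -> bool:
        # Documented rule: data_gb <= threshold_gb
        return self.data_gb <= self.threshold_gb

    @property
    def fits_in_memory(self) -> bool:
        # Documented rule: peak_gb <= threshold_gb
        return self.peak_gb <= self.threshold_gb


def select_strategy(budget: BudgetSketch) -> str:
    """Apply the two comparisons in the documented order."""
    if not budget.data_fits:
        return "streaming"  # the data arrays alone exceed the threshold
    if not budget.fits_in_memory:
        return "chunked"  # data fits, but the estimated peak does not
    return "standard"  # everything fits


# Example inputs: 0.16 GB of data and a 1.2 GB estimated peak,
# evaluated against three different amounts of available memory.
for available_gb in (16.0, 1.0, 0.2):
    budget = BudgetSketch(threshold_gb=0.75 * available_gb, data_gb=0.16, peak_gb=1.2)
    print(f"{available_gb:>5} GB available -> {select_strategy(budget)}")
```

Checking the data first matters: when the data arrays do not fit, the peak estimate cannot fit either, so the order of the comparisons decides between streaming and chunked.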

Enumerations

OptimizationGoal

class nlsq.core.workflow.OptimizationGoal(*values)[source]

Bases: Enum

Optimization goals that influence workflow selection and tolerances.

Each goal represents a different optimization priority, affecting:

  • Convergence tolerances (gtol, ftol, xtol)

  • Multi-start enablement

  • Memory/speed tradeoffs

FAST

Prioritize speed with local optimization only. Uses one tier looser tolerances, skips multi-start. Best for: quick exploration, well-conditioned problems.

Type:

auto

ROBUST

Standard tolerances with multi-start for better global optimum. Uses dataset-appropriate tolerances, enables multi-start via MultiStartOrchestrator. Best for: production use, unknown problem conditioning.

Type:

auto

GLOBAL

Synonym for ROBUST. Emphasizes global optimization. Same behavior as ROBUST, provided for semantic clarity.

Type:

auto

MEMORY_EFFICIENT

Minimize memory usage with standard tolerances. Prioritizes streaming/chunking with smaller chunk sizes. Best for: memory-constrained environments, very large datasets.

Type:

auto

QUALITY

Treats precision and accuracy as the top priority. Uses tolerances one tier tighter, enables multi-start, and runs validation passes. Best for: publication-quality results, critical applications.

Type:

auto

FAST = 1
ROBUST = 2
GLOBAL = 3
MEMORY_EFFICIENT = 4
QUALITY = 5
classmethod normalize(goal)[source]

Normalize GLOBAL to ROBUST since they have same behavior.

Parameters:

goal (OptimizationGoal) – The goal to normalize.

Returns:

ROBUST if goal was GLOBAL, otherwise the original goal.

Return type:

OptimizationGoal
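
The normalization rule is small enough to sketch with the standard library. GoalSketch below is a hypothetical stand-in for OptimizationGoal, written only to illustrate the documented behavior (GLOBAL maps to ROBUST, every other member is returned unchanged); it is not the library's implementation.

```python
from enum import Enum, auto


class GoalSketch(Enum):
    """Hypothetical mirror of the documented members (values 1 to 5)."""

    FAST = auto()
    ROBUST = auto()
    GLOBAL = auto()
    MEMORY_EFFICIENT = auto()
    QUALITY = auto()

    @classmethod
    def normalize(cls, goal):
        # GLOBAL is documented as a synonym for ROBUST.
        return cls.ROBUST if goal is cls.GLOBAL else goal


print(GoalSketch.normalize(GoalSketch.GLOBAL))
print(GoalSketch.normalize(GoalSketch.QUALITY))
```

Normalizing once at the entry point lets downstream code branch on four goals instead of five.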

Goal              Description
----------------  ------------------------------------------------------------------------
FAST              Prioritize speed. Uses one tier looser tolerances, skips multi-start.
ROBUST            Standard tolerances with multi-start for better global optimum.
GLOBAL            Synonym for ROBUST. Emphasizes global optimization.
MEMORY_EFFICIENT  Minimize memory usage with standard tolerances.
QUALITY           Highest precision. Uses one tier tighter tolerances, enables multi-start.

Named Workflow Presets

The fit() function accepts named presets via the workflow parameter:

Preset             Strategy      Tolerance  Description
-----------------  ------------  ---------  ---------------------------------------------------
"auto"             Memory-based  Adaptive   Automatic selection based on memory budget
"standard"         standard      1e-8       Default curve_fit() behavior, no multi-start
"quality"          standard      1e-10      Highest precision with 20-point multi-start
"fast"             standard      1e-6       Speed-optimized, no multi-start
"large_robust"     chunked       1e-8       Chunked processing with 10-point multi-start
"streaming"        streaming     1e-7       AdaptiveHybridStreamingOptimizer for huge datasets
"hpc_distributed"  streaming     1e-6       Multi-GPU/node HPC configuration with checkpointing

Usage:

from nlsq import fit

# Use automatic memory-based selection
result = fit(model, x, y, p0=[1, 1, 0], workflow="auto")

# Use a named preset
result = fit(model, x, y, p0=[1, 1, 0], workflow="quality")

# Override memory detection
result = fit(model, x, y, p0=[1, 1, 0], workflow="auto", memory_limit_gb=8.0)

Adaptive Tolerances

The workflow system uses adaptive tolerances based on dataset size:

Dataset Size  Points               Default Tolerance  Notes
------------  -------------------  -----------------  --------------------------
TINY          < 1,000              1e-12              Maximum precision
SMALL         1,000 - 10,000       1e-10              High precision
MEDIUM        10,000 - 100,000     1e-9               Balanced
LARGE         100,000 - 1,000,000  1e-8               Standard (NLSQ default)
VERY_LARGE    1M - 10M             1e-7               Reduced precision
HUGE          10M - 100M           1e-6               Streaming mode
MASSIVE       > 100M               1e-5               Streaming with checkpoints

Goal-Based Adjustments:

  • QUALITY: uses tolerances one tier tighter than the dataset-size default

  • FAST: uses tolerances one tier looser than the dataset-size default

  • ROBUST/GLOBAL/MEMORY_EFFICIENT: use the dataset-size default unchanged

from nlsq.core.workflow import calculate_adaptive_tolerances, OptimizationGoal

# 5M points with QUALITY goal
tols = calculate_adaptive_tolerances(5_000_000, goal=OptimizationGoal.QUALITY)
print(tols)  # {'gtol': 1e-08, 'ftol': 1e-08, 'xtol': 1e-08}

# 5M points with FAST goal
tols = calculate_adaptive_tolerances(5_000_000, goal=OptimizationGoal.FAST)
print(tols)  # {'gtol': 1e-06, 'ftol': 1e-06, 'xtol': 1e-06}
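
The tier lookup and the goal-based shift can be reproduced with a short table-driven sketch. Everything below is an illustration under stated assumptions, not the library's code: adaptive_tolerance_sketch is a hypothetical name, goals are passed as plain strings instead of OptimizationGoal members, range boundaries are treated as upper-exclusive, and a shift past the first or last tier is clamped.

```python
from bisect import bisect_right

# Upper bounds of the first six tiers (assumed exclusive) and the
# default tolerance of all seven tiers, copied from the table above.
TIER_BOUNDS = [1_000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000]
TIER_TOLERANCES = [1e-12, 1e-10, 1e-9, 1e-8, 1e-7, 1e-6, 1e-5]


def adaptive_tolerance_sketch(n_points, goal=None):
    """Look up the size tier, then shift it by one for quality/fast."""
    tier = bisect_right(TIER_BOUNDS, n_points)
    if goal == "quality":
        tier = max(tier - 1, 0)  # one tier tighter (clamping is an assumption)
    elif goal == "fast":
        tier = min(tier + 1, len(TIER_TOLERANCES) - 1)  # one tier looser
    tol = TIER_TOLERANCES[tier]
    return {"gtol": tol, "ftol": tol, "xtol": tol}


for goal in (None, "quality", "fast"):
    print(goal, adaptive_tolerance_sketch(5_000_000, goal))
```

For 5M points the sketch lands in the VERY_LARGE tier, so the three calls give 1e-07, 1e-08, and 1e-06, matching the values shown in the documented examples.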

Memory Estimation Details

The system estimates memory requirements for each component:

Component    Formula (bytes)         Example (10M pts, 10 params)
-----------  ----------------------  ----------------------------
Data (x, y)  n × (features + 1) × 8  160 MB
Jacobian     n × p × 8               800 MB
JTJ          p² × 8                  0.8 KB
SVD working  ~0.3 × jacobian         240 MB
Peak         data + 1.3×J + solver   ~1.3 GB

The Jacobian matrix dominates memory usage for most problems.
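
The per-component formulas can be checked by hand with a few lines of Python. This is a sketch, not the library's estimator: estimate_components_mb is a hypothetical helper, it uses decimal megabytes to match the round figures above, and it leaves out the solver term because no formula for it is given on this page.

```python
def estimate_components_mb(n_points, n_params, n_features=1, dtype_bytes=8):
    """Per-component memory estimates in decimal megabytes."""
    mb = 1e6
    data = n_points * (n_features + 1) * dtype_bytes / mb
    jacobian = n_points * n_params * dtype_bytes / mb
    return {
        "data": data,
        "jacobian": jacobian,
        "jtj": n_params**2 * dtype_bytes / mb,
        "svd_working": 0.3 * jacobian,
        # Solver overhead is omitted, so this is a lower bound on the peak.
        "peak_without_solver": data + 1.3 * jacobian,
    }


estimate = estimate_components_mb(n_points=10_000_000, n_params=10)
for name, value in estimate.items():
    print(f"{name:>20}: {value:10.4f} MB")
```

With these inputs the sketch gives 160 MB of data, 800 MB for the Jacobian, and 1200 MB before solver overhead; the remaining difference from the documented peak figure would come from the solver term.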

Utility Functions

calculate_adaptive_tolerances

nlsq.core.workflow.calculate_adaptive_tolerances(n_points, goal=None)[source]

Calculate adaptive tolerances based on dataset size and optimization goal.

This function determines appropriate convergence tolerances (gtol, ftol, xtol) for the given dataset size, then applies goal-based adjustments:

  • “quality” goal: Use one tier tighter (lower) tolerances

  • “fast” goal: Use one tier looser (higher) tolerances

  • “robust”/“global”/“memory_efficient”: Use standard tolerances for dataset size

Parameters:
  • n_points (int) – Number of data points in the dataset.

  • goal (OptimizationGoal, optional) – Optimization goal to adjust tolerances. Default: None (use dataset-appropriate).

Returns:

Dictionary with ‘gtol’, ‘ftol’, ‘xtol’ keys and corresponding tolerance values.

Return type:

dict[str, float]

Examples

>>> tols = calculate_adaptive_tolerances(5_000_000)
>>> tols['gtol']
1e-07
>>> tols = calculate_adaptive_tolerances(5_000_000, goal=OptimizationGoal.QUALITY)
>>> tols['gtol']  # One tier tighter
1e-08
>>> tols = calculate_adaptive_tolerances(5_000_000, goal=OptimizationGoal.FAST)
>>> tols['gtol']  # One tier looser
1e-06

create_checkpoint_directory

nlsq.core.workflow.create_checkpoint_directory(base_dir=None)[source]

Create a checkpoint directory with timestamp.

Creates a directory at ./nlsq_checkpoints/YYYYMMDD_HHMMSS/ for storing optimization checkpoints. Integrates with HybridStreamingConfig.enable_checkpoints.

Parameters:

base_dir (str or Path, optional) – Base directory for checkpoints. Default: ./nlsq_checkpoints

Returns:

Absolute path to the created checkpoint directory.

Return type:

str

Examples

>>> checkpoint_dir = create_checkpoint_directory()
>>> # Returns path like './nlsq_checkpoints/20251219_143052/'
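
A timestamped directory of this shape can be created with the standard library alone. The function below is a minimal sketch under assumptions, not the library's implementation: checkpoint_directory_sketch is a hypothetical name, and resolving the path to an absolute one follows the documented return value.

```python
import tempfile
from datetime import datetime
from pathlib import Path


def checkpoint_directory_sketch(base_dir=None):
    """Create <base_dir>/YYYYMMDD_HHMMSS and return its absolute path."""
    base = Path(base_dir) if base_dir is not None else Path("nlsq_checkpoints")
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    target = (base / stamp).resolve()
    # parents=True creates the base directory on first use.
    target.mkdir(parents=True, exist_ok=True)
    return str(target)


# Use a temporary base directory so the example leaves nothing behind.
with tempfile.TemporaryDirectory() as tmp:
    created = checkpoint_directory_sketch(tmp)
    created_exists = Path(created).is_dir()
    print(created)
```

Because the timestamp has one-second resolution, two calls within the same second map to the same directory; exist_ok=True makes that case harmless in this sketch.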

Module Contents

Workflow Configuration and Selection Module.

This module provides memory-based optimizer selection and adaptive tolerance calculation for NLSQ curve fitting operations.

Key Components

  • OptimizationGoal enum: Defines optimization priorities (FAST, ROBUST, QUALITY, etc.)

  • MemoryBudget dataclass: Computes memory requirements for optimizer selection

  • MemoryBudgetSelector class: Selects optimal optimizer strategy based on memory

  • calculate_adaptive_tolerances(): Returns size-appropriate convergence tolerances

  • ClusterDetector class: Detects HPC cluster environments (PBS Pro)

Examples

Memory-based optimizer selection:

>>> from nlsq.core.workflow import MemoryBudgetSelector
>>> selector = MemoryBudgetSelector(safety_factor=0.75)
>>> strategy, config = selector.select(n_points=5_000_000, n_params=10)
>>> if strategy == "streaming":
...     pass  # Use HybridStreamingOptimizer
... elif strategy == "chunked":
...     pass  # Use LargeDatasetFitter
... else:
...     pass  # Use standard curve_fit()

Adaptive tolerance calculation:

>>> from nlsq.core.workflow import calculate_adaptive_tolerances, OptimizationGoal
>>> tols = calculate_adaptive_tolerances(n_points=5_000_000, goal=OptimizationGoal.QUALITY)
>>> tols['gtol']  # Returns tighter tolerance for QUALITY goal
1e-08

Cluster detection for HPC environments:

>>> from nlsq.core.workflow import ClusterDetector
>>> detector = ClusterDetector()
>>> cluster_info = detector.detect()
>>> if cluster_info:
...     print(f"Running on cluster: {cluster_info.total_gpus} GPUs")
class nlsq.core.workflow.ClusterDetector(default_gpus_per_node=8)[source]

Bases: object

Detector for cluster environments and GPU configurations.

This class auto-detects PBS cluster environments via $PBS_NODEFILE and single-node multi-GPU configurations via JAX’s device API.

Supports:

  • PBS Pro cluster manager

  • Single-node multi-GPU (2-8 GPUs)

  • Multi-node HPC clusters (10-100 nodes, 8x A100 GPUs per node)

Examples

>>> detector = ClusterDetector()
>>> cluster_info = detector.detect()
>>> if cluster_info is not None:
...     print(f"Cluster detected: {cluster_info.node_count} nodes")
...     print(f"Total GPUs: {cluster_info.total_gpus}")
... else:
...     print("Not in cluster environment")

Check for PBS specifically:

>>> if detector.is_pbs_environment():
...     cluster_info = detector.detect_pbs()
...     print(f"PBS Job ID: {cluster_info.job_id}")
DEFAULT_GPUS_PER_NODE = 8
__init__(default_gpus_per_node=8)[source]

Initialize ClusterDetector.

Parameters:

default_gpus_per_node (int, optional) – Default number of GPUs per node when not auto-detectable. Default: 8 (for A100 HPC nodes).

detect()[source]

Auto-detect cluster environment.

Tries PBS first, then falls back to local multi-GPU detection. Returns None if not in a cluster environment (single CPU-only machine).

Returns:

ClusterInfo if cluster detected, None otherwise.

Return type:

ClusterInfo or None

Examples

>>> detector = ClusterDetector()
>>> info = detector.detect()
>>> if info:
...     print(f"Running on {info.scheduler} with {info.total_gpus} GPUs")
is_pbs_environment()[source]

Check if running in PBS cluster environment.

Returns:

True if PBS_NODEFILE environment variable is set.

Return type:

bool

detect_pbs()[source]

Detect PBS Pro cluster configuration.

Parses PBS_NODEFILE to determine node count and list. GPU count per node is either auto-detected via JAX or uses default.

Returns:

ClusterInfo with PBS configuration, or None if not in PBS environment.

Return type:

ClusterInfo or None

Notes

PBS_NODEFILE contains one line per allocated processor slot. For GPU jobs, typically each GPU gets one line per node.
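
Because the node file repeats a hostname once per allocated slot, the node list and the slot count per node can be recovered by de-duplicating and counting lines. The sketch below illustrates that idea with hypothetical helper names and an invented sample; it is not how ClusterDetector is implemented.

```python
import os
from collections import Counter


def parse_nodefile_text(text):
    """Return (unique hostnames in first-seen order, slots per hostname)."""
    hosts = [line.strip() for line in text.splitlines() if line.strip()]
    node_list = list(dict.fromkeys(hosts))  # de-duplicate, keep order
    return node_list, dict(Counter(hosts))


def is_pbs_environment_sketch(environ=os.environ):
    """Documented rule: PBS is assumed when PBS_NODEFILE is set."""
    return "PBS_NODEFILE" in environ


# Invented sample: two nodes with two slots each.
sample = "node01\nnode01\nnode02\nnode02\n"
node_list, slots = parse_nodefile_text(sample)
print(node_list)
print(slots)
```

In a real job the text would come from reading the file named by the PBS_NODEFILE environment variable.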

detect_local_gpus()[source]

Detect local multi-GPU configuration.

Uses JAX’s device API to enumerate available GPUs on the local node.

Returns:

ClusterInfo with local GPU configuration, or None if detection fails.

Return type:

ClusterInfo or None

class nlsq.core.workflow.ClusterInfo(node_count, gpus_per_node, total_gpus, node_list, scheduler='unknown', job_id=None, interconnect=None)[source]

Bases: object

Information about detected cluster environment.

This dataclass contains information about the cluster configuration, including node count, GPUs per node, and total resources available.

Parameters:
  • node_count (int) – Number of nodes in the cluster.

  • gpus_per_node (int) – Number of GPUs per node.

  • total_gpus (int) – Total number of GPUs across all nodes.

  • node_list (list[str]) – List of node hostnames.

  • scheduler (str) – Cluster scheduler type (‘pbs’, ‘local’, or ‘unknown’).

  • job_id (str | None) – PBS job ID if available.

  • interconnect (str | None) – Interconnect type if detectable (e.g., ‘infiniband’).

Examples

>>> cluster_info = ClusterInfo(
...     node_count=6,
...     gpus_per_node=8,
...     total_gpus=48,
...     node_list=["node01", "node02", "node03", "node04", "node05", "node06"],
...     scheduler="pbs",
...     job_id="12345.pbs_server",
... )
>>> cluster_info.total_gpus
48
node_count: int
gpus_per_node: int
total_gpus: int
node_list: list[str]
scheduler: str
job_id: str | None
interconnect: str | None
to_dict()[source]

Serialize cluster info to dictionary.

Returns:

Dictionary representation of cluster info.

Return type:

dict

classmethod from_dict(d)[source]

Create ClusterInfo from dictionary.

Parameters:

d (dict) – Dictionary with cluster info fields.

Returns:

ClusterInfo instance.

Return type:

ClusterInfo
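
A to_dict()/from_dict() pair of this kind is typically a thin wrapper around the dataclass fields. The sketch below shows one way such a round trip could work; ClusterInfoSketch is a hypothetical stand-in, and the use of dataclasses.asdict is an assumption rather than the library's implementation.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class ClusterInfoSketch:
    """Hypothetical mirror of the documented ClusterInfo fields."""

    node_count: int
    gpus_per_node: int
    total_gpus: int
    node_list: list
    scheduler: str = "unknown"
    job_id: Optional[str] = None
    interconnect: Optional[str] = None

    def to_dict(self):
        return asdict(self)

    @classmethod
    def from_dict(cls, d):
        return cls(**d)


info = ClusterInfoSketch(
    node_count=2,
    gpus_per_node=8,
    total_gpus=16,
    node_list=["node01", "node02"],
    scheduler="pbs",
)
restored = ClusterInfoSketch.from_dict(info.to_dict())
print(restored == info)
```

The dictionary form contains only built-in types, which makes it straightforward to write into a checkpoint or a job log.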

__init__(node_count, gpus_per_node, total_gpus, node_list, scheduler='unknown', job_id=None, interconnect=None)
class nlsq.core.workflow.MultiGPUConfig(n_devices, shard_axis=0, use_pmap=True, use_pjit=False, per_device_batch_size=10000)[source]

Bases: object

Configuration for multi-GPU data parallelism.

This class holds configuration for distributing data across multiple GPUs using JAX’s pmap/pjit primitives.

Parameters:
  • n_devices (int) – Number of GPU devices to use.

  • shard_axis (int) – Axis along which to shard data. Default: 0 (batch dimension).

  • use_pmap (bool) – Use pmap for data parallelism. Default: True.

  • use_pjit (bool) – Use pjit for more flexible sharding. Default: False.

  • per_device_batch_size (int) – Batch size per device. Default: 10000.

Examples

>>> config = MultiGPUConfig(n_devices=4, per_device_batch_size=5000)
>>> config.total_batch_size
20000
n_devices: int
shard_axis: int
use_pmap: bool
use_pjit: bool
per_device_batch_size: int
property total_batch_size: int

Total batch size across all devices.
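
The documented example (4 devices × 5000 per device = 20000) suggests that the property is the product of n_devices and per_device_batch_size. The sketch below encodes that reading in a hypothetical MultiGPUConfigSketch class; the product rule is inferred from the example, not confirmed from the source.

```python
from dataclasses import dataclass


@dataclass
class MultiGPUConfigSketch:
    """Hypothetical mirror of the documented fields and defaults."""

    n_devices: int
    shard_axis: int = 0
    use_pmap: bool = True
    use_pjit: bool = False
    per_device_batch_size: int = 10000

    @property
    def total_batch_size(self) -> int:
        # Assumed: every device processes one full per-device batch.
        return self.n_devices * self.per_device_batch_size


config = MultiGPUConfigSketch(n_devices=4, per_device_batch_size=5000)
print(config.total_batch_size)
```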

to_dict()[source]

Serialize to dictionary.

__init__(n_devices, shard_axis=0, use_pmap=True, use_pjit=False, per_device_batch_size=10000)
nlsq.core.workflow.create_distributed_config(cluster_info)[source]

Create distributed processing configuration for HPC clusters.

Generates configuration suitable for PBS Pro multi-node setup with appropriate chunk sizes, checkpointing, and memory settings.

Parameters:

cluster_info (ClusterInfo) – Cluster information from ClusterDetector.

Returns:

Configuration dictionary for distributed processing.

Return type:

dict

Examples

>>> detector = ClusterDetector()
>>> cluster_info = detector.detect()
>>> if cluster_info:
...     dist_config = create_distributed_config(cluster_info)
...     print(f"Chunk size: {dist_config['chunk_size']}")
nlsq.core.workflow.get_multi_gpu_config(cluster_info=None)[source]

Generate multi-GPU sharding configuration.

Creates a MultiGPUConfig based on detected cluster or local GPU setup.

Parameters:

cluster_info (ClusterInfo, optional) – Cluster information from ClusterDetector. If None, auto-detects.

Returns:

Configuration for multi-GPU processing, or None if no GPUs available.

Return type:

MultiGPUConfig or None

Examples

>>> config = get_multi_gpu_config()
>>> if config:
...     print(f"Using {config.n_devices} GPUs with batch size {config.total_batch_size}")

See Also