"""Result exporter module for NLSQ CLI.
This module provides the ResultExporter class for exporting curve fitting
results in multiple formats (JSON, CSV, stdout).
Supported Export Formats
------------------------
- JSON: Full metadata with nested structure
- CSV: Flattened parameter name/value/uncertainty rows
- stdout: JSON format for piping to other tools
Example Usage
-------------
>>> from nlsq.cli.result_exporter import ResultExporter
>>>
>>> exporter = ResultExporter()
>>> result = {"popt": [1.0, 0.5], "pcov": [[0.01, 0], [0, 0.02]], ...}
>>> config = {"export": {"results_file": "output.json", "format": "json"}}
>>> exporter.export(result, config)
"""
import csv
import json
import math
from pathlib import Path
from typing import Any
import numpy as np
from nlsq.cli.errors import CLIError
def _sanitize_nonfinite(obj: Any) -> Any:
    """Recursively replace non-finite floats with None for spec-compliant JSON.

    JSON (RFC 8259) has no NaN or Infinity literals. Python's json module
    emits them by default (allow_nan=True), which breaks strict parsers, and
    raises ValueError for them when allow_nan=False.

    Parameters
    ----------
    obj : Any
        Value to sanitize. Python floats, numpy floating scalars, numpy
        arrays (converted to nested lists first so their elements are
        walked), dicts, lists and tuples are processed recursively.

    Returns
    -------
    Any
        A copy of the container structure with every NaN/±Infinity replaced
        by None. Tuples stay tuples, numpy arrays become lists, and any other
        object (ints, strings, numpy integers, ...) is returned unchanged.
    """
    # np.float64 subclasses float; the other numpy float widths are only
    # np.floating, so both types have to be checked.
    if isinstance(obj, (float, np.floating)):
        return obj if math.isfinite(float(obj)) else None
    # numpy arrays — convert to list so the element-level walk applies
    if isinstance(obj, np.ndarray):
        return _sanitize_nonfinite(obj.tolist())
    if isinstance(obj, dict):
        return {k: _sanitize_nonfinite(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_sanitize_nonfinite(v) for v in obj]
    # json serializes tuples like lists, so a NaN hidden inside one would
    # otherwise reach the encoder and fail under allow_nan=False.
    if isinstance(obj, tuple):
        return tuple(_sanitize_nonfinite(v) for v in obj)
    return obj
class NumpyJSONEncoder(json.JSONEncoder):
    """JSON encoder that also understands numpy arrays and numpy scalars."""

    def default(self, obj: Any) -> Any:
        """Translate a numpy object into its plain-Python equivalent.

        Parameters
        ----------
        obj : Any
            Object the standard encoder could not serialize.

        Returns
        -------
        Any
            A list for arrays, or an int/float/bool for numpy scalars.
            Every other object is delegated to ``json.JSONEncoder.default``,
            which raises TypeError.
        """
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        # Scalars: map each numpy scalar family onto the matching builtin.
        scalar_casts = (
            (np.integer, int),
            (np.floating, float),
            (np.bool_, bool),
        )
        for numpy_family, builtin_type in scalar_casts:
            if isinstance(obj, numpy_family):
                return builtin_type(obj)
        return super().default(obj)
class ResultExporter:
"""Exporter for curve fitting results.
Exports fit results to JSON, CSV, or stdout formats with full
metadata including parameters, covariance, uncertainties, statistics,
and convergence information.
Attributes
----------
None
Methods
-------
export(result, config)
Export fit result to configured format(s).
Examples
--------
>>> exporter = ResultExporter()
>>> result = {"popt": [1.0, 0.5], "pcov": [[0.01, 0], [0, 0.02]]}
>>> config = {"export": {"results_file": "results.json", "format": "json"}}
>>> exporter.export(result, config)
"""
def export(self, result: dict[str, Any], config: dict[str, Any]) -> None:
"""Export fit result to configured format(s).
Parameters
----------
result : dict
Fit result dictionary containing:
- popt: Fitted parameters (ndarray or list)
- pcov: Covariance matrix (ndarray or list)
- success: bool indicating fit success
- message: str with convergence message
- nfev: Number of function evaluations
- njev: Number of Jacobian evaluations (optional)
- cost: Final cost value (optional)
- fun: Residual vector (optional)
config : dict
Export configuration containing:
- export.results_file: Output file path
- export.format: "json" or "csv"
- export.stdout: bool to output to stdout
- export.skip_file_on_stdout: bool to skip file when stdout active
- metadata: Workflow metadata (optional)
- model: Model configuration (optional)
Returns
-------
None
Raises
------
CLIError
If export fails due to file or format issues.
"""
export_config = config.get("export", {})
output_format = export_config.get("format", "json").lower()
stdout_mode = export_config.get("stdout", False)
skip_file = export_config.get("skip_file_on_stdout", False)
# Prepare export data
export_data = self._prepare_export_data(result, config)
# Export to stdout if requested
if stdout_mode:
self._export_stdout(export_data)
# Skip file writing if configured
if skip_file:
return
# Get output file path
results_file = export_config.get("results_file")
if results_file is None and not stdout_mode:
raise CLIError(
"No output file specified for export",
suggestion="Set export.results_file in config or use export.stdout: true",
)
if results_file is not None:
output_path = Path(results_file)
# Ensure parent directory exists
output_path.parent.mkdir(parents=True, exist_ok=True)
# Export based on format
if output_format == "json":
self._export_json(export_data, output_path)
elif output_format == "csv":
self._export_csv(export_data, output_path)
else:
raise CLIError(
f"Unsupported export format: {output_format}",
context={"format": output_format},
suggestion="Supported formats are: json, csv",
)
def _prepare_export_data(
self, result: dict[str, Any], config: dict[str, Any]
) -> dict[str, Any]:
"""Prepare export data with full metadata.
Parameters
----------
result : dict
Fit result dictionary.
config : dict
Export configuration.
Returns
-------
dict
Prepared export data with all metadata.
"""
# Extract parameters
popt = result.get("popt", result.get("x", []))
if isinstance(popt, np.ndarray):
popt = popt.tolist()
pcov = result.get("pcov", [])
if isinstance(pcov, np.ndarray):
pcov = pcov.tolist()
# Calculate uncertainties from covariance diagonal
uncertainties = []
if pcov is not None:
pcov_arr = np.asarray(pcov)
# `if pcov:` on an ndarray raises ValueError — use explicit None check.
# Also guard against singular/negative-diagonal covariance (produces NaN).
if pcov_arr.ndim == 2 and pcov_arr.size > 0:
diag = np.diag(pcov_arr)
if np.any(diag < 0) or not np.all(np.isfinite(diag)):
uncertainties = [float("nan")] * len(diag)
else:
uncertainties = np.sqrt(diag).tolist()
# Calculate statistics
statistics = self._calculate_statistics(result)
# Extract convergence info
convergence = {
"iterations": result.get("nfev", 0),
"function_evals": result.get("nfev", 0),
"jacobian_evals": result.get("njev", 0),
"status": "success" if result.get("success", False) else "failed",
"message": result.get("message", ""),
}
if "cost" in result:
convergence["final_cost"] = (
float(result["cost"]) if result["cost"] is not None else None
)
# Extract metadata from config
metadata_config = config.get("metadata", {})
model_config = config.get("model", {})
metadata = {
"workflow_name": metadata_config.get("workflow_name", "unknown"),
"dataset_id": metadata_config.get("dataset_id", "unknown"),
"model_id": model_config.get(
"name", model_config.get("function", "custom")
),
}
# Build export data
export_data = {
"popt": popt,
"pcov": pcov,
"uncertainties": uncertainties,
"statistics": statistics,
"convergence": convergence,
"metadata": metadata,
}
# Include parameter names if available
param_names = config.get("model", {}).get("parameter_names")
if param_names:
export_data["parameter_names"] = param_names
return export_data
def _calculate_statistics(self, result: dict[str, Any]) -> dict[str, Any]:
"""Calculate fit statistics from result.
Parameters
----------
result : dict
Fit result dictionary.
Returns
-------
dict
Statistics dictionary with r_squared, rmse, chi_squared, etc.
"""
statistics: dict[str, Any] = {}
# Get residuals if available
fun = result.get("fun")
if fun is not None:
residuals = np.asarray(fun)
if not np.all(np.isfinite(residuals)):
statistics["statistics_warnings"] = (
"residuals contain non-finite values"
)
else:
# RMSE
statistics["rmse"] = float(np.sqrt(np.mean(residuals**2)))
# Chi-squared (sum of squared residuals)
statistics["chi_squared"] = float(np.sum(residuals**2))
# R-squared if ydata is available — gated here so NaN residuals
# don't silently produce a NaN r_squared outside the warning path.
ydata = result.get("ydata")
if ydata is not None:
y = np.asarray(ydata)
ss_res = np.sum(residuals**2)
ss_tot = np.sum((y - np.mean(y)) ** 2)
if ss_tot > 0:
statistics["r_squared"] = float(1 - ss_res / ss_tot)
# Copy any existing statistics
if "r_squared" in result:
statistics["r_squared"] = float(result["r_squared"])
if "rmse" in result:
statistics["rmse"] = float(result["rmse"])
if "chi_squared" in result:
statistics["chi_squared"] = float(result["chi_squared"])
# Cost is related to sum of squared residuals
if "cost" in result and result["cost"] is not None:
# cost = 0.5 * sum(residuals**2) for least squares
statistics["cost"] = float(result["cost"])
return statistics
def _export_json(self, data: dict[str, Any], output_path: Path) -> None:
    """Export data to JSON format.

    Parameters
    ----------
    data : dict
        Export data dictionary.
    output_path : Path
        Output file path.

    Raises
    ------
    CLIError
        If the data cannot be serialized or the file cannot be written.
    """
    try:
        # Serialize before opening the file: opening with "w" truncates,
        # so a serialization failure must not destroy an existing output
        # or leave a half-written document behind.
        payload = json.dumps(
            _sanitize_nonfinite(data),
            cls=NumpyJSONEncoder,
            indent=2,
            allow_nan=False,
        )
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(payload)
    # ValueError covers non-finite floats that reach the encoder despite
    # sanitizing (allow_nan=False) as well as circular references.
    except (OSError, TypeError, ValueError) as e:
        raise CLIError(
            f"Failed to write JSON file: {e}",
            context={"output_path": str(output_path)},
        ) from e
def _export_csv(self, data: dict[str, Any], output_path: Path) -> None:
"""Export data to CSV format with flattened rows.
Creates a CSV with parameter name/value/uncertainty rows,
plus statistics as separate rows.
Parameters
----------
data : dict
Export data dictionary.
output_path : Path
Output file path.
"""
popt = data.get("popt", [])
# Copy to avoid mutating the caller's dict in-place
uncertainties = list(data.get("uncertainties", []))
statistics = data.get("statistics", {})
param_names = list(data.get("parameter_names", []))
# Pad uncertainties to match popt length
while len(uncertainties) < len(popt):
uncertainties.append(float("nan"))
# Extend (not replace) param names — preserve any names already provided
while len(param_names) < len(popt):
param_names.append(f"p{len(param_names)}")
try:
with open(output_path, "w", encoding="utf-8", newline="") as f:
writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
# Write header
writer.writerow(["name", "value", "uncertainty", "type"])
# Write parameter rows
for i, (name, value, uncertainty) in enumerate(
zip(param_names, popt, uncertainties, strict=False)
):
writer.writerow([name, value, uncertainty, "parameter"])
# Write statistics rows
for stat_name, stat_value in statistics.items():
writer.writerow([stat_name, stat_value, "", "statistic"])
# Write metadata
metadata = data.get("metadata", {})
for meta_name, meta_value in metadata.items():
writer.writerow([meta_name, meta_value, "", "metadata"])
# Write convergence info
convergence = data.get("convergence", {})
for conv_name, conv_value in convergence.items():
writer.writerow([conv_name, conv_value, "", "convergence"])
except OSError as e:
raise CLIError(
f"Failed to write CSV file: {e}",
context={"output_path": str(output_path)},
) from e
def _export_stdout(self, data: dict[str, Any]) -> None:
    """Print the export data to stdout as indented JSON.

    Non-finite floats are replaced with null first, so the printed
    document can be parsed by strict JSON consumers.

    Parameters
    ----------
    data : dict
        Export data dictionary.
    """
    sanitized = _sanitize_nonfinite(data)
    print(json.dumps(sanitized, cls=NumpyJSONEncoder, indent=2, allow_nan=False))