Source code for nlsq.streaming.telemetry

"""Telemetry for monitoring defense layer activations.

This module provides telemetry tracking for the 4-layer defense strategy
used during L-BFGS warmup in the adaptive hybrid streaming optimizer.
"""

from __future__ import annotations

import threading
import time
from collections import deque

__all__ = [
    "DefenseLayerTelemetry",
    "get_defense_telemetry",
    "reset_defense_telemetry",
]


[docs] class DefenseLayerTelemetry: """Telemetry for monitoring 4-layer defense strategy activations. Tracks when each defense layer is triggered during warmup to help with production monitoring and tuning. This class maintains thread-safe statistics that can be queried or exported for monitoring dashboards. The 4 layers tracked are: - Layer 1: Warm start detection (skips warmup) - Layer 2: Adaptive step size selection (refinement/careful/exploration) - Layer 3: Cost-increase guard (aborts warmup if loss increases) - Layer 4: Step clipping (limits update magnitude) Attributes ---------- layer1_warm_start_triggers : int Count of warm start detection activations (warmup skipped) layer2_lr_mode_counts : dict[str, int] Counts per LR mode: {"refinement": n, "careful": m, "exploration": k} layer3_cost_guard_triggers : int Count of cost-increase guard aborts layer4_clip_triggers : int Count of step clipping activations total_warmup_calls : int Total number of warmup phase executions """
[docs] def __init__(self) -> None: """Initialize telemetry with zeroed counters.""" self.reset()
[docs] def reset(self) -> None: """Reset all telemetry counters to zero.""" self.layer1_warm_start_triggers: int = 0 self.layer2_lr_mode_counts: dict[str, int] = { "refinement": 0, "careful": 0, "exploration": 0, "fixed": 0, } self.layer3_cost_guard_triggers: int = 0 self.layer4_clip_triggers: int = 0 self.total_warmup_calls: int = 0 # L-BFGS-specific telemetry counters self.lbfgs_history_buffer_fill_events: int = 0 self.lbfgs_line_search_failures: int = 0 # Detailed event log (last N events) self._max_events: int = 1000 self._event_log: deque[dict] = deque(maxlen=self._max_events)
[docs] def record_warmup_start(self) -> None: """Record start of a warmup phase.""" self.total_warmup_calls += 1
[docs] def record_layer1_trigger(self, relative_loss: float, threshold: float) -> None: """Record Layer 1 warm start detection trigger. Parameters ---------- relative_loss : float Relative loss that triggered warm start threshold : float Threshold value that was exceeded """ self.layer1_warm_start_triggers += 1 self._log_event( "layer1_warm_start", {"relative_loss": relative_loss, "threshold": threshold}, )
[docs] def record_layer2_lr_mode(self, mode: str, relative_loss: float) -> None: """Record Layer 2 adaptive LR mode selection. Parameters ---------- mode : str Selected LR mode: "refinement", "careful", "exploration", or "fixed" relative_loss : float Relative loss that determined the mode """ if mode in self.layer2_lr_mode_counts: self.layer2_lr_mode_counts[mode] += 1 self._log_event( "layer2_lr_mode", {"mode": mode, "relative_loss": relative_loss} )
[docs] def record_layer3_trigger( self, cost_ratio: float, tolerance: float, iteration: int ) -> None: """Record Layer 3 cost-increase guard trigger. Parameters ---------- cost_ratio : float Cost increase ratio that triggered the guard tolerance : float Tolerance threshold that was exceeded iteration : int Iteration number when triggered """ self.layer3_cost_guard_triggers += 1 self._log_event( "layer3_cost_guard", {"cost_ratio": cost_ratio, "tolerance": tolerance, "iteration": iteration}, )
[docs] def record_layer4_clip(self, original_norm: float, max_norm: float) -> None: """Record Layer 4 step clipping activation. Parameters ---------- original_norm : float Original update norm before clipping max_norm : float Maximum allowed norm (clipping threshold) """ self.layer4_clip_triggers += 1 self._log_event( "layer4_clip", {"original_norm": original_norm, "max_norm": max_norm} )
[docs] def record_lbfgs_history_fill(self, iteration: int) -> None: """Record L-BFGS history buffer fill event. Called when the L-BFGS history buffer becomes fully populated, signaling transition from cold start to full L-BFGS mode. Parameters ---------- iteration : int Iteration number when history buffer filled """ self.lbfgs_history_buffer_fill_events += 1 self._log_event( "lbfgs_history_fill", {"iteration": iteration}, )
[docs] def record_lbfgs_line_search_failure( self, iteration: int, reason: str = "" ) -> None: """Record L-BFGS line search failure event. Called when the L-BFGS line search fails to find an acceptable step. Parameters ---------- iteration : int Iteration number when line search failed reason : str, optional Reason for line search failure """ self.lbfgs_line_search_failures += 1 self._log_event( "lbfgs_line_search_failure", {"iteration": iteration, "reason": reason}, )
def _log_event(self, event_type: str, data: dict) -> None: """Log an event with timestamp. Parameters ---------- event_type : str Type of event data : dict Event data """ event = {"type": event_type, "timestamp": time.time(), "data": data} self._event_log.append(event)
[docs] def get_trigger_rates(self) -> dict[str, float]: """Get trigger rates as percentage of total warmup calls. Returns ------- dict[str, float] Trigger rates for each layer as percentages (0-100) """ if self.total_warmup_calls == 0: return { "layer1_warm_start_rate": 0.0, "layer2_refinement_rate": 0.0, "layer2_careful_rate": 0.0, "layer2_exploration_rate": 0.0, "layer3_cost_guard_rate": 0.0, "layer4_clip_rate": 0.0, "lbfgs_history_buffer_fill_rate": 0.0, "lbfgs_line_search_failure_rate": 0.0, } total = self.total_warmup_calls return { "layer1_warm_start_rate": 100.0 * self.layer1_warm_start_triggers / total, "layer2_refinement_rate": 100.0 * self.layer2_lr_mode_counts["refinement"] / total, "layer2_careful_rate": 100.0 * self.layer2_lr_mode_counts["careful"] / total, "layer2_exploration_rate": 100.0 * self.layer2_lr_mode_counts["exploration"] / total, "layer3_cost_guard_rate": 100.0 * self.layer3_cost_guard_triggers / total, "layer4_clip_rate": 100.0 * self.layer4_clip_triggers / total, "lbfgs_history_buffer_fill_rate": 100.0 * self.lbfgs_history_buffer_fill_events / total, "lbfgs_line_search_failure_rate": 100.0 * self.lbfgs_line_search_failures / total, }
[docs] def get_summary(self) -> dict: """Get summary statistics for all defense layers. Returns ------- dict Summary with counts and rates for each layer """ rates = self.get_trigger_rates() return { "total_warmup_calls": self.total_warmup_calls, "layer1": { "name": "warm_start_detection", "triggers": self.layer1_warm_start_triggers, "rate_pct": rates["layer1_warm_start_rate"], }, "layer2": { "name": "adaptive_lr_selection", "mode_counts": self.layer2_lr_mode_counts.copy(), "rates_pct": { "refinement": rates["layer2_refinement_rate"], "careful": rates["layer2_careful_rate"], "exploration": rates["layer2_exploration_rate"], }, }, "layer3": { "name": "cost_increase_guard", "triggers": self.layer3_cost_guard_triggers, "rate_pct": rates["layer3_cost_guard_rate"], }, "layer4": { "name": "step_clipping", "triggers": self.layer4_clip_triggers, "rate_pct": rates["layer4_clip_rate"], }, }
[docs] def get_recent_events(self, n: int = 10) -> list[dict]: """Get most recent N events. Parameters ---------- n : int Number of recent events to return Returns ------- list[dict] Most recent events """ return list(self._event_log)[-n:]
[docs] def export_metrics(self) -> dict: """Export metrics in a format suitable for monitoring systems. Returns ------- dict Metrics with consistent naming for Prometheus/Grafana/etc. """ return { "nlsq_defense_warmup_calls_total": self.total_warmup_calls, "nlsq_defense_layer1_triggers_total": self.layer1_warm_start_triggers, "nlsq_defense_layer2_refinement_total": self.layer2_lr_mode_counts[ "refinement" ], "nlsq_defense_layer2_careful_total": self.layer2_lr_mode_counts["careful"], "nlsq_defense_layer2_exploration_total": self.layer2_lr_mode_counts[ "exploration" ], "nlsq_defense_layer3_triggers_total": self.layer3_cost_guard_triggers, "nlsq_defense_layer4_triggers_total": self.layer4_clip_triggers, "nlsq_defense_lbfgs_history_fill_total": self.lbfgs_history_buffer_fill_events, "nlsq_defense_lbfgs_line_search_failures_total": self.lbfgs_line_search_failures, }
# Global telemetry instance for monitoring _defense_telemetry: DefenseLayerTelemetry | None = None _defense_telemetry_lock = threading.Lock()
[docs] def get_defense_telemetry() -> DefenseLayerTelemetry: """Get global defense layer telemetry instance. Returns ------- DefenseLayerTelemetry Global telemetry instance (created on first call) """ global _defense_telemetry # noqa: PLW0603 if _defense_telemetry is None: with _defense_telemetry_lock: if _defense_telemetry is None: _defense_telemetry = DefenseLayerTelemetry() return _defense_telemetry
[docs] def reset_defense_telemetry() -> None: """Reset global defense layer telemetry.""" global _defense_telemetry # noqa: PLW0602 with _defense_telemetry_lock: if _defense_telemetry is not None: _defense_telemetry.reset()