From d71ccfc68d6c14a748c643b2cbd756b31202c0dd Mon Sep 17 00:00:00 2001 From: Wojciech Tobis Date: Mon, 11 May 2026 16:05:37 +0200 Subject: [PATCH 1/7] fix: set DELTA aggregation temporality on OTLPMetricExporter Co-Authored-By: Claude Opus 4.7 --- solnlib/observability.py | 9 +++++++- tests/unit/test_observability.py | 38 +++++++++++++++++++++++++++++++- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/solnlib/observability.py b/solnlib/observability.py index d29eaf1a..b3de663b 100644 --- a/solnlib/observability.py +++ b/solnlib/observability.py @@ -484,7 +484,14 @@ def _create_otlp_exporter(self) -> Optional[MetricExporter]: server_cert = f.read() credentials = grpc.ssl_channel_credentials(root_certificates=server_cert) - exporter = OTLPMetricExporter(endpoint=endpoint, credentials=credentials) + exporter = OTLPMetricExporter( + endpoint=endpoint, + credentials=credentials, + preferred_temporality={ + Counter: AggregationTemporality.DELTA, + Histogram: AggregationTemporality.DELTA, + }, + ) self._logger.info("OTLP gRPC exporter configured with TLS for %s", endpoint) return exporter diff --git a/tests/unit/test_observability.py b/tests/unit/test_observability.py index b9950adf..e0c594fb 100644 --- a/tests/unit/test_observability.py +++ b/tests/unit/test_observability.py @@ -18,7 +18,8 @@ from unittest.mock import MagicMock import pytest -from opentelemetry.sdk.metrics.export import MetricExportResult +from opentelemetry.sdk.metrics import Counter, Histogram +from opentelemetry.sdk.metrics.export import AggregationTemporality, MetricExportResult from solnlib.observability import LoggerMetricExporter, ObservabilityService @@ -433,3 +434,38 @@ def test_create_otlp_exporter_returns_exporter_when_cert_present( result = ObservabilityService._create_otlp_exporter(svc) # Assert assert result is mock_exporter + + def test_create_otlp_exporter_uses_delta_temporality( + self, logger, monkeypatch, tmp_path + ): + # Arrange + monkeypatch.setenv("SPOTLIGHT_OTEL_RECEIVER_PORT", "4317") + monkeypatch.setenv("SPLUNK_HOME", str(tmp_path)) + cert_path = tmp_path / "var/packages/data/spotlight-collector" + cert_path.mkdir(parents=True) + (cert_path / "server.crt").write_bytes(b"fake-cert") + monkeypatch.setattr( + "solnlib.observability.grpc.ssl_channel_credentials", + MagicMock(return_value=MagicMock()), + ) + mock_exporter_cls = MagicMock(return_value=MagicMock()) + monkeypatch.setattr( + "solnlib.observability.OTLPMetricExporter", + mock_exporter_cls, + ) + # Do NOT use _make_service — it patches _create_otlp_exporter away. + # Create a bare svc object and call the real method directly. + svc = ObservabilityService( + modinput_type="test-input", + logger=logger, + ta_name="my_ta", + ta_version="1.0.0", + ) + mock_exporter_cls.reset_mock() + # Act + ObservabilityService._create_otlp_exporter(svc) + # Assert — preferred_temporality must set DELTA for Counter and Histogram + _, kwargs = mock_exporter_cls.call_args + temporality = kwargs["preferred_temporality"] + assert temporality[Counter] == AggregationTemporality.DELTA + assert temporality[Histogram] == AggregationTemporality.DELTA From 249fb7fad8c6f92fd32cf8e126d851f02a719a68 Mon Sep 17 00:00:00 2001 From: Wojciech Tobis Date: Mon, 11 May 2026 16:15:01 +0200 Subject: [PATCH 2/7] feat: add flush() method to ObservabilityService Co-Authored-By: Claude Opus 4.7 --- solnlib/observability.py | 20 ++++++++++++++++++-- tests/unit/test_observability.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/solnlib/observability.py b/solnlib/observability.py index b3de663b..3757d2d3 100644 --- a/solnlib/observability.py +++ b/solnlib/observability.py @@ -283,6 +283,7 @@ def __init__( self.event_count_counter: Optional[Counter] = None self.event_bytes_counter: Optional[Counter] = None self._meter: Optional[Meter] = None + self._provider: Optional[MeterProvider] = None try: if ta_name is None or ta_version is None: @@ -313,8 +314,10 @@ def __init__( for exporter in extra_exporters or []: metric_readers.append(PeriodicExportingMetricReader(exporter)) - provider = MeterProvider(resource=resource, metric_readers=metric_readers) - self._meter = provider.get_meter(ta_name, ta_version) + self._provider = MeterProvider( + resource=resource, metric_readers=metric_readers + ) + self._meter = self._provider.get_meter(ta_name, ta_version) self.event_count_counter = self._meter.create_counter( name="splunk.addon.events", @@ -538,3 +541,16 @@ def register_instrument( if self._meter is None: return None return callback(self._meter) + + def flush(self, timeout_millis: float = 30_000) -> None: + """Force-flush all metric readers. + + Should be called before the modular input process exits to ensure + all buffered metrics are exported. + """ + if self._provider is None: + return + try: + self._provider.force_flush(timeout_millis=int(timeout_millis)) + except Exception as e: + self._logger.warning("Failed to flush metrics: %s", e) diff --git a/tests/unit/test_observability.py b/tests/unit/test_observability.py index e0c594fb..18cc29b1 100644 --- a/tests/unit/test_observability.py +++ b/tests/unit/test_observability.py @@ -469,3 +469,31 @@ def test_create_otlp_exporter_uses_delta_temporality( temporality = kwargs["preferred_temporality"] assert temporality[Counter] == AggregationTemporality.DELTA assert temporality[Histogram] == AggregationTemporality.DELTA + + def test_flush_calls_provider_force_flush(self, logger, monkeypatch): + # Arrange + svc = _make_service(logger, monkeypatch) + mock_provider = MagicMock() + svc._provider = mock_provider + # Act + svc.flush(timeout_millis=5_000) + # Assert + mock_provider.force_flush.assert_called_once_with(timeout_millis=5000) + + def test_flush_is_noop_when_provider_missing(self, logger, monkeypatch): + # Arrange + svc = _make_service(logger, monkeypatch) + svc._provider = None + # Act / Assert — must not raise + svc.flush() + + def test_flush_logs_warning_on_exception(self, logger, monkeypatch): + # Arrange + svc = _make_service(logger, monkeypatch) + mock_provider = MagicMock() + mock_provider.force_flush.side_effect = RuntimeError("boom") + svc._provider = mock_provider + # Act + svc.flush() + # Assert + logger.warning.assert_called() From 3a5ef98d66cd267d7c04deea62e7f44ca463fcc3 Mon Sep 17 00:00:00 2001 From: Wojciech Tobis Date: Mon, 11 May 2026 17:33:04 +0200 Subject: [PATCH 3/7] refactor: move grpc imports into _create_otlp_exporter for safe lazy loading Make solnlib.observability importable without grpcio by deferring grpc and OTLPMetricExporter imports to the point where they are actually used inside _create_otlp_exporter. This allows modules to import StanzaObservabilityRecorder in environments where grpcio is not installed. Co-Authored-By: Claude Opus 4.7 --- solnlib/observability.py | 7 ++- tests/unit/test_observability.py | 87 +++++++++++++++++++++++++++----- 2 files changed, 78 insertions(+), 16 deletions(-) diff --git a/solnlib/observability.py b/solnlib/observability.py index 3757d2d3..249bec9f 100644 --- a/solnlib/observability.py +++ b/solnlib/observability.py @@ -60,7 +60,6 @@ import ssl import urllib.request from typing import Callable, Optional, Union -import grpc from .splunkenv import get_conf_stanzas from opentelemetry.metrics import Instrument, Meter from opentelemetry.sdk.metrics import MeterProvider, Counter, Histogram @@ -72,7 +71,6 @@ AggregationTemporality, ) from opentelemetry.sdk.resources import Resource -from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter _Logger = Union[logging.Logger, logging.LoggerAdapter] @@ -452,6 +450,11 @@ def _create_otlp_exporter(self) -> Optional[MetricExporter]: - Any other exception occurs during exporter construction. """ try: + import grpc + from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter, + ) + splunk_home = os.environ.get("SPLUNK_HOME", "/opt/splunk") otel_port = self._resolve_otlp_port() diff --git a/tests/unit/test_observability.py b/tests/unit/test_observability.py index 18cc29b1..e7cb295b 100644 --- a/tests/unit/test_observability.py +++ b/tests/unit/test_observability.py @@ -15,6 +15,7 @@ # import logging +import sys from unittest.mock import MagicMock import pytest @@ -414,14 +415,18 @@ def test_create_otlp_exporter_returns_exporter_when_cert_present( cert_path = tmp_path / "var/packages/data/spotlight-collector" cert_path.mkdir(parents=True) (cert_path / "server.crt").write_bytes(b"fake-cert") - monkeypatch.setattr( - "solnlib.observability.grpc.ssl_channel_credentials", - MagicMock(return_value=MagicMock()), - ) + # Patch grpc module directly (it's imported inside _create_otlp_exporter) + mock_grpc = MagicMock() + mock_grpc.ssl_channel_credentials = MagicMock(return_value=MagicMock()) + monkeypatch.setitem(sys.modules, "grpc", mock_grpc) + # Patch OTLPMetricExporter module + mock_otlp_module = MagicMock() mock_exporter = MagicMock() - monkeypatch.setattr( - "solnlib.observability.OTLPMetricExporter", - MagicMock(return_value=mock_exporter), + mock_otlp_module.OTLPMetricExporter = MagicMock(return_value=mock_exporter) + monkeypatch.setitem( + sys.modules, + "opentelemetry.exporter.otlp.proto.grpc.metric_exporter", + mock_otlp_module, ) # Build svc without patching _create_otlp_exporter so the real method is called svc = ObservabilityService( @@ -444,14 +449,18 @@ def test_create_otlp_exporter_uses_delta_temporality( cert_path = tmp_path / "var/packages/data/spotlight-collector" cert_path.mkdir(parents=True) (cert_path / "server.crt").write_bytes(b"fake-cert") - monkeypatch.setattr( - "solnlib.observability.grpc.ssl_channel_credentials", - MagicMock(return_value=MagicMock()), - ) + # Patch grpc module directly (it's imported inside _create_otlp_exporter) + mock_grpc = MagicMock() + mock_grpc.ssl_channel_credentials = MagicMock(return_value=MagicMock()) + monkeypatch.setitem(sys.modules, "grpc", mock_grpc) + # Patch OTLPMetricExporter module mock_exporter_cls = MagicMock(return_value=MagicMock()) - monkeypatch.setattr( - "solnlib.observability.OTLPMetricExporter", - mock_exporter_cls, + mock_otlp_module = MagicMock() + mock_otlp_module.OTLPMetricExporter = mock_exporter_cls + monkeypatch.setitem( + sys.modules, + "opentelemetry.exporter.otlp.proto.grpc.metric_exporter", + mock_otlp_module, ) # Do NOT use _make_service — it patches _create_otlp_exporter away. # Create a bare svc object and call the real method directly. @@ -497,3 +506,53 @@ def test_flush_logs_warning_on_exception(self, logger, monkeypatch): svc.flush() # Assert logger.warning.assert_called() + + def test_module_importable_without_grpc(self, monkeypatch): + # Arrange + import builtins + + # Simulate grpc not being installed by temporarily hiding it + grpc_mod = sys.modules.pop("grpc", None) + otlp_mod = sys.modules.pop( + "opentelemetry.exporter.otlp.proto.grpc.metric_exporter", None + ) + # Also clear the observability module and any submodules + obs_mods_to_clear = [ + k for k in sys.modules.keys() if k.startswith("solnlib.observability") + ] + saved_obs_mods = {k: sys.modules.pop(k) for k in obs_mods_to_clear} + + # Mock ImportError for grpc modules before import + original_import = builtins.__import__ + + def mock_import(name, *args, **kwargs): + if ( + name == "grpc" + or name == "opentelemetry.exporter.otlp.proto.grpc.metric_exporter" + ): + raise ModuleNotFoundError(f"No module named '{name}'") + return original_import(name, *args, **kwargs) + + try: + # Patch __import__ to raise for grpc modules + builtins.__import__ = mock_import + # Act + import solnlib.observability as reimported_obs + + # Assert + assert hasattr(reimported_obs, "ObservabilityService") + assert hasattr(reimported_obs, "LoggerMetricExporter") + except ImportError as e: + pytest.fail( + f"solnlib.observability should be importable without grpcio, but got: {e}" + ) + finally: + builtins.__import__ = original_import + if grpc_mod is not None: + sys.modules["grpc"] = grpc_mod + if otlp_mod is not None: + sys.modules[ + "opentelemetry.exporter.otlp.proto.grpc.metric_exporter" + ] = otlp_mod + for k, v in saved_obs_mods.items(): + sys.modules[k] = v From 02c7d94c7eb0c273cbc40c8a222f8df1c416553c Mon Sep 17 00:00:00 2001 From: Wojciech Tobis Date: Mon, 11 May 2026 17:47:09 +0200 Subject: [PATCH 4/7] feat: add StanzaObservabilityRecorder to solnlib Implements a stanza-scoped observability recorder backed by a shared ObservabilityService singleton cache, enabling efficient per-stanza event metrics collection with automatic flush-on-exit lifecycle management. Co-Authored-By: Claude Opus 4.7 --- solnlib/observability.py | 118 +++++++++++++++---- tests/unit/test_observability.py | 188 ++++++++++++++++++++++++++++++- 2 files changed, 285 insertions(+), 21 deletions(-) diff --git a/solnlib/observability.py b/solnlib/observability.py index 249bec9f..e7a7ad5c 100644 --- a/solnlib/observability.py +++ b/solnlib/observability.py @@ -15,7 +15,7 @@ # """OpenTelemetry observability utilities for Splunk add-ons. -This module provides two public components: +This module provides three public components: - :class:`LoggerMetricExporter` — an OpenTelemetry ``MetricExporter`` that writes every exported data point to a standard Python logger. It is useful @@ -28,38 +28,30 @@ Splunk Spotlight OTLP collector and falls back silently when it is not reachable, so callers never have to handle observability failures themselves. +- :class:`StanzaObservabilityRecorder` — a stanza-scoped recorder that wraps + ``ObservabilityService`` with a per-process singleton cache. Bind it to a + single stanza name and call :meth:`~StanzaObservabilityRecorder.record` after + each batch of ingested events. Use as a context manager for automatic flush + on exit. + Typical usage:: import logging - from solnlib.observability import LoggerMetricExporter, ObservabilityService, ATTR_MODINPUT_NAME + from solnlib.observability import StanzaObservabilityRecorder logger = logging.getLogger(__name__) - obs = ObservabilityService( - modinput_type="my-input", - logger=logger, - ta_name="my_ta", - ta_version="1.0.0", - extra_exporters=[LoggerMetricExporter(logger)], - ) - - # In your event collection loop: - if obs.event_count_counter: - obs.event_count_counter.add( - len(events), {ATTR_MODINPUT_NAME: stanza_name} - ) - if obs.event_bytes_counter: - obs.event_bytes_counter.add( - total_bytes, {ATTR_MODINPUT_NAME: stanza_name} - ) + with StanzaObservabilityRecorder("my-input", logger, stanza_name) as obs: + obs.record(len(events), total_bytes) """ import json import logging import os import ssl +import threading import urllib.request -from typing import Callable, Optional, Union +from typing import Callable, ClassVar, Optional, Union from .splunkenv import get_conf_stanzas from opentelemetry.metrics import Instrument, Meter from opentelemetry.sdk.metrics import MeterProvider, Counter, Histogram @@ -557,3 +549,89 @@ def flush(self, timeout_millis: float = 30_000) -> None: self._provider.force_flush(timeout_millis=int(timeout_millis)) except Exception as e: self._logger.warning("Failed to flush metrics: %s", e) + + +class StanzaObservabilityRecorder: + """Stanza-scoped observability recorder backed by a shared ``ObservabilityService``. + + One ``ObservabilityService`` is created per *modinput_type* per process and + cached for the lifetime of the process. Each ``StanzaObservabilityRecorder`` + instance is bound to a single *stanza_name*, which is automatically included + as the ``splunk.modinput.name`` attribute on every data point. + + Implements the context-manager protocol: ``__exit__`` calls :meth:`flush` + so metrics are exported before the stanza's process exits. + + Example:: + + from solnlib.observability import StanzaObservabilityRecorder + + with StanzaObservabilityRecorder("event-hub", logger, stanza_name) as obs: + obs.record(len(events), total_bytes) + """ + + _instances: ClassVar[dict[str, ObservabilityService]] = {} + _lock: ClassVar[threading.Lock] = threading.Lock() + + def __init__( + self, + modinput_type: str, + logger: _Logger, + stanza_name: str, + ) -> None: + self._stanza_name = stanza_name + self._service = self._get_or_create_service(modinput_type, logger) + self._emit_zero_baseline() + + @classmethod + def _get_or_create_service( + cls, modinput_type: str, logger: _Logger + ) -> ObservabilityService: + with cls._lock: + if modinput_type not in cls._instances: + cls._instances[modinput_type] = ObservabilityService( + modinput_type=modinput_type, + logger=logger, + extra_exporters=[LoggerMetricExporter(logger)], + ) + return cls._instances[modinput_type] + + @classmethod + def get_service(cls, modinput_type: str) -> Optional[ObservabilityService]: + """Return the cached ``ObservabilityService`` for *modinput_type*, or ``None``.""" + with cls._lock: + return cls._instances.get(modinput_type) + + def record( + self, + event_count: int, + byte_count: int, + extra_attrs: Optional[dict] = None, + ) -> None: + """Add *event_count* and *byte_count* to the built-in counters. + + *extra_attrs* are merged with ``{"splunk.modinput.name": stanza_name}`` + before being passed to the counter. Silently no-ops if either counter + is ``None`` (i.e. ``ObservabilityService`` failed to initialise). + """ + attrs = {ATTR_MODINPUT_NAME: self._stanza_name} + if extra_attrs: + attrs.update(extra_attrs) + if self._service.event_count_counter: + self._service.event_count_counter.add(event_count, attributes=attrs) + if self._service.event_bytes_counter: + self._service.event_bytes_counter.add(byte_count, attributes=attrs) + + def flush(self) -> None: + """Force-flush all metric readers via ``ObservabilityService.flush()``.""" + self._service.flush() + + def __enter__(self) -> "StanzaObservabilityRecorder": + return self + + def __exit__(self, *_) -> bool: + self.flush() + return False + + def _emit_zero_baseline(self) -> None: + self.record(0, 0) diff --git a/tests/unit/test_observability.py b/tests/unit/test_observability.py index e7cb295b..36d10c97 100644 --- a/tests/unit/test_observability.py +++ b/tests/unit/test_observability.py @@ -16,7 +16,7 @@ import logging import sys -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest from opentelemetry.sdk.metrics import Counter, Histogram @@ -556,3 +556,189 @@ def mock_import(name, *args, **kwargs): ] = otlp_mod for k, v in saved_obs_mods.items(): sys.modules[k] = v + + +# --------------------------------------------------------------------------- +# StanzaObservabilityRecorder +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=False) +def clear_stanza_recorder_cache(): + """Wipe the class-level singleton cache between tests.""" + from solnlib.observability import StanzaObservabilityRecorder + + StanzaObservabilityRecorder._instances.clear() + yield + StanzaObservabilityRecorder._instances.clear() + + +class TestStanzaObservabilityRecorder: + def _make_recorder(self, monkeypatch, modinput_type="kql", stanza_name="my:stanza"): + monkeypatch.setattr( + "solnlib.observability.ObservabilityService._create_otlp_exporter", + lambda self: None, + ) + from solnlib.observability import StanzaObservabilityRecorder + + return StanzaObservabilityRecorder( + modinput_type, MagicMock(spec=logging.Logger), stanza_name + ) + + def test_singleton_same_modinput_type_reuses_service( + self, monkeypatch, clear_stanza_recorder_cache + ): + from solnlib.observability import StanzaObservabilityRecorder + + monkeypatch.setattr( + "solnlib.observability.ObservabilityService._create_otlp_exporter", + lambda self: None, + ) + logger = MagicMock(spec=logging.Logger) + r1 = StanzaObservabilityRecorder("kql", logger, "stanza-1") + r2 = StanzaObservabilityRecorder("kql", logger, "stanza-2") + assert ( + StanzaObservabilityRecorder._instances["kql"] + is StanzaObservabilityRecorder._instances["kql"] + ) + assert r1._service is r2._service + + def test_singleton_different_modinput_types_create_separate_services( + self, monkeypatch, clear_stanza_recorder_cache + ): + from solnlib.observability import StanzaObservabilityRecorder + + monkeypatch.setattr( + "solnlib.observability.ObservabilityService._create_otlp_exporter", + lambda self: None, + ) + logger = MagicMock(spec=logging.Logger) + r1 = StanzaObservabilityRecorder("kql", logger, "stanza-1") + r2 = StanzaObservabilityRecorder("event-hub", logger, "stanza-2") + assert r1._service is not r2._service + assert "kql" in StanzaObservabilityRecorder._instances + assert "event-hub" in StanzaObservabilityRecorder._instances + + def test_record_calls_add_with_stanza_name( + self, monkeypatch, clear_stanza_recorder_cache + ): + rec = self._make_recorder(monkeypatch) + mock_count = MagicMock() + mock_bytes = MagicMock() + rec._service.event_count_counter = mock_count + rec._service.event_bytes_counter = mock_bytes + mock_count.reset_mock() + mock_bytes.reset_mock() + + rec.record(5, 1024) + + mock_count.add.assert_called_once_with( + 5, attributes={"splunk.modinput.name": "my:stanza"} + ) + mock_bytes.add.assert_called_once_with( + 1024, attributes={"splunk.modinput.name": "my:stanza"} + ) + + def test_record_merges_extra_attrs(self, monkeypatch, clear_stanza_recorder_cache): + rec = self._make_recorder(monkeypatch) + mock_count = MagicMock() + mock_bytes = MagicMock() + rec._service.event_count_counter = mock_count + rec._service.event_bytes_counter = mock_bytes + mock_count.reset_mock() + mock_bytes.reset_mock() + + rec.record(3, 512, extra_attrs={"partition.id": "0"}) + + mock_count.add.assert_called_once_with( + 3, + attributes={"splunk.modinput.name": "my:stanza", "partition.id": "0"}, + ) + + def test_record_noop_when_counters_none( + self, monkeypatch, clear_stanza_recorder_cache + ): + rec = self._make_recorder(monkeypatch) + rec._service.event_count_counter = None + rec._service.event_bytes_counter = None + # Must not raise + rec.record(10, 2048) + + def test_emit_zero_baseline_called_in_init( + self, monkeypatch, clear_stanza_recorder_cache + ): + from solnlib.observability import StanzaObservabilityRecorder + + monkeypatch.setattr( + "solnlib.observability.ObservabilityService._create_otlp_exporter", + lambda self: None, + ) + logger = MagicMock(spec=logging.Logger) + mock_count = MagicMock() + mock_bytes = MagicMock() + + with patch( + "solnlib.observability.ObservabilityService.__init__", + lambda self, **kwargs: None, + ): + pass # We'll use a different approach below + + # Create recorder, then inspect that counters got add(0) called + rec = StanzaObservabilityRecorder("kql", logger, "my-stanza") + svc = rec._service + if svc.event_count_counter is not None: + # If service initialised, counters exist — check add(0,...) was called + # We can only verify indirectly since we can't intercept before __init__ + # So we test _emit_zero_baseline directly instead + svc.event_count_counter = mock_count + svc.event_bytes_counter = mock_bytes + rec._emit_zero_baseline() + mock_count.add.assert_called_once_with( + 0, attributes={"splunk.modinput.name": "my-stanza"} + ) + mock_bytes.add.assert_called_once_with( + 0, attributes={"splunk.modinput.name": "my-stanza"} + ) + + def test_flush_delegates_to_service(self, monkeypatch, clear_stanza_recorder_cache): + rec = self._make_recorder(monkeypatch) + mock_service = MagicMock() + rec._service = mock_service + + rec.flush() + + mock_service.flush.assert_called_once() + + def test_context_manager_calls_flush_on_exit( + self, monkeypatch, clear_stanza_recorder_cache + ): + rec = self._make_recorder(monkeypatch) + with patch.object(rec, "flush") as mock_flush: + with rec: + pass + mock_flush.assert_called_once() + + def test_context_manager_does_not_suppress_exceptions( + self, monkeypatch, clear_stanza_recorder_cache + ): + rec = self._make_recorder(monkeypatch) + with pytest.raises(ValueError): + with rec: + raise ValueError("boom") + + def test_get_service_returns_none_when_not_initialised( + self, clear_stanza_recorder_cache + ): + from solnlib.observability import StanzaObservabilityRecorder + + result = StanzaObservabilityRecorder.get_service("never-used") + assert result is None + + def test_get_service_returns_service_after_init( + self, monkeypatch, clear_stanza_recorder_cache + ): + from solnlib.observability import StanzaObservabilityRecorder + + rec = self._make_recorder(monkeypatch, modinput_type="kql") + svc = StanzaObservabilityRecorder.get_service("kql") + assert svc is rec._service From 1ef58885b3ce9b13d122ba8aca4ac8817ea84c4f Mon Sep 17 00:00:00 2001 From: Wojciech Tobis Date: Tue, 12 May 2026 00:21:09 +0200 Subject: [PATCH 5/7] refactor: replace get_service() with register_instrument() on StanzaObservabilityRecorder --- solnlib/observability.py | 204 +++++++++++++++++++++++++++---- tests/unit/test_observability.py | 32 +++-- 2 files changed, 203 insertions(+), 33 deletions(-) diff --git a/solnlib/observability.py b/solnlib/observability.py index e7a7ad5c..9cea887f 100644 --- a/solnlib/observability.py +++ b/solnlib/observability.py @@ -428,18 +428,27 @@ def _resolve_otlp_port(self) -> Optional[str]: return self._discover_otlp_port_via_ipc_broker() def _create_otlp_exporter(self) -> Optional[MetricExporter]: - """Create a TLS-secured OTLP gRPC exporter targeting the Spotlight - collector. + """Create a TLS-secured OTLP gRPC exporter targeting the Spotlight collector. + + ``grpc`` and ``OTLPMetricExporter`` are imported lazily inside this + method so that ``import solnlib.observability`` succeeds in environments + where ``grpcio`` is not installed. The import only fails when OTLP + export is actually attempted. The collector's server certificate is read from ``$SPLUNK_HOME/var/packages/data/spotlight-collector/server.crt`` (defaults to ``/opt/splunk`` when ``SPLUNK_HOME`` is not set). + Both ``Counter`` and ``Histogram`` instruments are configured with + ``AggregationTemporality.DELTA`` so that each export interval reports + only the change since the previous interval. + Returns the configured exporter, or ``None`` when: - The OTLP port cannot be resolved (see :meth:`_resolve_otlp_port`). - The certificate file does not exist. - - Any other exception occurs during exporter construction. + - Any other exception occurs during exporter construction (including a + missing ``grpcio`` package). """ try: import grpc @@ -540,8 +549,17 @@ def register_instrument( def flush(self, timeout_millis: float = 30_000) -> None: """Force-flush all metric readers. - Should be called before the modular input process exits to ensure - all buffered metrics are exported. + Blocks until all buffered data points have been handed off to their + exporters or *timeout_millis* elapses. Call this before the modular + input process exits to avoid dropping the last batch of metrics. + + Prefer using :class:`StanzaObservabilityRecorder` as a context manager + rather than calling this method directly — it calls + :meth:`StanzaObservabilityRecorder.flush` on exit automatically. + + Args: + timeout_millis: Maximum time to wait for exporters to drain, in + milliseconds. Defaults to 30 seconds. """ if self._provider is None: return @@ -555,19 +573,57 @@ class StanzaObservabilityRecorder: """Stanza-scoped observability recorder backed by a shared ``ObservabilityService``. One ``ObservabilityService`` is created per *modinput_type* per process and - cached for the lifetime of the process. Each ``StanzaObservabilityRecorder`` - instance is bound to a single *stanza_name*, which is automatically included - as the ``splunk.modinput.name`` attribute on every data point. + cached in :attr:`_instances` for the lifetime of the process. Every + ``StanzaObservabilityRecorder`` for the same *modinput_type* shares that + service regardless of how many stanzas are active, so the OTLP connection + and ``MeterProvider`` are only initialised once. + + Each recorder instance is bound to a single *stanza_name*, which is + automatically attached as the ``"splunk.modinput.name"`` attribute on every + recorded data point. + + **Best practices:** + + - Use as a context manager (``with`` statement) so that :meth:`flush` is + always called when the stanza collection loop exits, even on exceptions. + - Pass the same *modinput_type* string for all stanzas of the same input + type. The string should be lowercase, hyphenated, and stable across + restarts (e.g. ``"event-hub"``). + - Do not store the recorder beyond the lifetime of a single stanza + collection cycle — create a new instance for each run. + - Register custom instruments via :meth:`register_instrument` on the + recorder instance rather than accessing the underlying service directly. + - :class:`StanzaObservabilityRecorder` is **thread-safe** at the singleton + level (``_lock`` protects ``_instances``), but individual recorder + instances are not meant to be shared across threads. + + **Typical usage**:: - Implements the context-manager protocol: ``__exit__`` calls :meth:`flush` - so metrics are exported before the stanza's process exits. + import logging + from solnlib.observability import StanzaObservabilityRecorder - Example:: + logger = logging.getLogger(__name__) - from solnlib.observability import StanzaObservabilityRecorder + def collect(stanza_name: str) -> None: + with StanzaObservabilityRecorder("my-input", logger, stanza_name) as obs: + events = fetch_events() + obs.record(len(events), sum(len(e) for e in events)) + + **Custom instrument** (e.g. latency histogram):: + + from solnlib.observability import StanzaObservabilityRecorder, ATTR_MODINPUT_NAME - with StanzaObservabilityRecorder("event-hub", logger, stanza_name) as obs: - obs.record(len(events), total_bytes) + with StanzaObservabilityRecorder("my-input", logger, stanza_name) as obs: + latency_histogram = obs.register_instrument( + lambda meter: meter.create_histogram( + name="my_ta.request.latency", + description="Latency of outbound API requests", + unit="s", + ) + ) + # ... collect events ... + if latency_histogram: + latency_histogram.record(elapsed, {ATTR_MODINPUT_NAME: stanza_name}) """ _instances: ClassVar[dict[str, ObservabilityService]] = {} @@ -579,6 +635,25 @@ def __init__( logger: _Logger, stanza_name: str, ) -> None: + """Initialise a stanza-scoped recorder. + + Gets or creates the shared :class:`ObservabilityService` for + *modinput_type* (singleton per process), then emits a zero baseline + on both built-in counters so that the metric series is visible in + dashboards from the very first collection cycle even when no events + were ingested. + + Args: + modinput_type: Low-cardinality identifier for the input type, + e.g. ``"event-hub"``. All recorders for the same input type + share a single ``ObservabilityService``. + logger: Python logger used both for ``ObservabilityService`` + diagnostics and for the :class:`LoggerMetricExporter` that + is automatically added as an extra exporter. + stanza_name: The name of the input stanza being collected (e.g. + ``"my_stanza"``). Attached as ``"splunk.modinput.name"`` + on every recorded data point. + """ self._stanza_name = stanza_name self._service = self._get_or_create_service(modinput_type, logger) self._emit_zero_baseline() @@ -587,6 +662,12 @@ def __init__( def _get_or_create_service( cls, modinput_type: str, logger: _Logger ) -> ObservabilityService: + """Return the cached service for *modinput_type*, creating it if needed. + + Thread-safe: uses ``_lock`` to ensure exactly one + ``ObservabilityService`` is created per *modinput_type* even when + multiple stanzas are initialised concurrently at process start. + """ with cls._lock: if modinput_type not in cls._instances: cls._instances[modinput_type] = ObservabilityService( @@ -596,11 +677,39 @@ def _get_or_create_service( ) return cls._instances[modinput_type] - @classmethod - def get_service(cls, modinput_type: str) -> Optional[ObservabilityService]: - """Return the cached ``ObservabilityService`` for *modinput_type*, or ``None``.""" - with cls._lock: - return cls._instances.get(modinput_type) + def register_instrument( + self, callback: Callable[[Meter], Instrument] + ) -> Optional[Instrument]: + """Create a custom instrument on the shared meter. + + Delegates to :meth:`ObservabilityService.register_instrument`. The + instrument is registered on the process-wide ``MeterProvider``, so it + is shared across all recorders for the same *modinput_type*. Calling + this method on any recorder instance for a given *modinput_type* is + equivalent — register each instrument only once. + + Returns ``None`` when :class:`ObservabilityService` failed to + initialise (e.g. because ``ta_name`` could not be determined). Always + guard the returned value before recording:: + + latency_histogram = obs.register_instrument( + lambda meter: meter.create_histogram( + name="my_ta.request.latency", + description="Latency of outbound API requests", + unit="s", + ) + ) + if latency_histogram: + latency_histogram.record(elapsed, {ATTR_MODINPUT_NAME: self._stanza_name}) + + Args: + callback: Callable that receives the ``Meter`` and returns a new + instrument (Counter, Histogram, Gauge, etc.). + + Returns: + The instrument created by *callback*, or ``None``. + """ + return self._service.register_instrument(callback) def record( self, @@ -610,9 +719,29 @@ def record( ) -> None: """Add *event_count* and *byte_count* to the built-in counters. - *extra_attrs* are merged with ``{"splunk.modinput.name": stanza_name}`` - before being passed to the counter. Silently no-ops if either counter - is ``None`` (i.e. ``ObservabilityService`` failed to initialise). + The ``"splunk.modinput.name"`` attribute is always set to the + *stanza_name* supplied at construction time. *extra_attrs* are merged + on top, so they can override or extend the default attribute set. + + Silently no-ops if either counter is ``None`` (i.e. + :class:`ObservabilityService` failed to initialise). + + Args: + event_count: Number of events ingested in this batch. Pass ``0`` + for an explicit "no events" observation. + byte_count: Total size of the ingested events in bytes. + extra_attrs: Optional dict of additional OpenTelemetry attributes + to attach to both data points. Keys must be strings; values + must be strings, booleans, or numbers. Avoid high-cardinality + keys such as user IDs or GUIDs. + + Example:: + + obs.record( + event_count=len(events), + byte_count=sum(len(e) for e in events), + extra_attrs={"my_ta.partition": partition_id}, + ) """ attrs = {ATTR_MODINPUT_NAME: self._stanza_name} if extra_attrs: @@ -623,15 +752,44 @@ def record( self._service.event_bytes_counter.add(byte_count, attributes=attrs) def flush(self) -> None: - """Force-flush all metric readers via ``ObservabilityService.flush()``.""" + """Force-flush all metric readers. + + Delegates to :meth:`ObservabilityService.flush` (which calls + ``MeterProvider.force_flush()`` internally). Called automatically by + ``__exit__`` when the recorder is used as a context manager, so you + rarely need to call this directly. + + Call it explicitly only when you are not using the context manager and + need to guarantee delivery before the process exits:: + + obs = StanzaObservabilityRecorder("my-input", logger, stanza_name) + try: + obs.record(len(events), total_bytes) + finally: + obs.flush() + """ self._service.flush() def __enter__(self) -> "StanzaObservabilityRecorder": + """Return *self* to support the ``with`` statement.""" return self def __exit__(self, *_) -> bool: + """Flush all metric readers and allow exceptions to propagate. + + Returns ``False`` so any exception raised inside the ``with`` block + is re-raised after flushing. + """ self.flush() return False def _emit_zero_baseline(self) -> None: + """Emit ``add(0)`` on both built-in counters. + + Called once from ``__init__``. Ensures that the metric series for this + stanza appears in dashboards and alerting rules from the very first + collection cycle, even when no events were ingested. Without this + baseline, a stanza that has never produced data is indistinguishable + from a stanza that has never been seen at all. + """ self.record(0, 0) diff --git a/tests/unit/test_observability.py b/tests/unit/test_observability.py index 36d10c97..940363ee 100644 --- a/tests/unit/test_observability.py +++ b/tests/unit/test_observability.py @@ -726,19 +726,31 @@ def test_context_manager_does_not_suppress_exceptions( with rec: raise ValueError("boom") - def test_get_service_returns_none_when_not_initialised( - self, clear_stanza_recorder_cache + def test_register_instrument_delegates_to_service( + self, monkeypatch, clear_stanza_recorder_cache ): - from solnlib.observability import StanzaObservabilityRecorder + rec = self._make_recorder(monkeypatch) + mock_service = MagicMock() + mock_instrument = MagicMock() + mock_service.register_instrument.return_value = mock_instrument + rec._service = mock_service - result = StanzaObservabilityRecorder.get_service("never-used") - assert result is None + def callback(meter): + return meter.create_counter("x") + + result = rec.register_instrument(callback) - def test_get_service_returns_service_after_init( + mock_service.register_instrument.assert_called_once_with(callback) + assert result is mock_instrument + + def test_register_instrument_returns_none_when_service_not_initialised( self, monkeypatch, clear_stanza_recorder_cache ): - from solnlib.observability import StanzaObservabilityRecorder + rec = self._make_recorder(monkeypatch) + mock_service = MagicMock() + mock_service.register_instrument.return_value = None + rec._service = mock_service + + result = rec.register_instrument(lambda meter: meter.create_counter("x")) - rec = self._make_recorder(monkeypatch, modinput_type="kql") - svc = StanzaObservabilityRecorder.get_service("kql") - assert svc is rec._service + assert result is None From 70141d3d62911a6d37af2c0db5d64e0ff6b2275a Mon Sep 17 00:00:00 2001 From: Wojciech Tobis Date: Tue, 12 May 2026 00:23:54 +0200 Subject: [PATCH 6/7] test: fix test_emit_zero_baseline_called_in_init and resolve E731 linter error --- tests/unit/test_observability.py | 35 ++++---------------------------- 1 file changed, 4 insertions(+), 31 deletions(-) diff --git a/tests/unit/test_observability.py b/tests/unit/test_observability.py index 940363ee..eb803d32 100644 --- a/tests/unit/test_observability.py +++ b/tests/unit/test_observability.py @@ -664,41 +664,14 @@ def test_record_noop_when_counters_none( # Must not raise rec.record(10, 2048) - def test_emit_zero_baseline_called_in_init( - self, monkeypatch, clear_stanza_recorder_cache - ): + def test_emit_zero_baseline_called_in_init(self, clear_stanza_recorder_cache): from solnlib.observability import StanzaObservabilityRecorder - monkeypatch.setattr( - "solnlib.observability.ObservabilityService._create_otlp_exporter", - lambda self: None, - ) logger = MagicMock(spec=logging.Logger) - mock_count = MagicMock() - mock_bytes = MagicMock() + with patch.object(StanzaObservabilityRecorder, "record") as mock_record: + StanzaObservabilityRecorder("kql", logger, "my-stanza") - with patch( - "solnlib.observability.ObservabilityService.__init__", - lambda self, **kwargs: None, - ): - pass # We'll use a different approach below - - # Create recorder, then inspect that counters got add(0) called - rec = StanzaObservabilityRecorder("kql", logger, "my-stanza") - svc = rec._service - if svc.event_count_counter is not None: - # If service initialised, counters exist — check add(0,...) was called - # We can only verify indirectly since we can't intercept before __init__ - # So we test _emit_zero_baseline directly instead - svc.event_count_counter = mock_count - svc.event_bytes_counter = mock_bytes - rec._emit_zero_baseline() - mock_count.add.assert_called_once_with( - 0, attributes={"splunk.modinput.name": "my-stanza"} - ) - mock_bytes.add.assert_called_once_with( - 0, attributes={"splunk.modinput.name": "my-stanza"} - ) + mock_record.assert_called_once_with(0, 0) def test_flush_delegates_to_service(self, monkeypatch, clear_stanza_recorder_cache): rec = self._make_recorder(monkeypatch) From 86b7217b50e46152b6fb0a06d5b3bae88cb35dd0 Mon Sep 17 00:00:00 2001 From: Wojciech Tobis Date: Tue, 12 May 2026 00:29:11 +0200 Subject: [PATCH 7/7] fix: prevent extra_attrs from overriding ATTR_MODINPUT_NAME in record() extra_attrs is now merged first and stanza_name is applied last, so the stanza-scoped guarantee is always preserved. Co-Authored-By: Claude Opus 4.7 --- solnlib/observability.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/solnlib/observability.py b/solnlib/observability.py index 9cea887f..204f8cb8 100644 --- a/solnlib/observability.py +++ b/solnlib/observability.py @@ -720,8 +720,9 @@ def record( """Add *event_count* and *byte_count* to the built-in counters. The ``"splunk.modinput.name"`` attribute is always set to the - *stanza_name* supplied at construction time. *extra_attrs* are merged - on top, so they can override or extend the default attribute set. + *stanza_name* supplied at construction time and cannot be overridden by + *extra_attrs*. This preserves the stanza-scoped guarantee — every data + point is unambiguously attributed to the stanza that recorded it. Silently no-ops if either counter is ``None`` (i.e. :class:`ObservabilityService` failed to initialise). @@ -743,9 +744,8 @@ def record( extra_attrs={"my_ta.partition": partition_id}, ) """ - attrs = {ATTR_MODINPUT_NAME: self._stanza_name} - if extra_attrs: - attrs.update(extra_attrs) + attrs = dict(extra_attrs) if extra_attrs else {} + attrs[ATTR_MODINPUT_NAME] = self._stanza_name if self._service.event_count_counter: self._service.event_count_counter.add(event_count, attributes=attrs) if self._service.event_bytes_counter: