Skip to content
318 changes: 290 additions & 28 deletions solnlib/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#
"""OpenTelemetry observability utilities for Splunk add-ons.

This module provides two public components:
This module provides three public components:

- :class:`LoggerMetricExporter` — an OpenTelemetry ``MetricExporter`` that
writes every exported data point to a standard Python logger. It is useful
Expand All @@ -28,39 +28,30 @@
Splunk Spotlight OTLP collector and falls back silently when it is not
reachable, so callers never have to handle observability failures themselves.

- :class:`StanzaObservabilityRecorder` — a stanza-scoped recorder that wraps
``ObservabilityService`` with a per-process singleton cache. Bind it to a
single stanza name and call :meth:`~StanzaObservabilityRecorder.record` after
each batch of ingested events. Use as a context manager for automatic flush
on exit.

Typical usage::

import logging
from solnlib.observability import LoggerMetricExporter, ObservabilityService, ATTR_MODINPUT_NAME
from solnlib.observability import StanzaObservabilityRecorder

logger = logging.getLogger(__name__)

obs = ObservabilityService(
modinput_type="my-input",
logger=logger,
ta_name="my_ta",
ta_version="1.0.0",
extra_exporters=[LoggerMetricExporter(logger)],
)

# In your event collection loop:
if obs.event_count_counter:
obs.event_count_counter.add(
len(events), {ATTR_MODINPUT_NAME: stanza_name}
)
if obs.event_bytes_counter:
obs.event_bytes_counter.add(
total_bytes, {ATTR_MODINPUT_NAME: stanza_name}
)
with StanzaObservabilityRecorder("my-input", logger, stanza_name) as obs:
obs.record(len(events), total_bytes)
"""

import json
import logging
import os
import ssl
import threading
import urllib.request
from typing import Callable, Optional, Union
import grpc
from typing import Callable, ClassVar, Optional, Union
from .splunkenv import get_conf_stanzas
from opentelemetry.metrics import Instrument, Meter
from opentelemetry.sdk.metrics import MeterProvider, Counter, Histogram
Expand All @@ -72,7 +63,6 @@
AggregationTemporality,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

_Logger = Union[logging.Logger, logging.LoggerAdapter]

Expand Down Expand Up @@ -283,6 +273,7 @@ def __init__(
self.event_count_counter: Optional[Counter] = None
self.event_bytes_counter: Optional[Counter] = None
self._meter: Optional[Meter] = None
self._provider: Optional[MeterProvider] = None

try:
if ta_name is None or ta_version is None:
Expand Down Expand Up @@ -313,8 +304,10 @@ def __init__(
for exporter in extra_exporters or []:
metric_readers.append(PeriodicExportingMetricReader(exporter))

provider = MeterProvider(resource=resource, metric_readers=metric_readers)
self._meter = provider.get_meter(ta_name, ta_version)
self._provider = MeterProvider(
resource=resource, metric_readers=metric_readers
)
self._meter = self._provider.get_meter(ta_name, ta_version)

self.event_count_counter = self._meter.create_counter(
name="splunk.addon.events",
Expand Down Expand Up @@ -435,20 +428,34 @@ def _resolve_otlp_port(self) -> Optional[str]:
return self._discover_otlp_port_via_ipc_broker()

def _create_otlp_exporter(self) -> Optional[MetricExporter]:
"""Create a TLS-secured OTLP gRPC exporter targeting the Spotlight
collector.
"""Create a TLS-secured OTLP gRPC exporter targeting the Spotlight collector.

``grpc`` and ``OTLPMetricExporter`` are imported lazily inside this
method so that ``import solnlib.observability`` succeeds in environments
where ``grpcio`` is not installed. The import only fails when OTLP
export is actually attempted.

The collector's server certificate is read from
``$SPLUNK_HOME/var/packages/data/spotlight-collector/server.crt``
(defaults to ``/opt/splunk`` when ``SPLUNK_HOME`` is not set).

Both ``Counter`` and ``Histogram`` instruments are configured with
``AggregationTemporality.DELTA`` so that each export interval reports
only the change since the previous interval.

Returns the configured exporter, or ``None`` when:

- The OTLP port cannot be resolved (see :meth:`_resolve_otlp_port`).
- The certificate file does not exist.
- Any other exception occurs during exporter construction.
- Any other exception occurs during exporter construction (including a
missing ``grpcio`` package).
"""
try:
import grpc
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
OTLPMetricExporter,
)

splunk_home = os.environ.get("SPLUNK_HOME", "/opt/splunk")
otel_port = self._resolve_otlp_port()

Expand Down Expand Up @@ -484,7 +491,14 @@ def _create_otlp_exporter(self) -> Optional[MetricExporter]:
server_cert = f.read()

credentials = grpc.ssl_channel_credentials(root_certificates=server_cert)
exporter = OTLPMetricExporter(endpoint=endpoint, credentials=credentials)
exporter = OTLPMetricExporter(
endpoint=endpoint,
credentials=credentials,
preferred_temporality={
Counter: AggregationTemporality.DELTA,
Histogram: AggregationTemporality.DELTA,
},
)
self._logger.info("OTLP gRPC exporter configured with TLS for %s", endpoint)
return exporter

Expand Down Expand Up @@ -531,3 +545,251 @@ def register_instrument(
if self._meter is None:
return None
return callback(self._meter)

def flush(self, timeout_millis: float = 30_000) -> None:
    """Force-flush all metric readers.

    Blocks until every buffered data point has been handed off to its
    exporter, or until *timeout_millis* elapses. Call this before the
    modular input process exits so the final batch of metrics is not
    dropped.

    Prefer using :class:`StanzaObservabilityRecorder` as a context manager
    instead of calling this directly — its ``__exit__`` flushes
    automatically.

    Args:
        timeout_millis: Maximum time, in milliseconds, to wait for the
            exporters to drain. Defaults to 30 seconds.
    """
    provider = self._provider
    # No provider means ObservabilityService never initialised — nothing to drain.
    if provider is None:
        return
    try:
        # force_flush expects an int; truncate any float the caller passed.
        provider.force_flush(timeout_millis=int(timeout_millis))
    except Exception as exc:
        # Observability must never crash the host process; log and move on.
        self._logger.warning("Failed to flush metrics: %s", exc)


class StanzaObservabilityRecorder:
    """Per-stanza metrics recorder backed by a process-wide ``ObservabilityService``.

    Exactly one ``ObservabilityService`` exists per *modinput_type* in a
    given process: it is created on first use and cached in
    :attr:`_instances` for the lifetime of the process. Every recorder for
    that input type — regardless of how many stanzas are active — shares
    the cached service, so the OTLP connection and ``MeterProvider`` are
    initialised only once.

    Each recorder is bound to a single *stanza_name*; that name is attached
    to every recorded data point as the ``"splunk.modinput.name"``
    attribute.

    **Best practices:**

    - Prefer the ``with`` statement: :meth:`flush` then runs automatically
      when the stanza collection loop exits, even on exceptions.
    - Use one stable *modinput_type* string for all stanzas of the same
      input type — lowercase and hyphenated (e.g. ``"event-hub"``), and
      unchanged across restarts.
    - Create a fresh recorder for each collection cycle; do not keep an
      instance alive across runs.
    - Add custom instruments through :meth:`register_instrument` on the
      recorder instead of reaching into the underlying service.
    - Singleton creation is **thread-safe** (``_lock`` serialises access to
      ``_instances``); individual recorder instances, however, are not
      designed to be shared between threads.

    **Typical usage**::

        import logging
        from solnlib.observability import StanzaObservabilityRecorder

        logger = logging.getLogger(__name__)

        def collect(stanza_name: str) -> None:
            with StanzaObservabilityRecorder("my-input", logger, stanza_name) as obs:
                events = fetch_events()
                obs.record(len(events), sum(len(e) for e in events))

    **Custom instrument** (e.g. latency histogram)::

        from solnlib.observability import StanzaObservabilityRecorder, ATTR_MODINPUT_NAME

        with StanzaObservabilityRecorder("my-input", logger, stanza_name) as obs:
            latency_histogram = obs.register_instrument(
                lambda meter: meter.create_histogram(
                    name="my_ta.request.latency",
                    description="Latency of outbound API requests",
                    unit="s",
                )
            )
            # ... collect events ...
            if latency_histogram:
                latency_histogram.record(elapsed, {ATTR_MODINPUT_NAME: stanza_name})
    """

    _instances: ClassVar[dict[str, ObservabilityService]] = {}
    _lock: ClassVar[threading.Lock] = threading.Lock()

    def __init__(
        self,
        modinput_type: str,
        logger: _Logger,
        stanza_name: str,
    ) -> None:
        """Bind a recorder to *stanza_name* and warm up the shared service.

        Looks up (or creates) the per-process :class:`ObservabilityService`
        for *modinput_type*, then records a zero on both built-in counters
        so the stanza's metric series shows up in dashboards from the first
        collection cycle even when nothing was ingested.

        Args:
            modinput_type: Low-cardinality identifier for the input type,
                e.g. ``"event-hub"``. All recorders with the same value
                share one ``ObservabilityService``.
            logger: Logger used both for ``ObservabilityService``
                diagnostics and for the :class:`LoggerMetricExporter` that
                is attached automatically as an extra exporter.
            stanza_name: Name of the input stanza being collected (e.g.
                ``"my_stanza"``); stamped as ``"splunk.modinput.name"`` on
                every data point.
        """
        self._stanza_name = stanza_name
        self._service = self._get_or_create_service(modinput_type, logger)
        self._emit_zero_baseline()

    @classmethod
    def _get_or_create_service(
        cls, modinput_type: str, logger: _Logger
    ) -> ObservabilityService:
        """Fetch the cached service for *modinput_type*, building it on first use.

        Holding ``_lock`` across the whole check-then-create sequence
        guarantees exactly one ``ObservabilityService`` per *modinput_type*
        even when several stanzas initialise concurrently at process start.
        """
        with cls._lock:
            service = cls._instances.get(modinput_type)
            if service is None:
                service = ObservabilityService(
                    modinput_type=modinput_type,
                    logger=logger,
                    extra_exporters=[LoggerMetricExporter(logger)],
                )
                cls._instances[modinput_type] = service
            return service

    def register_instrument(
        self, callback: Callable[[Meter], Instrument]
    ) -> Optional[Instrument]:
        """Create a custom instrument on the shared meter.

        Thin wrapper around
        :meth:`ObservabilityService.register_instrument`. Because the
        instrument lives on the process-wide ``MeterProvider``, it is
        shared by every recorder of the same *modinput_type* — calling this
        on any such recorder is equivalent, so register each instrument
        only once.

        ``None`` is returned when the underlying
        :class:`ObservabilityService` failed to initialise (for example
        because ``ta_name`` could not be determined), so always guard the
        result before recording::

            latency_histogram = obs.register_instrument(
                lambda meter: meter.create_histogram(
                    name="my_ta.request.latency",
                    description="Latency of outbound API requests",
                    unit="s",
                )
            )
            if latency_histogram:
                latency_histogram.record(elapsed, {ATTR_MODINPUT_NAME: self._stanza_name})

        Args:
            callback: Callable that receives the ``Meter`` and returns a
                new instrument (Counter, Histogram, Gauge, etc.).

        Returns:
            The instrument created by *callback*, or ``None``.
        """
        return self._service.register_instrument(callback)

    def record(
        self,
        event_count: int,
        byte_count: int,
        extra_attrs: Optional[dict] = None,
    ) -> None:
        """Increment the built-in counters by *event_count* and *byte_count*.

        The ``"splunk.modinput.name"`` attribute is forced to the
        *stanza_name* supplied at construction and always wins over any
        value in *extra_attrs* — this keeps every data point unambiguously
        attributed to the stanza that recorded it.

        A counter left at ``None`` (because
        :class:`ObservabilityService` failed to initialise) is skipped
        silently.

        Args:
            event_count: Events ingested in this batch; pass ``0`` for an
                explicit "no events" observation.
            byte_count: Total size of the ingested events in bytes.
            extra_attrs: Optional dict of additional OpenTelemetry
                attributes for both data points. Keys must be strings;
                values must be strings, booleans, or numbers. Avoid
                high-cardinality keys such as user IDs or GUIDs.

        Example::

            obs.record(
                event_count=len(events),
                byte_count=sum(len(e) for e in events),
                extra_attrs={"my_ta.partition": partition_id},
            )
        """
        # Merge caller attrs first so the stanza attribute overrides any clash.
        attributes = {**(extra_attrs or {}), ATTR_MODINPUT_NAME: self._stanza_name}
        count_counter = self._service.event_count_counter
        if count_counter:
            count_counter.add(event_count, attributes=attributes)
        bytes_counter = self._service.event_bytes_counter
        if bytes_counter:
            bytes_counter.add(byte_count, attributes=attributes)

    def flush(self) -> None:
        """Force-flush all metric readers of the shared service.

        Delegates to :meth:`ObservabilityService.flush` (which calls
        ``MeterProvider.force_flush()`` internally). The context-manager
        ``__exit__`` invokes this automatically, so explicit calls are only
        needed when not using ``with`` and delivery must be guaranteed
        before the process exits::

            obs = StanzaObservabilityRecorder("my-input", logger, stanza_name)
            try:
                obs.record(len(events), total_bytes)
            finally:
                obs.flush()
        """
        self._service.flush()

    def __enter__(self) -> "StanzaObservabilityRecorder":
        """Support the ``with`` statement by returning *self*."""
        return self

    def __exit__(self, *_) -> bool:
        """Flush buffered metrics, then let any in-block exception propagate.

        Always returns ``False`` so an exception raised inside the ``with``
        body is re-raised after the flush.
        """
        self.flush()
        return False

    def _emit_zero_baseline(self) -> None:
        """Record ``0`` events and ``0`` bytes once, from ``__init__``.

        Seeds the stanza's metric series so dashboards and alerting rules
        see it from the very first collection cycle; without this baseline,
        a stanza that has never produced data would be indistinguishable
        from one that has never been seen at all.
        """
        self.record(0, 0)
Loading
Loading