Skip to content
1 change: 1 addition & 0 deletions temporalio/bridge/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class OpenTelemetryConfig:
metric_temporality_delta: bool
durations_as_seconds: bool
http: bool
histogram_bucket_overrides: Mapping[str, Sequence[float]] | None = None


@dataclass(frozen=True)
Expand Down
6 changes: 6 additions & 0 deletions temporalio/bridge/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct OpenTelemetryConfig {
metric_temporality_delta: bool,
durations_as_seconds: bool,
http: bool,
histogram_bucket_overrides: Option<HashMap<String, Vec<f64>>>,
}

#[derive(FromPyObject)]
Expand Down Expand Up @@ -357,6 +358,11 @@ impl TryFrom<MetricsConfig> for Arc<dyn CoreMeter> {
} else {
None
})
.maybe_histogram_bucket_overrides(otel_conf.histogram_bucket_overrides.map(
|overrides| temporalio_common::telemetry::HistogramBucketOverrides {
overrides,
},
))
.build();
Ok(Arc::new(build_otlp_metric_exporter(otel_options).map_err(
|err| PyValueError::new_err(format!("Failed building OTel exporter: {err}")),
Expand Down
6 changes: 6 additions & 0 deletions temporalio/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ class OpenTelemetryConfig:
When enabled, the ``url`` should point to the HTTP endpoint
(e.g. ``"http://localhost:4318/v1/metrics"``).
Defaults to ``False`` (gRPC).
histogram_bucket_overrides: Override the default histogram bucket
boundaries for specific metrics. Keys are metric names and
values are sequences of bucket boundaries (e.g.
``{"workflow_task_schedule_to_start_latency": [0.01, 0.05, 0.1, 0.5, 1.0, 5.0]}``).
"""

url: str
Expand All @@ -345,6 +349,7 @@ class OpenTelemetryConfig:
)
durations_as_seconds: bool = False
http: bool = False
histogram_bucket_overrides: Mapping[str, Sequence[float]] | None = None

def _to_bridge_config(self) -> temporalio.bridge.runtime.OpenTelemetryConfig:
return temporalio.bridge.runtime.OpenTelemetryConfig(
Expand All @@ -360,6 +365,7 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.OpenTelemetryConfig:
),
durations_as_seconds=self.durations_as_seconds,
http=self.http,
histogram_bucket_overrides=self.histogram_bucket_overrides,
)


Expand Down
104 changes: 103 additions & 1 deletion tests/test_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import re
import uuid
from datetime import timedelta
from typing import cast
from typing import Any, cast
from urllib.request import urlopen

import pytest
Expand All @@ -14,6 +14,7 @@
from temporalio.runtime import (
LogForwardingConfig,
LoggingConfig,
OpenTelemetryConfig,
PrometheusConfig,
Runtime,
TelemetryConfig,
Expand Down Expand Up @@ -269,6 +270,107 @@ async def check_metrics() -> None:
await assert_eventually(check_metrics)


async def test_opentelemetry_histogram_bucket_overrides(client: Client):
# Set up an OpenTelemetry configuration with custom histogram bucket overrides
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
ExportMetricsServiceRequest,
ExportMetricsServiceResponse,
)

special_value = float(1234.5678)
histogram_overrides = {
"temporal_long_request_latency": [special_value / 2, special_value],
"custom_histogram": [special_value / 2, special_value],
}

captured: dict[str, list[float]] = {}
lock = threading.Lock()

class Handler(BaseHTTPRequestHandler):
def log_message(self, format: str, *args: Any):
pass # silence default stderr logging

def do_POST(self):
length = int(self.headers.get("Content-Length", "0"))
req = ExportMetricsServiceRequest()
req.ParseFromString(self.rfile.read(length))
with lock:
for rm in req.resource_metrics:
for sm in rm.scope_metrics:
for m in sm.metrics:
if m.HasField("histogram"):
for dp in m.histogram.data_points:
captured[m.name] = list(dp.explicit_bounds)
body = ExportMetricsServiceResponse().SerializeToString()
self.send_response(200)
self.send_header("Content-Type", "application/x-protobuf")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)

otel_port = find_free_port()
server = HTTPServer(("127.0.0.1", otel_port), Handler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
try:
runtime = Runtime(
telemetry=TelemetryConfig(
metrics=OpenTelemetryConfig(
url=f"http://127.0.0.1:{otel_port}/v1/metrics",
http=True,
metric_periodicity=timedelta(milliseconds=100),
durations_as_seconds=False,
histogram_bucket_overrides=histogram_overrides,
),
),
)

# Create and record to a custom histogram
custom_histogram = runtime.metric_meter.create_histogram(
"custom_histogram", "Custom histogram", "ms"
)
custom_histogram.record(600)

# Run a workflow so built-in histograms (e.g. temporal_long_request_latency)
# are recorded and exported.
client_with_overrides = await Client.connect(
client.service_client.config.target_host,
namespace=client.namespace,
runtime=runtime,
)
task_queue = f"task-queue-{uuid.uuid4()}"
async with Worker(
client_with_overrides,
task_queue=task_queue,
workflows=[HelloWorkflow],
):
assert "Hello, World!" == await client_with_overrides.execute_workflow(
HelloWorkflow.run,
"World",
id=f"workflow-{uuid.uuid4()}",
task_queue=task_queue,
)

async def check_metrics() -> None:
with lock:
snapshot = dict(captured)
for key, buckets in histogram_overrides.items():
assert (
key in snapshot
), f"Missing {key} in captured metrics: {list(snapshot)}"
assert snapshot[key] == pytest.approx(
buckets
), f"Bucket mismatch for {key}: got {snapshot[key]} expected {buckets}"

await assert_eventually(check_metrics)
finally:
server.shutdown()
server.server_close()


def test_runtime_options_invalid_heartbeat() -> None:
with pytest.raises(ValueError):
Runtime(
Expand Down