diff --git a/src/a2a/utils/telemetry.py b/src/a2a/utils/telemetry.py index 7dbfd12c4..855f405a3 100644 --- a/src/a2a/utils/telemetry.py +++ b/src/a2a/utils/telemetry.py @@ -68,12 +68,28 @@ def internal_method(self): import inspect import logging import os +import sys from collections.abc import Callable from typing import TYPE_CHECKING, Any from typing_extensions import Self +# Graceful-shutdown exception allowlist. ``asyncio.CancelledError`` is always +# treated as non-error. On Python 3.13+ ``asyncio.QueueShutDown`` is also +# a graceful signal; on older versions the back-port ``culsans`` exposes it +# as ``AsyncQueueShutDown``. Import whichever is available. +_GRACEFUL_EXCEPTIONS: tuple[type[BaseException], ...] = (asyncio.CancelledError,) +if sys.version_info >= (3, 13): + _GRACEFUL_EXCEPTIONS += (asyncio.QueueShutDown,) # type: ignore[attr-defined] +else: + try: + from culsans import AsyncQueueShutDown + + _GRACEFUL_EXCEPTIONS += (AsyncQueueShutDown,) + except ImportError: + pass # culsans not installed — fall back to CancelledError only + if TYPE_CHECKING: from opentelemetry.trace import ( @@ -233,11 +249,14 @@ async def async_wrapper(*args, **kwargs) -> Any: # Async wrapper, await for the function call to complete. result = await func(*args, **kwargs) span.set_status(StatusCode.OK) - # asyncio.CancelledError extends from BaseException - except asyncio.CancelledError as ce: + # Graceful-shutdown exceptions — record but do NOT mark as ERROR. + # ``asyncio.CancelledError`` is always included; on Python 3.13+ + # ``asyncio.QueueShutDown`` (or the culsans back-port) is added + # when the queue is closed normally. + except _GRACEFUL_EXCEPTIONS as ge: exception = None - logger.debug('CancelledError in span %s', actual_span_name) - span.record_exception(ce) + logger.debug('%s in span %s', type(ge).__name__, actual_span_name) + span.record_exception(ge) raise except Exception as e: exception = e diff --git a/tests/utils/test_telemetry.py b/tests/utils/test_telemetry.py index a43bf1fa3..e73774138 100644 --- a/tests/utils/test_telemetry.py +++ b/tests/utils/test_telemetry.py @@ -266,3 +266,65 @@ def test_env_var_disabled_logs_message( in caplog.text ) assert 'OTEL_INSTRUMENTATION_A2A_SDK_ENABLED' in caplog.text + + +# ─── Graceful-shutdown exception allowlist ─────────────────────────────── + + +class _FakeQueueShutDown(BaseException): + """Simulated graceful-shutdown exception matching QueueShutDown semantics.""" + + +@pytest.mark.asyncio +async def test_trace_function_async_cancelled_error_graceful( + mock_span: mock.MagicMock, +) -> None: + """CancelledError is recorded but does NOT set span status to ERROR.""" + + @trace_function + async def raises_cancelled() -> NoReturn: + raise asyncio.CancelledError() + + with pytest.raises(asyncio.CancelledError): + await raises_cancelled() + + mock_span.record_exception.assert_called() + # Should NOT have set ERROR status + for call in mock_span.set_status.call_args_list: + args, _ = call + if args and hasattr(args[0], 'name'): + assert args[0].name != 'ERROR', ( + 'CancelledError should not set ERROR status' + ) + + +@pytest.mark.asyncio +async def test_trace_function_async_queue_shutdown_graceful( + mock_span: mock.MagicMock, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """QueueShutDown (or back-port) is recorded but does NOT set ERROR status.""" + from a2a.utils import telemetry + + # Patch the graceful-exceptions tuple to include our fake exception + monkeypatch.setattr( + telemetry, + '_GRACEFUL_EXCEPTIONS', + (asyncio.CancelledError, _FakeQueueShutDown), + ) + + @trace_function + async def raises_shutdown() -> NoReturn: + raise _FakeQueueShutDown('queue closed') + + with pytest.raises(_FakeQueueShutDown): + await raises_shutdown() + + mock_span.record_exception.assert_called() + # Should NOT have set ERROR status + for call in mock_span.set_status.call_args_list: + args, _ = call + if args and hasattr(args[0], 'name'): + assert args[0].name != 'ERROR', ( + 'QueueShutDown should not set ERROR status' + )