Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/a2a/server/agent_execution/active_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
Event,
EventQueueSource,
QueueShutDown,
_create_async_queue,
create_async_queue,
)
from a2a.server.tasks import PushNotificationEvent
from a2a.types.a2a_pb2 import (
Expand Down Expand Up @@ -402,7 +402,7 @@ def __init__(

# Queue for incoming requests
self._request_queue: AsyncQueue[tuple[RequestContext, uuid.UUID]] = (
_create_async_queue()
create_async_queue()
)

@property
Expand Down
29 changes: 6 additions & 23 deletions src/a2a/server/events/event_queue.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import logging
import sys
import warnings

from abc import ABC, abstractmethod
Expand All @@ -9,33 +8,17 @@

from typing_extensions import Self


if sys.version_info >= (3, 13):
from asyncio import Queue as AsyncQueue
from asyncio import QueueShutDown

def _create_async_queue(maxsize: int = 0) -> AsyncQueue[Any]:
"""Create a backwards-compatible queue object."""
return AsyncQueue(maxsize=maxsize)
else:
import culsans

from culsans import AsyncQueue # type: ignore[no-redef]
from culsans import (
AsyncQueueShutDown as QueueShutDown, # type: ignore[no-redef]
)

def _create_async_queue(maxsize: int = 0) -> AsyncQueue[Any]:
"""Create a backwards-compatible queue object."""
return culsans.Queue(maxsize=maxsize).async_q # type: ignore[no-any-return]


from a2a.types.a2a_pb2 import (
Message,
Task,
TaskArtifactUpdateEvent,
TaskStatusUpdateEvent,
)
from a2a.utils._async_queue_compat import (
AsyncQueue,
QueueShutDown,
create_async_queue,
)
from a2a.utils.telemetry import SpanKind, trace_class


Expand Down Expand Up @@ -101,7 +84,7 @@ def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None:
if max_queue_size <= 0:
raise ValueError('max_queue_size must be greater than 0')

self._queue: AsyncQueue[Event] = _create_async_queue(
self._queue: AsyncQueue[Event] = create_async_queue(
maxsize=max_queue_size
)
self._children: list[EventQueueLegacy] = []
Expand Down
6 changes: 3 additions & 3 deletions src/a2a/server/events/event_queue_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
Event,
EventQueue,
QueueShutDown,
_create_async_queue,
create_async_queue,
)
from a2a.utils.telemetry import SpanKind, trace_class

Expand All @@ -37,7 +37,7 @@ def __init__(
if max_queue_size <= 0:
raise ValueError('max_queue_size must be greater than 0')

self._incoming_queue: AsyncQueue[Event] = _create_async_queue(
self._incoming_queue: AsyncQueue[Event] = create_async_queue(
maxsize=max_queue_size
)
self._lock = asyncio.Lock()
Expand Down Expand Up @@ -293,7 +293,7 @@ def __init__(
raise ValueError('max_queue_size must be greater than 0')

self._parent = parent
self._queue: AsyncQueue[Event] = _create_async_queue(
self._queue: AsyncQueue[Event] = create_async_queue(
maxsize=max_queue_size
)
self._is_closed = False
Expand Down
28 changes: 28 additions & 0 deletions src/a2a/utils/_async_queue_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Cross-version aliases for async queue primitives."""

import sys

from typing import Any


if sys.version_info >= (3, 13):
from asyncio import Queue as AsyncQueue
from asyncio import QueueShutDown

def create_async_queue(maxsize: int = 0) -> AsyncQueue[Any]:
"""Create a backwards-compatible async queue object."""
return AsyncQueue(maxsize=maxsize)
else:
import culsans

from culsans import AsyncQueue # type: ignore[no-redef]
from culsans import (
AsyncQueueShutDown as QueueShutDown, # type: ignore[no-redef]
)

def create_async_queue(maxsize: int = 0) -> AsyncQueue[Any]:
"""Create a backwards-compatible async queue object."""
return culsans.Queue(maxsize=maxsize).async_q # type: ignore[no-any-return]


__all__ = ['AsyncQueue', 'QueueShutDown', 'create_async_queue']
19 changes: 15 additions & 4 deletions src/a2a/utils/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@

from typing_extensions import Self

from a2a.utils._async_queue_compat import QueueShutDown


if TYPE_CHECKING:
from opentelemetry.trace import (
Expand Down Expand Up @@ -144,6 +146,12 @@
__all__ = ['SpanKind']


_NON_ERROR_EXCEPTIONS: tuple[type[BaseException], ...] = (
asyncio.CancelledError,
QueueShutDown,
)


def trace_function( # noqa: PLR0915
func: Callable | None = None,
*,
Expand Down Expand Up @@ -233,31 +241,34 @@
# Async wrapper, await for the function call to complete.
result = await func(*args, **kwargs)
span.set_status(StatusCode.OK)
# asyncio.CancelledError extends from BaseException
except asyncio.CancelledError as ce:
except _NON_ERROR_EXCEPTIONS as ge:
exception = None
logger.debug('CancelledError in span %s', actual_span_name)
span.record_exception(ce)
logger.debug(
'%s in span %s',
type(ge).__name__,
actual_span_name,
)
span.record_exception(ge)
raise
except Exception as e:
exception = e
span.record_exception(e)
span.set_status(StatusCode.ERROR, description=str(e))
raise
finally:
if attribute_extractor:
try:
attribute_extractor(
span, args, kwargs, result, exception
)
except Exception:
logger.exception(
'attribute_extractor error in span %s',
actual_span_name,
)
return result

@functools.wraps(func)

Check notice on line 271 in src/a2a/utils/telemetry.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/utils/telemetry.py (288-306)
def sync_wrapper(*args, **kwargs) -> Any:
"""Sync Wrapper for the decorator."""
tracer = trace.get_tracer(INSTRUMENTING_MODULE_NAME)
Expand Down
28 changes: 28 additions & 0 deletions tests/utils/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import pytest

from a2a.server.events.event_queue import QueueShutDown
from a2a.utils.telemetry import trace_class, trace_function


Expand Down Expand Up @@ -266,3 +267,30 @@ def test_env_var_disabled_logs_message(
in caplog.text
)
assert 'OTEL_INSTRUMENTATION_A2A_SDK_ENABLED' in caplog.text


@pytest.mark.asyncio
@pytest.mark.parametrize('exc_cls', [asyncio.CancelledError, QueueShutDown])
async def test_trace_function_async_non_error_exception_does_not_mark_span_error(
mock_span: mock.MagicMock,
exc_cls: type[BaseException],
) -> None:
"""`trace_function` records non-error exceptions but never marks span ERROR.

Covers `asyncio.CancelledError` and `QueueShutDown`.
"""

@trace_function
async def non_error_exception() -> NoReturn:
await asyncio.sleep(0)
raise exc_cls('operation ended with non-error exception')

with pytest.raises(exc_cls):
await non_error_exception()

mock_span.record_exception.assert_called()
# The wrapper only passes `description=` when calling
# `set_status(StatusCode.ERROR, ...)`. Its absence on every call proves
# the span was never marked as failed.
for call in mock_span.set_status.call_args_list:
assert 'description' not in call.kwargs
Loading