From f2987e8acbb787a2dbfb100565d164906165f877 Mon Sep 17 00:00:00 2001 From: Matteo Librizzi Date: Wed, 27 May 2026 17:41:09 +0100 Subject: [PATCH 1/2] feat(lib): expose data_converter kwarg on AgentexWorker and Temporal client APIs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a `data_converter` kwarg to: - `AgentexWorker.__init__` and `worker.get_temporal_client` - `clients.temporal.utils.get_temporal_client` - `TemporalClient` / `TemporalACP` / `TemporalACPConfig` / `FastACP` This unlocks composing `OpenAIAgentsPlugin` with a payload codec by passing a pre-built `DataConverter(payload_converter_class=OpenAIPayloadConverter, payload_codec=...)`. Previously the plugin would silently drop a standalone `payload_codec` kwarg because its `_data_converter(None)` transformer builds a fresh converter without any codec — the existing guard rejected this combination outright. The guard is refined: it now fires only when both the plugin is present AND `payload_codec` is the standalone kwarg AND no `data_converter` was supplied, and the error message points callers at the working composition path. An additional guard rejects passing `payload_codec` and `data_converter` together as ambiguous. No behavior change for existing callers (none currently pass `data_converter`). Co-Authored-By: Claude Opus 4.7 --- .../core/clients/temporal/temporal_client.py | 33 +++- .../lib/core/clients/temporal/utils.py | 51 ++++-- .../lib/core/temporal/workers/worker.py | 45 +++-- src/agentex/lib/sdk/fastacp/fastacp.py | 2 + .../lib/sdk/fastacp/impl/temporal_acp.py | 16 +- src/agentex/lib/types/fastacp.py | 12 +- tests/lib/test_payload_codec.py | 161 +++++++++++++++++- 7 files changed, 285 insertions(+), 35 deletions(-) diff --git a/src/agentex/lib/core/clients/temporal/temporal_client.py b/src/agentex/lib/core/clients/temporal/temporal_client.py index f44648da2..9f9cbfecf 100644 --- a/src/agentex/lib/core/clients/temporal/temporal_client.py +++ b/src/agentex/lib/core/clients/temporal/temporal_client.py @@ -7,7 +7,7 @@ from temporalio.client import Client, WorkflowExecutionStatus from temporalio.common import RetryPolicy as TemporalRetryPolicy, WorkflowIDReusePolicy from temporalio.service import RPCError, RPCStatusCode -from temporalio.converter import PayloadCodec +from temporalio.converter import DataConverter, PayloadCodec from agentex.lib.utils.logging import make_logger from agentex.lib.utils.model_utils import BaseModel @@ -78,11 +78,16 @@ class TemporalClient: def __init__( - self, temporal_client: Client | None = None, plugins: list[Any] = [], payload_codec: PayloadCodec | None = None + self, + temporal_client: Client | None = None, + plugins: list[Any] = [], + payload_codec: PayloadCodec | None = None, + data_converter: DataConverter | None = None, ): self._client: Client | None = temporal_client self._plugins = plugins self._payload_codec = payload_codec + self._data_converter = data_converter @property def client(self) -> Client: @@ -92,7 +97,13 @@ def client(self) -> Client: return self._client @classmethod - async def create(cls, temporal_address: str, plugins: list[Any] = [], payload_codec: PayloadCodec | None = None): + async def create( + cls, + temporal_address: str, + plugins: list[Any] = [], + payload_codec: PayloadCodec | None = None, + data_converter: DataConverter | None = None, + ): if temporal_address in [ "false", "False", @@ -105,8 +116,13 @@ async def create(cls, temporal_address: str, plugins: list[Any] = [], payload_co ]: _client = None else: - _client = await get_temporal_client(temporal_address, plugins=plugins, payload_codec=payload_codec) - return cls(_client, plugins, payload_codec) + _client = await get_temporal_client( + temporal_address, + plugins=plugins, + payload_codec=payload_codec, + data_converter=data_converter, + ) + return cls(_client, plugins, payload_codec, data_converter) async def setup(self, temporal_address: str): self._client = await self._get_temporal_client(temporal_address=temporal_address) @@ -124,7 +140,12 @@ async def _get_temporal_client(self, temporal_address: str) -> Client | None: ]: return None else: - return await get_temporal_client(temporal_address, plugins=self._plugins, payload_codec=self._payload_codec) + return await get_temporal_client( + temporal_address, + plugins=self._plugins, + payload_codec=self._payload_codec, + data_converter=self._data_converter, + ) async def start_workflow( self, diff --git a/src/agentex/lib/core/clients/temporal/utils.py b/src/agentex/lib/core/clients/temporal/utils.py index 8c2241c62..7b0f6aa01 100644 --- a/src/agentex/lib/core/clients/temporal/utils.py +++ b/src/agentex/lib/core/clients/temporal/utils.py @@ -6,7 +6,7 @@ from temporalio.client import Client, Plugin as ClientPlugin from temporalio.worker import Interceptor from temporalio.runtime import Runtime, TelemetryConfig, OpenTelemetryConfig -from temporalio.converter import PayloadCodec +from temporalio.converter import DataConverter, PayloadCodec from temporalio.contrib.pydantic import pydantic_data_converter # class DateTimeJSONEncoder(AdvancedJSONEncoder): @@ -86,6 +86,7 @@ async def get_temporal_client( metrics_url: str | None = None, plugins: list[Any] = [], payload_codec: PayloadCodec | None = None, + data_converter: DataConverter | None = None, ) -> Client: """ Create a Temporal client with plugin integration. @@ -94,7 +95,14 @@ async def get_temporal_client( temporal_address: Temporal server address metrics_url: Optional metrics endpoint URL plugins: List of Temporal plugins to include - payload_codec: Optional payload codec for encoding/decoding payloads (e.g. encryption, compression) + payload_codec: Optional payload codec for encoding/decoding payloads + (e.g. encryption, compression). Cannot be combined with the + OpenAIAgentsPlugin via this kwarg — see ``data_converter``. + data_converter: Optional pre-built ``DataConverter``. Use this when + composing the OpenAIAgentsPlugin with a payload codec: build a + ``DataConverter(payload_converter_class=OpenAIPayloadConverter, + payload_codec=...)`` and pass it here. Mutually exclusive with + ``payload_codec``. Returns: Configured Temporal client @@ -103,29 +111,50 @@ async def get_temporal_client( if plugins: validate_client_plugins(plugins) + if payload_codec is not None and data_converter is not None: + raise ValueError( + "Pass payload_codec inside `data_converter` " + "(DataConverter(..., payload_codec=...)) instead of as a separate " + "kwarg. Specifying both is ambiguous." + ) + # Check if OpenAI plugin is present - it needs to configure its own data converter # Lazy import to avoid pulling in opentelemetry.sdk for non-Temporal agents from temporalio.contrib.openai_agents import OpenAIAgentsPlugin has_openai_plugin = any(isinstance(p, OpenAIAgentsPlugin) for p in (plugins or [])) - if has_openai_plugin and payload_codec is not None: + # When the OpenAI plugin is present, its `_data_converter` transformer + # builds a fresh DataConverter (without any codec) if none is supplied, + # so a standalone `payload_codec` kwarg would be silently dropped and + # payloads would land in Temporal in plain text. Guide the caller to + # the working composition path instead. + if has_openai_plugin and payload_codec is not None and data_converter is None: raise ValueError( - "payload_codec is not supported alongside OpenAIAgentsPlugin: the plugin " - "installs its own data converter and the codec would be silently ignored, " - "leaving payloads unencoded. Remove one or the other." + "payload_codec passed as a kwarg alongside OpenAIAgentsPlugin would " + "be silently dropped by the plugin's data-converter transformer. " + "Build a DataConverter explicitly with " + "`payload_converter_class=OpenAIPayloadConverter` (or a subclass) " + "and `payload_codec=...`, then pass it via the `data_converter` " + "kwarg instead." ) - connect_kwargs = { + connect_kwargs: dict[str, Any] = { "target_host": temporal_address, "plugins": plugins, } - if not has_openai_plugin: - data_converter = pydantic_data_converter - if payload_codec: - data_converter = dataclasses.replace(data_converter, payload_codec=payload_codec) + if data_converter is not None: + # Caller supplied a pre-built converter. With the OpenAI plugin present + # and `payload_converter_class=OpenAIPayloadConverter` (or subclass), + # the plugin's `_data_converter` transformer passes it through intact, + # preserving any payload_codec. connect_kwargs["data_converter"] = data_converter + elif not has_openai_plugin: + dc = pydantic_data_converter + if payload_codec: + dc = dataclasses.replace(dc, payload_codec=payload_codec) + connect_kwargs["data_converter"] = dc if not metrics_url: client = await Client.connect(**connect_kwargs) diff --git a/src/agentex/lib/core/temporal/workers/worker.py b/src/agentex/lib/core/temporal/workers/worker.py index 2e8591242..1bb1972b5 100644 --- a/src/agentex/lib/core/temporal/workers/worker.py +++ b/src/agentex/lib/core/temporal/workers/worker.py @@ -95,35 +95,55 @@ async def get_temporal_client( metrics_url: str | None = None, plugins: list = [], payload_codec: PayloadCodec | None = None, + data_converter: DataConverter | None = None, ) -> Client: if plugins != []: # We don't need to validate the plugins if they are empty _validate_plugins(plugins) + if payload_codec is not None and data_converter is not None: + raise ValueError( + "Pass payload_codec inside `data_converter` " + "(DataConverter(..., payload_codec=...)) instead of as a separate " + "kwarg. Specifying both is ambiguous." + ) + # Check if OpenAI plugin is present - it needs to configure its own data converter # Lazy import to avoid pulling in opentelemetry.sdk for non-Temporal agents from temporalio.contrib.openai_agents import OpenAIAgentsPlugin has_openai_plugin = any(isinstance(p, OpenAIAgentsPlugin) for p in (plugins or [])) - if has_openai_plugin and payload_codec is not None: + # When the OpenAI plugin is present, its `_data_converter` transformer + # builds a fresh DataConverter (without any codec) if none is supplied, + # so a standalone `payload_codec` kwarg would be silently dropped and + # payloads would land in Temporal in plain text. Guide the caller to + # the working composition path instead. + if has_openai_plugin and payload_codec is not None and data_converter is None: raise ValueError( - "payload_codec is not supported alongside OpenAIAgentsPlugin: the plugin " - "installs its own data converter and the codec would be silently ignored, " - "leaving payloads unencoded. Remove one or the other." + "payload_codec passed as a kwarg alongside OpenAIAgentsPlugin would " + "be silently dropped by the plugin's data-converter transformer. " + "Build a DataConverter explicitly with " + "`payload_converter_class=OpenAIPayloadConverter` (or a subclass) " + "and `payload_codec=...`, then pass it via the `data_converter` " + "kwarg instead." ) - # Build connection kwargs - connect_kwargs = { + connect_kwargs: dict[str, Any] = { "target_host": temporal_address, "plugins": plugins, } - # Only set data_converter if OpenAI plugin is not present - if not has_openai_plugin: - data_converter = custom_data_converter - if payload_codec: - data_converter = dataclasses.replace(data_converter, payload_codec=payload_codec) + if data_converter is not None: + # Caller supplied a pre-built converter. With the OpenAI plugin present + # and `payload_converter_class=OpenAIPayloadConverter` (or subclass), + # the plugin's `_data_converter` transformer passes it through intact, + # preserving any payload_codec. connect_kwargs["data_converter"] = data_converter + elif not has_openai_plugin: + dc = custom_data_converter + if payload_codec: + dc = dataclasses.replace(dc, payload_codec=payload_codec) + connect_kwargs["data_converter"] = dc if not metrics_url: client = await Client.connect(**connect_kwargs) @@ -145,6 +165,7 @@ def __init__( interceptors: list = [], metrics_url: str | None = None, payload_codec: PayloadCodec | None = None, + data_converter: DataConverter | None = None, ): self.task_queue = task_queue self.activity_handles = [] @@ -159,6 +180,7 @@ def __init__( self.interceptors = interceptors self.metrics_url = metrics_url self.payload_codec = payload_codec + self.data_converter = data_converter @overload async def run( @@ -195,6 +217,7 @@ async def run( plugins=self.plugins, metrics_url=self.metrics_url, payload_codec=self.payload_codec, + data_converter=self.data_converter, ) # Enable debug mode if AgentEx debug is enabled (disables deadlock detection) diff --git a/src/agentex/lib/sdk/fastacp/fastacp.py b/src/agentex/lib/sdk/fastacp/fastacp.py index fbd4f0511..42859793d 100644 --- a/src/agentex/lib/sdk/fastacp/fastacp.py +++ b/src/agentex/lib/sdk/fastacp/fastacp.py @@ -65,6 +65,8 @@ def create_async_acp(config: AsyncACPConfig, **kwargs) -> BaseACPServer: temporal_config["interceptors"] = config.interceptors # type: ignore[attr-defined] if hasattr(config, "payload_codec"): temporal_config["payload_codec"] = config.payload_codec # type: ignore[attr-defined] + if hasattr(config, "data_converter"): + temporal_config["data_converter"] = config.data_converter # type: ignore[attr-defined] return implementation_class.create(**temporal_config) else: return implementation_class.create(**kwargs) diff --git a/src/agentex/lib/sdk/fastacp/impl/temporal_acp.py b/src/agentex/lib/sdk/fastacp/impl/temporal_acp.py index 54fe72e6c..32c19533d 100644 --- a/src/agentex/lib/sdk/fastacp/impl/temporal_acp.py +++ b/src/agentex/lib/sdk/fastacp/impl/temporal_acp.py @@ -4,7 +4,7 @@ from contextlib import asynccontextmanager from fastapi import FastAPI -from temporalio.converter import PayloadCodec +from temporalio.converter import DataConverter, PayloadCodec from agentex.protocol.acp import ( SendEventParams, @@ -33,6 +33,7 @@ def __init__( plugins: list[Any] | None = None, interceptors: list[Any] | None = None, payload_codec: PayloadCodec | None = None, + data_converter: DataConverter | None = None, ): super().__init__() self._temporal_task_service = temporal_task_service @@ -40,6 +41,7 @@ def __init__( self._plugins = plugins or [] self._interceptors = interceptors or [] self._payload_codec = payload_codec + self._data_converter = data_converter @classmethod @override @@ -49,12 +51,17 @@ def create( plugins: list[Any] | None = None, interceptors: list[Any] | None = None, payload_codec: PayloadCodec | None = None, + data_converter: DataConverter | None = None, ) -> "TemporalACP": logger.info("Initializing TemporalACP instance") # Create instance without temporal client initially temporal_acp = cls( - temporal_address=temporal_address, plugins=plugins, interceptors=interceptors, payload_codec=payload_codec + temporal_address=temporal_address, + plugins=plugins, + interceptors=interceptors, + payload_codec=payload_codec, + data_converter=data_converter, ) temporal_acp._setup_handlers() logger.info("TemporalACP instance initialized now") @@ -71,7 +78,10 @@ async def lifespan(app: FastAPI): if self._temporal_task_service is None: env_vars = EnvironmentVariables.refresh() temporal_client = await TemporalClient.create( - temporal_address=self._temporal_address, plugins=self._plugins, payload_codec=self._payload_codec + temporal_address=self._temporal_address, + plugins=self._plugins, + payload_codec=self._payload_codec, + data_converter=self._data_converter, ) self._temporal_task_service = TemporalTaskService( temporal_client=temporal_client, diff --git a/src/agentex/lib/types/fastacp.py b/src/agentex/lib/types/fastacp.py index e11091e93..2399790e3 100644 --- a/src/agentex/lib/types/fastacp.py +++ b/src/agentex/lib/types/fastacp.py @@ -56,7 +56,16 @@ class TemporalACPConfig(AsyncACPConfig): encoding/decoding payloads (e.g. encryption, compression). NOTE: this only configures the ACP (client) side. The worker side must be configured separately via ``AgentexWorker(payload_codec=...)`` - with the SAME codec, or decode will fail at runtime. + with the SAME codec, or decode will fail at runtime. Cannot be + combined with ``OpenAIAgentsPlugin``; use ``data_converter`` + instead in that case. + data_converter: Optional pre-built ``temporalio.converter.DataConverter``. + Use this when composing the ``OpenAIAgentsPlugin`` with a payload + codec: build a ``DataConverter(payload_converter_class= + OpenAIPayloadConverter, payload_codec=...)`` and pass it here. + Mutually exclusive with ``payload_codec``. The worker side must + be configured separately via ``AgentexWorker(data_converter=...)`` + with the SAME converter, or decode will fail at runtime. """ type: Literal["temporal"] = Field(default="temporal", frozen=True) @@ -64,6 +73,7 @@ class TemporalACPConfig(AsyncACPConfig): plugins: list[Any] = Field(default=[], frozen=True) interceptors: list[Any] = Field(default=[], frozen=True) payload_codec: Any = Field(default=None, frozen=True) + data_converter: Any = Field(default=None, frozen=True) @field_validator("plugins") @classmethod diff --git a/tests/lib/test_payload_codec.py b/tests/lib/test_payload_codec.py index bb2b24228..9089c829b 100644 --- a/tests/lib/test_payload_codec.py +++ b/tests/lib/test_payload_codec.py @@ -5,7 +5,11 @@ import pytest from temporalio.client import Client, Plugin as ClientPlugin -from temporalio.converter import PayloadCodec +from temporalio.converter import ( + DataConverter, + PayloadCodec, + DefaultPayloadConverter, +) from temporalio.contrib.pydantic import pydantic_data_converter @@ -68,6 +72,28 @@ async def test_create_propagates_codec_to_get_temporal_client(self): mock_get.assert_awaited_once() assert mock_get.await_args.kwargs["payload_codec"] is codec + def test_init_stores_data_converter(self): + from agentex.lib.core.clients.temporal.temporal_client import TemporalClient + + dc = DataConverter(payload_codec=_NoopCodec()) + client = TemporalClient(data_converter=dc) + assert client._data_converter is dc + + def test_init_default_data_converter_is_none(self): + from agentex.lib.core.clients.temporal.temporal_client import TemporalClient + + assert TemporalClient()._data_converter is None + + async def test_create_propagates_data_converter_to_get_temporal_client(self): + import agentex.lib.core.clients.temporal.temporal_client as module + + dc = DataConverter(payload_codec=_NoopCodec()) + with patch.object(module, "get_temporal_client", new=AsyncMock(return_value=object())) as mock_get: + await module.TemporalClient.create(temporal_address="localhost:7233", plugins=[], data_converter=dc) + + mock_get.assert_awaited_once() + assert mock_get.await_args.kwargs["data_converter"] is dc + class TestGetTemporalClientUtils: async def test_no_codec_uses_pydantic_data_converter_unchanged(self): @@ -96,7 +122,7 @@ async def test_codec_with_openai_plugin_raises(self): codec = _NoopCodec() with _patch_openai_plugin(), _mock_connect() as mock_connect: - with pytest.raises(ValueError, match="payload_codec is not supported alongside OpenAIAgentsPlugin"): + with pytest.raises(ValueError, match="silently dropped by the plugin's data-converter transformer"): await get_temporal_client( temporal_address="localhost:7233", plugins=[_FakeOpenAIPlugin()], @@ -112,6 +138,42 @@ async def test_openai_plugin_without_codec_omits_data_converter(self): assert "data_converter" not in mock_connect.await_args.kwargs + async def test_data_converter_passthrough_with_openai_plugin(self): + from agentex.lib.core.clients.temporal.utils import get_temporal_client + + dc = DataConverter(payload_codec=_NoopCodec()) + with _patch_openai_plugin(), _mock_connect() as mock_connect: + await get_temporal_client( + temporal_address="localhost:7233", + plugins=[_FakeOpenAIPlugin()], + data_converter=dc, + ) + + assert mock_connect.await_args.kwargs["data_converter"] is dc + + async def test_data_converter_passthrough_without_openai_plugin(self): + from agentex.lib.core.clients.temporal.utils import get_temporal_client + + dc = DataConverter(payload_converter_class=DefaultPayloadConverter) + with _mock_connect() as mock_connect: + await get_temporal_client(temporal_address="localhost:7233", data_converter=dc) + + assert mock_connect.await_args.kwargs["data_converter"] is dc + + async def test_codec_and_data_converter_together_raises(self): + from agentex.lib.core.clients.temporal.utils import get_temporal_client + + codec = _NoopCodec() + dc = DataConverter(payload_codec=codec) + with _mock_connect() as mock_connect: + with pytest.raises(ValueError, match="Pass payload_codec inside `data_converter`"): + await get_temporal_client( + temporal_address="localhost:7233", + payload_codec=codec, + data_converter=dc, + ) + mock_connect.assert_not_awaited() + class TestGetTemporalClientWorker: async def test_no_codec_uses_custom_data_converter_unchanged(self): @@ -140,7 +202,7 @@ async def test_codec_with_openai_plugin_raises(self): codec = _NoopCodec() with _patch_openai_plugin(), _mock_connect() as mock_connect: - with pytest.raises(ValueError, match="payload_codec is not supported alongside OpenAIAgentsPlugin"): + with pytest.raises(ValueError, match="silently dropped by the plugin's data-converter transformer"): await get_temporal_client( temporal_address="localhost:7233", plugins=[_FakeOpenAIPlugin()], @@ -156,6 +218,42 @@ async def test_openai_plugin_without_codec_omits_data_converter(self): assert "data_converter" not in mock_connect.await_args.kwargs + async def test_data_converter_passthrough_with_openai_plugin(self): + from agentex.lib.core.temporal.workers.worker import get_temporal_client + + dc = DataConverter(payload_codec=_NoopCodec()) + with _patch_openai_plugin(), _mock_connect() as mock_connect: + await get_temporal_client( + temporal_address="localhost:7233", + plugins=[_FakeOpenAIPlugin()], + data_converter=dc, + ) + + assert mock_connect.await_args.kwargs["data_converter"] is dc + + async def test_data_converter_passthrough_without_openai_plugin(self): + from agentex.lib.core.temporal.workers.worker import get_temporal_client + + dc = DataConverter(payload_converter_class=DefaultPayloadConverter) + with _mock_connect() as mock_connect: + await get_temporal_client(temporal_address="localhost:7233", data_converter=dc) + + assert mock_connect.await_args.kwargs["data_converter"] is dc + + async def test_codec_and_data_converter_together_raises(self): + from agentex.lib.core.temporal.workers.worker import get_temporal_client + + codec = _NoopCodec() + dc = DataConverter(payload_codec=codec) + with _mock_connect() as mock_connect: + with pytest.raises(ValueError, match="Pass payload_codec inside `data_converter`"): + await get_temporal_client( + temporal_address="localhost:7233", + payload_codec=codec, + data_converter=dc, + ) + mock_connect.assert_not_awaited() + class TestAgentexWorkerCodec: def test_worker_stores_payload_codec(self): @@ -171,6 +269,19 @@ def test_worker_default_payload_codec_is_none(self): worker = AgentexWorker(task_queue="test-queue", health_check_port=80) assert worker.payload_codec is None + def test_worker_stores_data_converter(self): + from agentex.lib.core.temporal.workers.worker import AgentexWorker + + dc = DataConverter(payload_codec=_NoopCodec()) + worker = AgentexWorker(task_queue="test-queue", health_check_port=80, data_converter=dc) + assert worker.data_converter is dc + + def test_worker_default_data_converter_is_none(self): + from agentex.lib.core.temporal.workers.worker import AgentexWorker + + worker = AgentexWorker(task_queue="test-queue", health_check_port=80) + assert worker.data_converter is None + class TestTemporalACPCodec: def test_create_stores_payload_codec(self): @@ -186,6 +297,19 @@ def test_create_default_payload_codec_is_none(self): acp = TemporalACP.create(temporal_address="localhost:7233") assert acp._payload_codec is None + def test_create_stores_data_converter(self): + from agentex.lib.sdk.fastacp.impl.temporal_acp import TemporalACP + + dc = DataConverter(payload_codec=_NoopCodec()) + acp = TemporalACP.create(temporal_address="localhost:7233", data_converter=dc) + assert acp._data_converter is dc + + def test_create_default_data_converter_is_none(self): + from agentex.lib.sdk.fastacp.impl.temporal_acp import TemporalACP + + acp = TemporalACP.create(temporal_address="localhost:7233") + assert acp._data_converter is None + class TestFastACPConfigCodec: def test_config_default_codec_is_none(self): @@ -218,3 +342,34 @@ def fake_create(**kwargs): FastACP.create("async", config=config) assert captured.get("payload_codec") is codec + + def test_config_default_data_converter_is_none(self): + from agentex.lib.types.fastacp import TemporalACPConfig + + assert TemporalACPConfig().data_converter is None + + def test_config_accepts_data_converter(self): + from agentex.lib.types.fastacp import TemporalACPConfig + + dc = DataConverter(payload_codec=_NoopCodec()) + assert TemporalACPConfig(data_converter=dc).data_converter is dc + + def test_fastacp_forwards_data_converter_from_config(self): + from agentex.lib.types.fastacp import TemporalACPConfig + from agentex.lib.sdk.fastacp.fastacp import FastACP + + dc = DataConverter(payload_codec=_NoopCodec()) + config = TemporalACPConfig(data_converter=dc) + captured: dict[str, Any] = {} + + def fake_create(**kwargs): + captured.update(kwargs) + return object() + + with patch( + "agentex.lib.sdk.fastacp.impl.temporal_acp.TemporalACP.create", + side_effect=fake_create, + ): + FastACP.create("async", config=config) + + assert captured.get("data_converter") is dc From 55ccc59aacdd6cc2e11a05109f0b0e8927f6e5e2 Mon Sep 17 00:00:00 2001 From: Matteo Librizzi Date: Wed, 27 May 2026 22:55:58 +0100 Subject: [PATCH 2/2] fix(lib): tighten TemporalACPConfig + concise comments + import order MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add `model_validator(mode="after")` on `TemporalACPConfig` that rejects setting both `payload_codec` and `data_converter`, mirroring the `get_temporal_client` ambiguity guard at config-construction time (consistent with the existing `plugins` / `interceptors` validators). - Collapse multi-line comments in `worker.py` and `clients/temporal/utils.py` to single-line context — the PR description carries the long-form rationale. - Ruff import-order autofixes on touched files (resolves CI lint). Co-Authored-By: Claude Opus 4.7 --- .../lib/core/clients/temporal/temporal_client.py | 2 +- src/agentex/lib/core/clients/temporal/utils.py | 12 +----------- src/agentex/lib/core/temporal/workers/worker.py | 10 ---------- src/agentex/lib/sdk/fastacp/impl/temporal_acp.py | 2 +- src/agentex/lib/types/fastacp.py | 12 +++++++++++- tests/lib/test_payload_codec.py | 12 +++++++++++- 6 files changed, 25 insertions(+), 25 deletions(-) diff --git a/src/agentex/lib/core/clients/temporal/temporal_client.py b/src/agentex/lib/core/clients/temporal/temporal_client.py index 9f9cbfecf..ca17d30ff 100644 --- a/src/agentex/lib/core/clients/temporal/temporal_client.py +++ b/src/agentex/lib/core/clients/temporal/temporal_client.py @@ -7,7 +7,7 @@ from temporalio.client import Client, WorkflowExecutionStatus from temporalio.common import RetryPolicy as TemporalRetryPolicy, WorkflowIDReusePolicy from temporalio.service import RPCError, RPCStatusCode -from temporalio.converter import DataConverter, PayloadCodec +from temporalio.converter import PayloadCodec, DataConverter from agentex.lib.utils.logging import make_logger from agentex.lib.utils.model_utils import BaseModel diff --git a/src/agentex/lib/core/clients/temporal/utils.py b/src/agentex/lib/core/clients/temporal/utils.py index 7b0f6aa01..95319720a 100644 --- a/src/agentex/lib/core/clients/temporal/utils.py +++ b/src/agentex/lib/core/clients/temporal/utils.py @@ -6,7 +6,7 @@ from temporalio.client import Client, Plugin as ClientPlugin from temporalio.worker import Interceptor from temporalio.runtime import Runtime, TelemetryConfig, OpenTelemetryConfig -from temporalio.converter import DataConverter, PayloadCodec +from temporalio.converter import PayloadCodec, DataConverter from temporalio.contrib.pydantic import pydantic_data_converter # class DateTimeJSONEncoder(AdvancedJSONEncoder): @@ -118,17 +118,11 @@ async def get_temporal_client( "kwarg. Specifying both is ambiguous." ) - # Check if OpenAI plugin is present - it needs to configure its own data converter # Lazy import to avoid pulling in opentelemetry.sdk for non-Temporal agents from temporalio.contrib.openai_agents import OpenAIAgentsPlugin has_openai_plugin = any(isinstance(p, OpenAIAgentsPlugin) for p in (plugins or [])) - # When the OpenAI plugin is present, its `_data_converter` transformer - # builds a fresh DataConverter (without any codec) if none is supplied, - # so a standalone `payload_codec` kwarg would be silently dropped and - # payloads would land in Temporal in plain text. Guide the caller to - # the working composition path instead. if has_openai_plugin and payload_codec is not None and data_converter is None: raise ValueError( "payload_codec passed as a kwarg alongside OpenAIAgentsPlugin would " @@ -145,10 +139,6 @@ async def get_temporal_client( } if data_converter is not None: - # Caller supplied a pre-built converter. With the OpenAI plugin present - # and `payload_converter_class=OpenAIPayloadConverter` (or subclass), - # the plugin's `_data_converter` transformer passes it through intact, - # preserving any payload_codec. connect_kwargs["data_converter"] = data_converter elif not has_openai_plugin: dc = pydantic_data_converter diff --git a/src/agentex/lib/core/temporal/workers/worker.py b/src/agentex/lib/core/temporal/workers/worker.py index 1bb1972b5..253b6759f 100644 --- a/src/agentex/lib/core/temporal/workers/worker.py +++ b/src/agentex/lib/core/temporal/workers/worker.py @@ -107,17 +107,11 @@ async def get_temporal_client( "kwarg. Specifying both is ambiguous." ) - # Check if OpenAI plugin is present - it needs to configure its own data converter # Lazy import to avoid pulling in opentelemetry.sdk for non-Temporal agents from temporalio.contrib.openai_agents import OpenAIAgentsPlugin has_openai_plugin = any(isinstance(p, OpenAIAgentsPlugin) for p in (plugins or [])) - # When the OpenAI plugin is present, its `_data_converter` transformer - # builds a fresh DataConverter (without any codec) if none is supplied, - # so a standalone `payload_codec` kwarg would be silently dropped and - # payloads would land in Temporal in plain text. Guide the caller to - # the working composition path instead. if has_openai_plugin and payload_codec is not None and data_converter is None: raise ValueError( "payload_codec passed as a kwarg alongside OpenAIAgentsPlugin would " @@ -134,10 +128,6 @@ async def get_temporal_client( } if data_converter is not None: - # Caller supplied a pre-built converter. With the OpenAI plugin present - # and `payload_converter_class=OpenAIPayloadConverter` (or subclass), - # the plugin's `_data_converter` transformer passes it through intact, - # preserving any payload_codec. connect_kwargs["data_converter"] = data_converter elif not has_openai_plugin: dc = custom_data_converter diff --git a/src/agentex/lib/sdk/fastacp/impl/temporal_acp.py b/src/agentex/lib/sdk/fastacp/impl/temporal_acp.py index 32c19533d..69d843720 100644 --- a/src/agentex/lib/sdk/fastacp/impl/temporal_acp.py +++ b/src/agentex/lib/sdk/fastacp/impl/temporal_acp.py @@ -4,7 +4,7 @@ from contextlib import asynccontextmanager from fastapi import FastAPI -from temporalio.converter import DataConverter, PayloadCodec +from temporalio.converter import PayloadCodec, DataConverter from agentex.protocol.acp import ( SendEventParams, diff --git a/src/agentex/lib/types/fastacp.py b/src/agentex/lib/types/fastacp.py index 2399790e3..493ca5f11 100644 --- a/src/agentex/lib/types/fastacp.py +++ b/src/agentex/lib/types/fastacp.py @@ -2,7 +2,7 @@ from typing import Any, Literal -from pydantic import Field, BaseModel, field_validator +from pydantic import Field, BaseModel, field_validator, model_validator from agentex.lib.core.clients.temporal.utils import validate_client_plugins, validate_worker_interceptors @@ -89,6 +89,16 @@ def validate_interceptors(cls, v: list[Any]) -> list[Any]: validate_worker_interceptors(v) return v + @model_validator(mode="after") + def _validate_codec_and_data_converter_mutually_exclusive(self) -> "TemporalACPConfig": + if self.payload_codec is not None and self.data_converter is not None: + raise ValueError( + "Pass payload_codec inside `data_converter` " + "(DataConverter(..., payload_codec=...)) instead of as a separate " + "field. Specifying both is ambiguous." + ) + return self + class AsyncBaseACPConfig(AsyncACPConfig): """Configuration for AsyncBaseACP implementation diff --git a/tests/lib/test_payload_codec.py b/tests/lib/test_payload_codec.py index 9089c829b..59736dc21 100644 --- a/tests/lib/test_payload_codec.py +++ b/tests/lib/test_payload_codec.py @@ -6,8 +6,8 @@ import pytest from temporalio.client import Client, Plugin as ClientPlugin from temporalio.converter import ( - DataConverter, PayloadCodec, + DataConverter, DefaultPayloadConverter, ) from temporalio.contrib.pydantic import pydantic_data_converter @@ -354,6 +354,16 @@ def test_config_accepts_data_converter(self): dc = DataConverter(payload_codec=_NoopCodec()) assert TemporalACPConfig(data_converter=dc).data_converter is dc + def test_config_rejects_codec_and_data_converter_together(self): + from pydantic import ValidationError + + from agentex.lib.types.fastacp import TemporalACPConfig + + codec = _NoopCodec() + dc = DataConverter(payload_codec=codec) + with pytest.raises(ValidationError, match="Pass payload_codec inside `data_converter`"): + TemporalACPConfig(payload_codec=codec, data_converter=dc) + def test_fastacp_forwards_data_converter_from_config(self): from agentex.lib.types.fastacp import TemporalACPConfig from agentex.lib.sdk.fastacp.fastacp import FastACP