[Bug] Nondeterminism when using asyncio.gather with local activities #1578

@paulcacheux

Description

What are you really trying to do?

At a high level, our code runs local activities at the beginning and end of certain parts of our workflows. We received reports of nondeterminism errors when those workflow parts are run concurrently with asyncio.gather.

Describe the bug

According to my understanding (which might be wrong, of course):
During the first execution, local activity resolve_activity jobs arrive in completion order (wall-clock timing). During replay, they arrive in sequence-number order. This difference causes coroutines to resume in a different order, so subsequent commands are assigned different sequence numbers that no longer match the recorded markers.
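To illustrate the suspected mechanism, here is a minimal sketch in plain asyncio. It is a toy model, not the Temporal SDK: FakeScheduler, lifecycle, and run_batches are hypothetical names invented for this example, and the "later-scheduled job resolves first" ordering is an assumed stand-in for wall-clock completion order.

```python
import asyncio
from typing import Callable


class FakeScheduler:
    """Toy stand-in for workflow command sequencing (not the Temporal SDK)."""

    def __init__(self) -> None:
        self.next_seq = 1
        self.pending: dict = {}   # seq -> unresolved future
        self.commands: list = []  # (seq, name) in the order commands were issued

    def schedule(self, name: str) -> "asyncio.Future[None]":
        # Each new command takes the next sequence number, as described above.
        seq = self.next_seq
        self.next_seq += 1
        self.commands.append((seq, name))
        fut = asyncio.get_running_loop().create_future()
        self.pending[seq] = fut
        return fut


async def lifecycle(sched: FakeScheduler, label: str) -> None:
    # Two back-to-back "local activities" per coroutine.
    await sched.schedule(f"{label}1")
    await sched.schedule(f"{label}2")


async def run_batches(batch_order: Callable[[list], list]) -> list:
    """Run two lifecycles, resolving each batch of jobs in the given order."""
    sched = FakeScheduler()
    tasks = [
        asyncio.create_task(lifecycle(sched, "a")),
        asyncio.create_task(lifecycle(sched, "b")),
    ]
    await asyncio.sleep(0)  # let both coroutines issue their first command
    while sched.pending:
        batch, sched.pending = sched.pending, {}
        for seq in batch_order(list(batch)):
            batch[seq].set_result(None)  # coroutines wake in resolution order
        await asyncio.sleep(0)  # let woken coroutines issue their next command
    await asyncio.gather(*tasks)
    return sched.commands


if __name__ == "__main__":
    # "First execution": assume the later-scheduled job happens to finish first.
    first = asyncio.run(run_batches(lambda seqs: sorted(seqs, reverse=True)))
    # "Replay": jobs are resolved in sequence-number order.
    replay = asyncio.run(run_batches(sorted))
    print(first)   # [(1, 'a1'), (2, 'b1'), (3, 'b2'), (4, 'a2')]
    print(replay)  # [(1, 'a1'), (2, 'b1'), (3, 'a2'), (4, 'b2')]
```

In this model, sequence number 3 belongs to b2 in the first run but to a2 in the replay, which is the kind of mismatch against recorded markers described above.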

Minimal Reproduction

Extracted from: #1573

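import asyncio
import uuid
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Replayer

# Assumption: new_worker is the helper from the sdk-python test suite.
from tests.helpers import new_worker


@activity.defn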
async def local_activity_slow(index: int) -> None:
    if index % 2 == 0:
        await asyncio.sleep(0.05)


@activity.defn
async def local_activity_fast(index: int) -> None:
    return None


@activity.defn
async def local_activity_gate() -> None:
    await asyncio.sleep(0.05)


@workflow.defn
class ConcurrentLocalActivityReplayWorkflow:
    """Workflow that runs two concurrent coroutines with local activities.

    This reproduces a replay nondeterminism bug: during first execution,
    local activities take real time, creating a deterministic interleaving.
    During replay, all local activities return instantly from markers, which
    can reorder coroutine scheduling and produce a different command sequence.
    """

    @workflow.run
    async def run(self) -> list[int]:
        async def lifecycle_a(index: int) -> int:
            await workflow.execute_local_activity(
                local_activity_slow,
                args=[index * 2],
                start_to_close_timeout=timedelta(seconds=5),
            )
            await workflow.execute_local_activity(
                local_activity_fast,
                args=[index * 2],
                start_to_close_timeout=timedelta(seconds=5),
            )
            return index * 2

        async def lifecycle_b(index: int) -> int:
            await workflow.execute_local_activity(
                local_activity_gate,
                start_to_close_timeout=timedelta(seconds=5),
            )
            await workflow.execute_local_activity(
                local_activity_slow,
                args=[index * 2 + 1],
                start_to_close_timeout=timedelta(seconds=5),
            )
            await workflow.execute_local_activity(
                local_activity_fast,
                args=[index * 2 + 1],
                start_to_close_timeout=timedelta(seconds=5),
            )
            return index * 2 + 1

        results: list[int] = []
        for index in range(20):
            results.extend(await asyncio.gather(lifecycle_a(index), lifecycle_b(index)))
        return results


async def test_workflow_concurrent_local_activity_replay(client: Client):
    """Test that concurrent local activities replay deterministically.

    Runs a workflow with two concurrent coroutines that each issue multiple
    local activities, then replays the history. Without the fix, replay
    fails with NondeterminismError because the local activity command order
    diverges from the recorded marker order.
    """
    async with new_worker(
        client,
        ConcurrentLocalActivityReplayWorkflow,
        activities=[local_activity_slow, local_activity_fast, local_activity_gate],
    ) as worker:
        handle = await client.start_workflow(
            ConcurrentLocalActivityReplayWorkflow.run,
            id=f"workflow-{uuid.uuid4()}",
            task_queue=worker.task_queue,
        )
        expected = [v for i in range(20) for v in (i * 2, i * 2 + 1)]
        assert await handle.result() == expected

        history = await handle.fetch_history()

    await Replayer(
        workflows=[ConcurrentLocalActivityReplayWorkflow],
    ).replay_workflow(history)

Environment/Versions

  • OS and processor: M1 Mac and Linux at least (I believe this is more general than that)
  • Temporal Version: server=1.30.4, sdk=1.27.2; also reproduces on main of sdk-python
  • Building Temporal server from source
