Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -478,13 +478,9 @@ class InvokeConfig(Generic[P, R]):
Configuration for invoke operations.

This class configures how function invocations are executed, including
timeout behavior, serialization, and tenant isolation.
serialization and tenant isolation.

Args:
timeout: Maximum duration to wait for the invoked function to complete.
Default is no timeout. Use this to prevent long-running invocations
from blocking execution indefinitely.

serdes_payload: Custom serialization/deserialization for the payload
sent to the invoked function. Defaults to DEFAULT_JSON_SERDES when
not set.
Expand All @@ -498,16 +494,10 @@ class InvokeConfig(Generic[P, R]):
"""

# retry_strategy: Callable[[Exception, int], RetryDecision] | None = None
timeout: Duration = field(default_factory=Duration)
serdes_payload: SerDes[P] | None = None
serdes_result: SerDes[R] | None = None
tenant_id: str | None = None

@property
def timeout_seconds(self) -> int:
"""Get timeout in seconds."""
return self.timeout.to_seconds()


@dataclass(frozen=True)
class CallbackConfig:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def execute(self, _checkpointed_result: CheckpointedResult) -> R:
ExecutionError: If suspend doesn't raise (should never happen)
"""
msg: str = f"Invoke {self.operation_identifier.operation_id} started, suspending for completion"
suspend_with_optional_resume_delay(msg, self.config.timeout_seconds)
suspend_with_optional_resume_delay(msg)
# This line should never be reached since suspend_with_optional_resume_delay always raises
error_msg: str = "suspend_with_optional_resume_delay should have raised an exception, but did not."
raise ExecutionError(error_msg) from None
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ def test_invoke_config_defaults():
"""Test InvokeConfig defaults."""
config = InvokeConfig()
assert config.tenant_id is None
assert config.timeout_seconds == 0


def test_invoke_config_with_tenant_id():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ def test_invoke_with_name_and_config(mock_executor_class):
mock_state.durable_execution_arn = (
"arn:aws:durable:us-east-1:123456789012:execution/test"
)
config = InvokeConfig[str, str](timeout=Duration.from_seconds(30))
config = InvokeConfig[str, str]()

context = create_test_context(state=mock_state)
[context._create_step_id() for _ in range(5)] # Set counter to 5 # noqa: SLF001
Expand Down Expand Up @@ -782,7 +782,6 @@ def test_invoke_with_custom_serdes(mock_executor_class):
config = InvokeConfig[dict, dict](
serdes_payload=payload_serdes,
serdes_result=result_serdes,
timeout=Duration.from_minutes(1),
)

context = create_test_context(state=mock_state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import pytest

from aws_durable_execution_sdk_python.config import Duration, InvokeConfig
from aws_durable_execution_sdk_python.config import InvokeConfig
from aws_durable_execution_sdk_python.exceptions import (
CallableRuntimeError,
ExecutionError,
Expand Down Expand Up @@ -218,8 +218,8 @@ def test_invoke_handler_already_started(status):


@pytest.mark.parametrize("status", [OperationStatus.STARTED, OperationStatus.PENDING])
def test_invoke_handler_already_started_with_timeout(status):
"""Test invoke_handler when operation is already started with timeout config."""
def test_invoke_handler_already_started_suspends(status):
"""Test invoke_handler when operation is already started suspends indefinitely."""
mock_state = Mock(spec=ExecutionState)
mock_state.durable_execution_arn = "test_arn"

Expand All @@ -232,9 +232,9 @@ def test_invoke_handler_already_started_with_timeout(status):
mock_result = CheckpointedResult.create_from_operation(operation)
mock_state.get_checkpoint_result.return_value = mock_result

config = InvokeConfig[str, str](timeout=Duration.from_seconds(30))
config = InvokeConfig[str, str]()

with pytest.raises(TimedSuspendExecution):
with pytest.raises(SuspendExecution):
invoke_handler(
function_name="test_function",
payload="test_input",
Expand All @@ -261,7 +261,7 @@ def test_invoke_handler_new_operation():
started = CheckpointedResult.create_from_operation(started_op)
mock_state.get_checkpoint_result.side_effect = [not_found, started]

config = InvokeConfig[str, str](timeout=Duration.from_minutes(1))
config = InvokeConfig[str, str]()

with pytest.raises(
SuspendExecution, match="Invoke invoke8 started, suspending for completion"
Expand All @@ -288,62 +288,6 @@ def test_invoke_handler_new_operation():
assert operation_update.chained_invoke_options.function_name == "test_function"


def test_invoke_handler_new_operation_with_timeout():
"""Test invoke_handler when starting a new operation with timeout."""
mock_state = Mock(spec=ExecutionState)
mock_state.durable_execution_arn = "test_arn"

not_found = CheckpointedResult.create_not_found()
started_op = Operation(
operation_id="invoke_test",
operation_type=OperationType.CHAINED_INVOKE,
status=OperationStatus.STARTED,
)
started = CheckpointedResult.create_from_operation(started_op)
mock_state.get_checkpoint_result.side_effect = [not_found, started]

config = InvokeConfig[str, str](timeout=Duration.from_seconds(30))

with pytest.raises(TimedSuspendExecution):
invoke_handler(
function_name="test_function",
payload="test_input",
state=mock_state,
operation_identifier=OperationIdentifier(
"invoke9", OperationSubType.CHAINED_INVOKE, None, "test_invoke"
),
config=config,
)


def test_invoke_handler_new_operation_no_timeout():
"""Test invoke_handler when starting a new operation without timeout."""
mock_state = Mock(spec=ExecutionState)
mock_state.durable_execution_arn = "test_arn"

not_found = CheckpointedResult.create_not_found()
started_op = Operation(
operation_id="invoke_test",
operation_type=OperationType.CHAINED_INVOKE,
status=OperationStatus.STARTED,
)
started = CheckpointedResult.create_from_operation(started_op)
mock_state.get_checkpoint_result.side_effect = [not_found, started]

config = InvokeConfig[str, str](timeout=Duration.from_seconds(0))

with pytest.raises(SuspendExecution):
invoke_handler(
function_name="test_function",
payload="test_input",
state=mock_state,
operation_identifier=OperationIdentifier(
"invoke10", OperationSubType.CHAINED_INVOKE, None, "test_invoke"
),
config=config,
)


def test_invoke_handler_no_config():
"""Test invoke_handler when no config is provided."""
mock_state = Mock(spec=ExecutionState)
Expand Down Expand Up @@ -1067,8 +1011,8 @@ def test_invoke_immediate_response_already_completed():
assert mock_state.get_checkpoint_result.call_count == 1


def test_invoke_immediate_response_with_timeout_immediate_success():
"""Test immediate success with timeout configuration."""
def test_invoke_immediate_response_immediate_success():
"""Test immediate success response."""
mock_state = Mock(spec=ExecutionState)
mock_state.durable_execution_arn = "test_arn"

Expand All @@ -1079,13 +1023,13 @@ def test_invoke_immediate_response_with_timeout_immediate_success():
operation_type=OperationType.CHAINED_INVOKE,
status=OperationStatus.SUCCEEDED,
chained_invoke_details=ChainedInvokeDetails(
result=json.dumps("timeout_result")
result=json.dumps("immediate_result")
),
)
succeeded = CheckpointedResult.create_from_operation(succeeded_op)
mock_state.get_checkpoint_result.side_effect = [not_found, succeeded]

config = InvokeConfig[str, str](timeout=Duration.from_seconds(30))
config = InvokeConfig[str, str]()

result = invoke_handler(
function_name="test_function",
Expand All @@ -1098,15 +1042,12 @@ def test_invoke_immediate_response_with_timeout_immediate_success():
)

# Verify result was returned without suspend
assert result == "timeout_result"
assert result == "immediate_result"
assert mock_state.get_checkpoint_result.call_count == 2


def test_invoke_immediate_response_with_timeout_no_immediate_response():
"""Test no immediate response with timeout configuration.

When no immediate response, operation should suspend with timeout.
"""
def test_invoke_immediate_response_no_immediate_response():
"""Test no immediate response — operation suspends indefinitely."""
mock_state = Mock(spec=ExecutionState)
mock_state.durable_execution_arn = "test_arn"

Expand All @@ -1120,10 +1061,9 @@ def test_invoke_immediate_response_with_timeout_no_immediate_response():
started = CheckpointedResult.create_from_operation(started_op)
mock_state.get_checkpoint_result.side_effect = [not_found, started]

config = InvokeConfig[str, str](timeout=Duration.from_seconds(30))
config = InvokeConfig[str, str]()

# Verify operation suspends with timeout
with pytest.raises(TimedSuspendExecution):
with pytest.raises(SuspendExecution):
invoke_handler(
function_name="test_function",
payload="test_input",
Expand Down