diff --git a/README.md b/README.md index c24a8c0..a927116 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ **A Python HTTP client framework with sync and async clients for building resilient service clients.** -`httpware` is a thin opinionated wrapper around `httpx2`. It re-exports `httpx2.Request`/`httpx2.Response`, adds a middleware chain composed at client construction, supports opt-in typed response decoding (pydantic and msgspec are both extras), and raises a status-keyed exception tree automatically on 4xx/5xx. It also ships a small resilience suite — `AsyncRetry`/`Retry` middleware with a Finagle-style `RetryBudget`, plus an `AsyncBulkhead`/`Bulkhead` concurrency limiter — under `httpware.middleware.resilience`. +`httpware` is a thin opinionated wrapper around `httpx2`. It re-exports `httpx2.Request`/`httpx2.Response`, adds a middleware chain composed at client construction, supports opt-in typed response decoding (pydantic and msgspec are both extras), and raises a status-keyed exception tree automatically on 4xx/5xx. It also ships a resilience suite under `httpware.middleware.resilience` — `AsyncRetry`/`Retry` with a Finagle-style `RetryBudget`, `AsyncBulkhead`/`Bulkhead` concurrency limiter, `AsyncCircuitBreaker`/`CircuitBreaker` consecutive-failure breaker, and `AsyncTimeout` for overall-operation wall-clock bounds. > **Status:** Pre-1.0. Public API is subject to change between minor releases until v1.0. @@ -116,16 +116,18 @@ All 4xx/5xx responses raise typed exceptions automatically: `NotFoundError`, `Se ## Observability -`AsyncRetry`/`Retry` and `AsyncBulkhead`/`Bulkhead` emit operational events via two channels — stdlib `logging` records (always on) and OpenTelemetry span events (when `opentelemetry-api` is installed). Event names and payloads are identical across sync and async; dashboards built against one class apply unchanged to the other. 
+All resilience middleware emit operational events via two channels — stdlib `logging` records (always on) and OpenTelemetry span events (when `opentelemetry-api` is installed). Event names and payloads are identical across sync and async; dashboards built against one class apply unchanged to the other. -Logger names (`httpware.retry`, `httpware.bulkhead`) and event names (`retry.giving_up`, `retry.budget_refused`, `retry.streaming_refused`, `bulkhead.rejected`) are the stable public contract. +Logger names and event names are the stable public contract: `httpware.retry` (`retry.giving_up`, `retry.budget_refused`, `retry.streaming_refused`), `httpware.bulkhead` (`bulkhead.rejected`), `httpware.circuit_breaker` (`circuit.opened`, `circuit.rejected`, `circuit.half_open`, `circuit.closed`), and `httpware.timeout` (`timeout.exceeded`). ```python import logging -# Enable visibility into retry / bulkhead operational events +# Enable visibility into resilience operational events logging.getLogger("httpware.retry").setLevel(logging.WARNING) logging.getLogger("httpware.bulkhead").setLevel(logging.WARNING) +logging.getLogger("httpware.circuit_breaker").setLevel(logging.WARNING) +logging.getLogger("httpware.timeout").setLevel(logging.WARNING) ``` For OTel attribute enrichment on the active span — install the extra: diff --git a/docs/errors.md b/docs/errors.md index 69927b2..0b9a594 100644 --- a/docs/errors.md +++ b/docs/errors.md @@ -2,7 +2,7 @@ `httpware` raises typed exceptions automatically — everything inherits `ClientError`, and HTTP responses with 4xx/5xx status raise status-keyed `StatusError` subclasses without you having to call `response.raise_for_status()`. -For the resilience-specific errors (`RetryBudgetExhaustedError`, `BulkheadFullError`) see the [Resilience reference](resilience.md). +For the resilience-specific errors (`RetryBudgetExhaustedError`, `BulkheadFullError`, `CircuitOpenError`) see the [Resilience reference](resilience.md). 
The status-keyed exception tree is shared between `Client` and `AsyncClient`. Catching `NotFoundError` in sync code uses the same import as catching it in async code (`from httpware import NotFoundError`). @@ -27,6 +27,7 @@ ClientError (catch-all for anything httpware raises) │ └── ServiceUnavailableError (503) ├── RetryBudgetExhaustedError (a retry was needed but the budget refused) ├── BulkheadFullError (acquire_timeout elapsed before a slot opened) +├── CircuitOpenError (circuit is OPEN or HALF_OPEN probe slot taken; request not forwarded) ├── DecodeError (response_model= decoder failed; HTTP call itself succeeded) └── MissingDecoderError (no registered decoder claims response_model=; fires before the HTTP call) ``` @@ -119,6 +120,9 @@ exc.response.request.method # the HTTP method - `max_concurrent: int` — the configured cap - `acquire_timeout: float | None` — the configured timeout +`CircuitOpenError` carries: +- `retry_after: float | None` — seconds until the circuit will next admit a probe; `None` when a concurrent probe is already in flight (HALF_OPEN slot taken). + Use these for caller-side logging / alerting: ```python diff --git a/docs/resilience.md b/docs/resilience.md index 0f77b88..94eca70 100644 --- a/docs/resilience.md +++ b/docs/resilience.md @@ -6,7 +6,7 @@ - **`RetryBudget`** — Finagle-style token bucket; safe to share across sync `Client` and `AsyncClient` in the same process. (Finagle-style bounds the global retry rate to prevent retry storms when downstreams degrade.) - **`Bulkhead` / `AsyncBulkhead`** — concurrency limiter with bounded acquire-wait (`threading.Semaphore` and `asyncio.Semaphore` respectively) -The canonical composition is `middleware=[AsyncBulkhead(...), AsyncRetry()]` — `AsyncBulkhead` outside `AsyncRetry` so one slot covers all retry attempts of a single call. Reach for the [Middleware guide](middleware.md) when you want to write your own resilience policy. 
+A key ordering constraint: `AsyncBulkhead` must sit outside `AsyncRetry` (before it in `middleware=`) so one slot covers all retry attempts of a single call. For the full recommended ordering across all four primitives, see [Composition](#composition). Reach for the [Middleware guide](middleware.md) when you want to write your own resilience policy. ## `AsyncRetry` @@ -144,19 +144,148 @@ async with ( When `acquire_timeout` elapses without a slot opening, `AsyncBulkhead` raises `BulkheadFullError` (carries the configured `max_concurrent` and `acquire_timeout` for caller logging). See the [Errors reference](errors.md). The `httpware.bulkhead` `bulkhead.rejected` observability event fires at the same site — see [Observability](index.md#observability). +## `AsyncCircuitBreaker` / `CircuitBreaker` + +```python +from httpware.middleware.resilience import AsyncCircuitBreaker # async +from httpware.middleware.resilience import CircuitBreaker # sync +``` + +Classic consecutive-failure circuit breaker. Counts failures and prevents requests from reaching a downstream that is known to be broken. + +### States + +- **CLOSED** — normal operation. Each counted failure increments the consecutive-failure counter. Once `failure_threshold` consecutive counted failures accumulate, the circuit opens. +- **OPEN** — fast-fail. All requests are rejected immediately with `CircuitOpenError` (carrying `retry_after` seconds until the next probe window). After `reset_timeout` seconds the circuit moves to HALF_OPEN. +- **HALF_OPEN** — only one probe is admitted at a time; requests arriving while a probe is in flight are rejected with `CircuitOpenError`. Once `success_threshold` consecutive probe successes are observed, the circuit closes. A single probe failure re-opens the circuit. + +### Constructor + +| Parameter | Default | Effect | +|---|---|---| +| `failure_threshold` | `5` | Consecutive counted failures required to open. `<1` raises `ValueError`. | +| `reset_timeout` | `30.0` (s) | Seconds to stay OPEN before admitting a probe. `<0` raises `ValueError`. 
| +| `success_threshold` | `1` | Consecutive probe successes required to close. `<1` raises `ValueError`. | +| `failure_status_codes` | `None` | Which status codes count as failures. `None` → all 5xx (`500`–`599`). | + +### Failure classification + +A **counted failure** is a `NetworkError`, an httpware `TimeoutError`, or a `StatusError` whose status code is in `failure_status_codes`. All other exceptions propagate without affecting circuit state. + +**4xx responses — including 429 — count as successes.** A 429 means the service is healthy but throttling; tripping the circuit on it would amplify an incident by adding circuit-open rejections on top of the throttle. + +### `CircuitOpenError` + +Raised when the circuit is OPEN (with a positive `retry_after: float`) or when HALF_OPEN with a probe already in flight (`retry_after=None`). Inherits `httpware.ClientError`. See the [Errors reference](errors.md). + +### Observability + +Emitted on logger `httpware.circuit_breaker`: + +| Event | When | +|---|---| +| `circuit.opened` | Failure threshold reached; circuit transitions CLOSED → OPEN | +| `circuit.rejected` | Request fast-failed (OPEN or HALF_OPEN probe slot taken) | +| `circuit.half_open` | Reset timeout elapsed; circuit transitions OPEN → HALF_OPEN | +| `circuit.closed` | Success threshold reached; circuit transitions HALF_OPEN → CLOSED | + +### Sharing + +Pass the same instance to multiple clients to enforce one shared circuit across them. A `CircuitBreaker` (sync) cannot be shared with an `AsyncCircuitBreaker` — they use different concurrency primitives. 
+ +### Async example + +```python +from httpware import AsyncClient +from httpware.middleware.resilience import AsyncCircuitBreaker + + +breaker = AsyncCircuitBreaker(failure_threshold=3, reset_timeout=60.0) + +async with AsyncClient( + base_url="https://api.example.com", + middleware=[breaker], +) as client: + response = await client.get("/users/1") +``` + +### Sync example + +```python +from httpware import Client +from httpware.middleware.resilience import CircuitBreaker + + +breaker = CircuitBreaker(failure_threshold=3, reset_timeout=60.0) + +with Client( + base_url="https://api.example.com", + middleware=[breaker], +) as client: + client.get("/users/1") +``` + +## `AsyncTimeout` + +```python +from httpware.middleware.resilience import AsyncTimeout +``` + +Bounds total wall-clock time across the entire inner pipeline. Place it outermost to enforce "this whole operation must finish within `timeout` seconds, even across retries and backoff sleeps." On expiry it raises `httpware.TimeoutError`. + +| Parameter | Default | Effect | +|---|---|---| +| `timeout` | **REQUIRED** | Overall deadline in seconds. Must be `> 0`; `≤0` raises `ValueError`. | + +**This is not a per-call timeout.** httpx2's connect/read/write/pool timeouts are the right tool for bounding a single outbound call; `AsyncTimeout` doesn't duplicate them. What httpx2 cannot bound is the total wall-clock across a whole retry sequence — `AsyncTimeout` fills that gap. + +**No sync `Timeout` exists.** Sync Python has no cancellation primitive that can interrupt a blocking httpx2 call mid-flight. For sync per-call bounds, configure `httpx2.Timeout` on the wrapped client or pass `timeout=` per request. + +Observability event: `timeout.exceeded` on logger `httpware.timeout`. 
+ +```python +from httpware import AsyncClient +from httpware.middleware.resilience import AsyncRetry, AsyncTimeout + + +async with AsyncClient( + base_url="https://api.example.com", + middleware=[ + AsyncTimeout(timeout=10.0), # overall deadline across the whole chain + AsyncRetry(max_attempts=3), + ], +) as client: + response = await client.get("/users/1") +``` + ## Composition -The canonical ordering is `middleware=[AsyncBulkhead, AsyncRetry]` — `AsyncBulkhead` outermost so one slot covers all retry attempts of a single call: +The recommended ordering (not enforced, but each position has a reason): + +``` +AsyncTimeout → AsyncCircuitBreaker → AsyncBulkhead → AsyncRetry → terminal +``` + +- `AsyncTimeout` outermost so the overall deadline covers the entire sequence including retries and backoff. +- `AsyncCircuitBreaker` outside `AsyncRetry` so an open circuit short-circuits the whole retry loop without attempting any calls. This also means the breaker counts one outcome per complete retry sequence (its final result) rather than one per individual attempt. Placing it outside `AsyncBulkhead` too means a request the open circuit rejects never consumes a concurrency slot. +- `AsyncBulkhead` outside `AsyncRetry` so one slot covers all retry attempts of a single call. Flip those two (`[AsyncRetry, AsyncBulkhead]`) and each retry grabs a fresh slot — defeating the bulkhead under load. 
```python from httpware import AsyncClient -from httpware.middleware.resilience import AsyncBulkhead, AsyncRetry +from httpware.middleware.resilience import ( + AsyncBulkhead, + AsyncCircuitBreaker, + AsyncRetry, + AsyncTimeout, +) async def main() -> None: async with AsyncClient( base_url="https://api.example.com", middleware=[ + AsyncTimeout(timeout=30.0), + AsyncCircuitBreaker(), AsyncBulkhead(max_concurrent=10), AsyncRetry(), ], @@ -164,8 +293,6 @@ async def main() -> None: await client.get("/users/1") ``` -Flipping the order (`[AsyncRetry, AsyncBulkhead]`) means each retry attempt grabs a fresh slot — defeating the bulkhead under load. Don't do that. - Cross-cutting middleware that emit per-call state (e.g., the Request-ID middleware in the [Middleware guide](middleware.md)) should sit outside `AsyncRetry` for the same reason — so all attempts of one call share one ID rather than getting a fresh ID per attempt. ## Sync Retry and Bulkhead @@ -227,6 +354,6 @@ with Client( ## See also - **[Middleware guide](middleware.md)** — write your own resilience middleware against the same protocol `AsyncRetry` and `AsyncBulkhead` use. -- **[Errors reference](errors.md)** — `RetryBudgetExhaustedError`, `BulkheadFullError`, and the broader exception tree. -- **[Observability](index.md#observability)** — the four operational events these middleware emit. +- **[Errors reference](errors.md)** — `RetryBudgetExhaustedError`, `BulkheadFullError`, `CircuitOpenError`, and the broader exception tree. +- **[Observability](index.md#observability)** — the operational events these middleware emit. - **`planning/engineering.md` §3** — the formal Middleware/Seam-A contract. 
diff --git a/planning/plans/2026-06-13-circuit-breaker-and-timeout.md b/planning/plans/2026-06-13-circuit-breaker-and-timeout.md new file mode 100644 index 0000000..2ce7b06 --- /dev/null +++ b/planning/plans/2026-06-13-circuit-breaker-and-timeout.md @@ -0,0 +1,1550 @@ +# CircuitBreaker + AsyncTimeout Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add a classic consecutive-failure `CircuitBreaker`/`AsyncCircuitBreaker` and an async-only `AsyncTimeout` (overall pipeline deadline) to httpware's resilience suite, shipping as `0.10.0`. + +**Architecture:** Two new middleware modules under `src/httpware/middleware/resilience/`, plus one new error (`CircuitOpenError`). The breaker's transition logic lives in a lock-free private `_CircuitBreakerState` shared by both wrappers; `AsyncCircuitBreaker` relies on asyncio atomicity + a single-event-loop guard (carried from `AsyncBulkhead`), `CircuitBreaker` serializes transitions with a `threading.Lock`. `AsyncTimeout` wraps `next` in `asyncio.timeout` and uses `cm.expired()` to re-wrap only its own deadline. All observability flows through the existing `_emit_event` helper. Pure stdlib; no new optional extra. + +**Tech Stack:** Python 3.11+, `httpx2`, `asyncio.timeout`, `time.monotonic`, `threading.Lock`, `enum`. Tests: `pytest` (+ `pytest-asyncio` auto mode), `httpx2.MockTransport`, `hypothesis`. Lint: `ruff` + `ty`. Coverage is enforced at 100% (`--cov-fail-under=100`). + +**Spec:** `planning/specs/2026-06-13-circuit-breaker-and-timeout-design.md` + +**Branch:** `feat/circuit-breaker-timeout` (already created off `main`; the spec is committed there). 
+ +--- + +## File Structure + +| File | Responsibility | Tasks | +|------|----------------|-------| +| `src/httpware/errors.py` | + `CircuitOpenError` (`ClientError`, `retry_after` field, `__reduce__`) | 1 | +| `src/httpware/middleware/resilience/timeout.py` | NEW — `AsyncTimeout` | 2 | +| `src/httpware/middleware/resilience/circuit_breaker.py` | NEW — `_CircuitBreakerState`, `AsyncCircuitBreaker`, `CircuitBreaker` | 3, 4 | +| `src/httpware/middleware/resilience/__init__.py` | re-export new names | 2, 3, 4 | +| `src/httpware/__init__.py` | re-export new names + `__all__` | 1, 2, 3, 4 | +| `tests/test_errors.py` | `CircuitOpenError` fields/summary/pickle | 1 | +| `tests/test_timeout.py` | NEW — `AsyncTimeout` | 2 | +| `tests/test_circuit_breaker.py` | NEW — async breaker, all branches + cross-loop | 3 | +| `tests/test_circuit_breaker_sync.py` | NEW — sync breaker mirror | 4 | +| `tests/test_circuit_breaker_props.py` | NEW — hypothesis invariant | 5 | +| `docs/resilience.md`, `README.md`, `planning/releases/0.10.0.md` | docs + release | 6 | + +**Note on `test_observability.py`:** it tests the `_emit_event` helper only — there is no central event-name registry to extend. Event-name assertions live in the feature test files via `caplog` (Tasks 2–4). Do **not** modify `test_observability.py`. + +**Export ordering invariant:** the project has a test asserting `__init__.py` imports and `__all__` stay symmetric. Every task that adds a public name must add it to **both** the import block and `__all__`, alphabetically, in the same commit. 
+ +--- + +## Task 1: `CircuitOpenError` + +**Files:** +- Modify: `src/httpware/errors.py` (append after `BulkheadFullError`, ~line 214) +- Modify: `src/httpware/__init__.py` (imports block + `__all__`) +- Test: `tests/test_errors.py` (append) + +- [ ] **Step 1: Write the failing tests** + +Append to `tests/test_errors.py` (the file already imports `pickle`, `ClientError`, and defines module-level int constants; add the import of `CircuitOpenError` to the existing `from httpware.errors import (...)` block, alphabetically): + +```python +def test_circuit_open_error_is_client_error() -> None: + exc = CircuitOpenError(retry_after=2.5) + assert isinstance(exc, ClientError) + assert exc.retry_after == 2.5 + + +def test_circuit_open_error_accepts_none_retry_after() -> None: + exc = CircuitOpenError(retry_after=None) + assert exc.retry_after is None + + +def test_circuit_open_error_summary_with_retry_after() -> None: + exc = CircuitOpenError(retry_after=2.5) + assert str(exc) == "circuit open (retry_after=2.500s)" + + +def test_circuit_open_error_summary_with_none_retry_after() -> None: + exc = CircuitOpenError(retry_after=None) + assert str(exc) == "circuit open (a probe request is already in flight)" + + +def test_circuit_open_error_pickleable_with_float() -> None: + exc = CircuitOpenError(retry_after=2.5) + restored = pickle.loads(pickle.dumps(exc)) # noqa: S301 + assert isinstance(restored, CircuitOpenError) + assert restored.retry_after == 2.5 + + +def test_circuit_open_error_pickleable_with_none() -> None: + exc = CircuitOpenError(retry_after=None) + restored = pickle.loads(pickle.dumps(exc)) # noqa: S301 + assert isinstance(restored, CircuitOpenError) + assert restored.retry_after is None +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `just test tests/test_errors.py -k circuit_open` +Expected: FAIL — `ImportError: cannot import name 'CircuitOpenError'`. 
+ +- [ ] **Step 3: Implement `CircuitOpenError`** + +Append to `src/httpware/errors.py` (after `BulkheadFullError`, before `_reconstruct_decode_error`). `Any` is already imported at the top of the file. + +```python +def _reconstruct_circuit_open( + cls: "type[CircuitOpenError]", + retry_after: float | None, +) -> "CircuitOpenError": + return cls(retry_after=retry_after) + + +class CircuitOpenError(ClientError): + """Raised when a CircuitBreaker refuses a request because the circuit is not closed. + + Fires when the circuit is OPEN, or when it is HALF_OPEN and the single probe + slot is already taken. The request is never forwarded to ``next``. ``retry_after`` + carries the seconds until the circuit will next admit a probe, when known + (``None`` when a concurrent probe is already in flight). + """ + + retry_after: float | None + + def __init__(self, *, retry_after: float | None) -> None: + self.retry_after = retry_after + if retry_after is None: + super().__init__("circuit open (a probe request is already in flight)") + else: + super().__init__(f"circuit open (retry_after={retry_after:.3f}s)") + + def __reduce__(self) -> tuple[Any, ...]: + return (_reconstruct_circuit_open, (type(self), self.retry_after)) +``` + +- [ ] **Step 4: Export from `httpware/__init__.py`** + +In `src/httpware/__init__.py`, add `CircuitOpenError` to the `from httpware.errors import (...)` block and to `__all__`. Alphabetically, `"Circuit…"` sorts **before** `"Client…"` (`i` < `l` at the 2nd char) and after `"Bulkhead…"`: + +Import block (errors) — insert after `BulkheadFullError,`, before `ClientError,`: +```python + CircuitOpenError, +``` +`__all__` — insert after `"BulkheadFullError",`, before `"Client",`: +```python + "CircuitOpenError", +``` + +> If `ruff`'s RUF022/import sorting is enabled, `just lint` will finalize ordering automatically; the requirement is only that the name appears in **both** lists (the symmetric-`__all__` test). 
+ +- [ ] **Step 5: Run tests + import check to verify pass** + +Run: `just test tests/test_errors.py -k circuit_open` +Expected: PASS (6 tests). +Run: `uv run python -c "import httpware; assert httpware.CircuitOpenError"` +Expected: no output, exit 0. + +- [ ] **Step 6: Lint** + +Run: `just lint` +Expected: clean (no errors). + +- [ ] **Step 7: Commit** + +```bash +git add src/httpware/errors.py src/httpware/__init__.py tests/test_errors.py +git commit -m "feat(errors): add CircuitOpenError + +Co-Authored-By: Claude Fable 5 " +``` + +--- + +## Task 2: `AsyncTimeout` + +**Files:** +- Create: `src/httpware/middleware/resilience/timeout.py` +- Modify: `src/httpware/middleware/resilience/__init__.py` +- Modify: `src/httpware/__init__.py` +- Test: `tests/test_timeout.py` + +- [ ] **Step 1: Write the failing tests** + +Create `tests/test_timeout.py`. Tests call the middleware directly with an injected `next` — fully deterministic, no client/transport needed. Expiry uses a tiny timeout against a long sleep (100×+ margin, never flaky). + +```python +"""Tests for the AsyncTimeout middleware. + +Calls the middleware directly with an injected `next` callable. Expiry tests use a +tiny timeout against a long sleep (large margin → not wall-clock flaky); the +inner-timeout test raises immediately so no real time passes. 
+""" + +import asyncio +import builtins +import logging + +import httpx2 +import pytest + +from httpware.errors import TimeoutError as HttpwareTimeoutError # noqa: A004 +from httpware.middleware.resilience.timeout import AsyncTimeout + + +def _request() -> httpx2.Request: + return httpx2.Request("GET", "https://example.test/x") + + +async def test_passes_through_response_when_under_budget() -> None: + async def _next(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(200, request=request) + + middleware = AsyncTimeout(timeout=10.0) + response = await middleware(_request(), _next) + assert response.status_code == 200 + + +async def test_expiry_raises_httpware_timeout_chained_from_builtin( + caplog: pytest.LogCaptureFixture, +) -> None: + async def _next(request: httpx2.Request) -> httpx2.Response: + await asyncio.sleep(5.0) + return httpx2.Response(200, request=request) # pragma: no cover — deadline fires first + + middleware = AsyncTimeout(timeout=0.01) + with ( + caplog.at_level(logging.WARNING, logger="httpware.timeout"), + pytest.raises(HttpwareTimeoutError) as info, + ): + await middleware(_request(), _next) + + assert "overall timeout of 0.01s exceeded" in str(info.value) + assert isinstance(info.value.__cause__, builtins.TimeoutError) + + records = [r for r in caplog.records if r.name == "httpware.timeout"] + assert len(records) == 1 + assert records[0].levelno == logging.WARNING + assert records[0].timeout == 0.01 # ty: ignore[unresolved-attribute] + assert records[0].method == "GET" # ty: ignore[unresolved-attribute] + assert "example.test/x" in records[0].url # ty: ignore[unresolved-attribute] + + +async def test_inner_timeout_propagates_unchanged() -> None: + """A TimeoutError from next (not our deadline) is re-raised untouched.""" + + async def _next(request: httpx2.Request) -> httpx2.Response: + raise HttpwareTimeoutError("inner read timeout") + + middleware = AsyncTimeout(timeout=10.0) + with pytest.raises(HttpwareTimeoutError) as 
info: + await middleware(_request(), _next) + + assert "inner read timeout" in str(info.value) + assert "overall timeout" not in str(info.value) + + +def test_zero_timeout_rejected() -> None: + with pytest.raises(ValueError, match="timeout must be > 0"): + AsyncTimeout(timeout=0) + + +def test_negative_timeout_rejected() -> None: + with pytest.raises(ValueError, match="timeout must be > 0"): + AsyncTimeout(timeout=-1.0) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `just test tests/test_timeout.py` +Expected: FAIL — `ModuleNotFoundError: No module named 'httpware.middleware.resilience.timeout'`. + +- [ ] **Step 3: Implement `AsyncTimeout`** + +Create `src/httpware/middleware/resilience/timeout.py`: + +```python +"""AsyncTimeout middleware — overall wall-clock deadline across the inner pipeline. + +See planning/specs/2026-06-13-circuit-breaker-and-timeout-design.md for the contract. + +This is NOT a per-call timeout — httpx2's connect/read/write/pool timeouts are the +right tool for bounding a single outbound call, and AsyncTimeout does not duplicate +them. What httpx2 cannot bound is the total wall-clock across the whole middleware +pipeline (most importantly across an AsyncRetry loop, whose attempts and backoff +sleeps it knows nothing about). Place AsyncTimeout outermost to enforce +"this whole operation must finish within `timeout` seconds, even across retries." + +Async-only by design: a sync total-deadline cannot interrupt a blocking httpx2 call +mid-flight (sync Python has no cancellation), and httpx2 already covers sync per-call +timeouts. Sync callers configure httpx2's timeouts directly; there is no sync Timeout. 
+""" + +import asyncio +import logging + +import httpx2 + +from httpware._internal.observability import _emit_event +from httpware.errors import TimeoutError as HttpwareTimeoutError # noqa: A004 +from httpware.middleware import AsyncNext + + +_TIMEOUT_INVALID = "timeout must be > 0" + +_LOGGER = logging.getLogger("httpware.timeout") + + +class AsyncTimeout: + """Bounds total wall-clock time spent in the inner pipeline. + + Parameters + ---------- + timeout + Required. Overall deadline in seconds for ``next(request)`` to complete, + including everything it wraps (retries, backoff sleeps, the call itself). + Must be ``> 0``. On expiry the middleware raises ``httpware.TimeoutError``. + + Place outermost in the chain for an overall-operation deadline. For bounding a + single outbound call (connect/read/write/pool), configure ``httpx2`` instead. + """ + + def __init__(self, *, timeout: float) -> None: + if timeout <= 0: + raise ValueError(_TIMEOUT_INVALID) + self._timeout = timeout + + async def __call__(self, request: httpx2.Request, next: AsyncNext) -> httpx2.Response: # noqa: A002 + """Invoke next under an asyncio.timeout; raise httpware.TimeoutError on expiry. + + Only a deadline THIS middleware imposed is re-wrapped: ``cm.expired()`` + distinguishes our own expiry from an inner ``TimeoutError`` (e.g. an httpx2 + per-call timeout surfacing through a retry), which propagates unchanged. 
+ """ + try: + async with asyncio.timeout(self._timeout) as cm: + return await next(request) + except TimeoutError as exc: + if not cm.expired(): + raise # inner TimeoutError, not our deadline — leave it untouched + _emit_event( + _LOGGER, + "timeout.exceeded", + level=logging.WARNING, + message="overall timeout exceeded", + attributes={ + "timeout": self._timeout, + "method": request.method, + "url": str(request.url), + }, + ) + raise HttpwareTimeoutError(f"overall timeout of {self._timeout}s exceeded") from exc +``` + +- [ ] **Step 4: Export `AsyncTimeout`** + +In `src/httpware/middleware/resilience/__init__.py`, update the docstring and add the import + `__all__` entry: + +```python +"""Resilience primitives: Bulkhead, Retry, RetryBudget, AsyncTimeout.""" + +from httpware.middleware.resilience.budget import RetryBudget +from httpware.middleware.resilience.bulkhead import AsyncBulkhead, Bulkhead +from httpware.middleware.resilience.retry import AsyncRetry, Retry +from httpware.middleware.resilience.timeout import AsyncTimeout + + +__all__ = ["AsyncBulkhead", "AsyncRetry", "AsyncTimeout", "Bulkhead", "Retry", "RetryBudget"] +``` + +In `src/httpware/__init__.py`: change the resilience import line and add to `__all__`: + +```python +from httpware.middleware.resilience import AsyncBulkhead, AsyncRetry, AsyncTimeout, Bulkhead, Retry, RetryBudget +``` +`__all__` — insert `"AsyncTimeout",` after `"AsyncRetry",`. + +- [ ] **Step 5: Run tests + import check** + +Run: `just test tests/test_timeout.py` +Expected: PASS (5 tests). +Run: `uv run python -c "import httpware; assert httpware.AsyncTimeout"` +Expected: exit 0. + +- [ ] **Step 6: Lint** + +Run: `just lint` +Expected: clean. 
+ +- [ ] **Step 7: Commit** + +```bash +git add src/httpware/middleware/resilience/timeout.py src/httpware/middleware/resilience/__init__.py src/httpware/__init__.py tests/test_timeout.py +git commit -m "feat(resilience): add AsyncTimeout overall-deadline middleware + +Co-Authored-By: Claude Fable 5 " +``` + +--- + +## Task 3: `_CircuitBreakerState` + `AsyncCircuitBreaker` + +**Files:** +- Create: `src/httpware/middleware/resilience/circuit_breaker.py` +- Modify: `src/httpware/middleware/resilience/__init__.py` +- Modify: `src/httpware/__init__.py` +- Test: `tests/test_circuit_breaker.py` + +**Design note (deliberate deviation from pure async/sync duplication):** `bulkhead.py` and `retry.py` fully duplicate their async and sync classes. The breaker's state machine is far more complex, and duplicating it twice is exactly the parity-bug risk the delta audit surfaced. Instead, the lock-free transition logic lives once in a private `_CircuitBreakerState`; both wrappers are thin. This preserves the three properties the spec requires (shared/sharable instance, async loop-guard, sync `threading.Lock`). + +- [ ] **Step 1: Write the failing tests** + +Create `tests/test_circuit_breaker.py`. A `_Clock` drives `_now` deterministically; a `_FailFor`/sequence handler drives responses; `httpx2.ConnectError` from the handler maps to `NetworkError` at the terminal. + +```python +"""Tests for the AsyncCircuitBreaker middleware. + +Time is driven by an injected _now (a _Clock); the transport is mocked via +httpx2.MockTransport. 5xx responses surface as StatusError at the client terminal; +httpx2.ConnectError surfaces as NetworkError. 
+""" + +import asyncio +import logging +from collections.abc import Callable +from http import HTTPStatus + +import httpx2 +import pytest + +from httpware import AsyncClient, CircuitOpenError, InternalServerError, NetworkError, NotFoundError +from httpware.middleware.resilience.circuit_breaker import AsyncCircuitBreaker + + +class _Clock: + """Manually-advanced monotonic clock for deterministic reset_timeout tests.""" + + def __init__(self) -> None: + self.t = 0.0 + + def __call__(self) -> float: + return self.t + + def advance(self, seconds: float) -> None: + self.t += seconds + + +class _StatusSequence: + """Mock-transport handler returning a fixed sequence of status codes (default 200).""" + + def __init__(self, statuses: list[int]) -> None: + self._statuses = list(statuses) + self.calls = 0 + + def __call__(self, request: httpx2.Request) -> httpx2.Response: + self.calls += 1 + status = self._statuses.pop(0) if self._statuses else HTTPStatus.OK + return httpx2.Response(status, request=request) + + +def _client( + handler: Callable[[httpx2.Request], httpx2.Response], + *, + breaker: AsyncCircuitBreaker, +) -> AsyncClient: + return AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=httpx2.MockTransport(handler)), + middleware=[breaker], + ) + + +# ───── construction validation ────────────────────────────────────────────── + + +def test_failure_threshold_below_one_rejected() -> None: + with pytest.raises(ValueError, match="failure_threshold must be >= 1"): + AsyncCircuitBreaker(failure_threshold=0) + + +def test_negative_reset_timeout_rejected() -> None: + with pytest.raises(ValueError, match="reset_timeout must be >= 0"): + AsyncCircuitBreaker(reset_timeout=-1.0) + + +def test_success_threshold_below_one_rejected() -> None: + with pytest.raises(ValueError, match="success_threshold must be >= 1"): + AsyncCircuitBreaker(success_threshold=0) + + +# ───── closed-state behavior ──────────────────────────────────────────────── + + +async def 
test_closed_passes_through() -> None: + handler = _StatusSequence([HTTPStatus.OK]) + breaker = AsyncCircuitBreaker(failure_threshold=3, _now=_Clock()) + async with _client(handler, breaker=breaker) as client: + response = await client.get("https://example.test/x") + assert response.status_code == HTTPStatus.OK + assert handler.calls == 1 + + +async def test_consecutive_failures_open_the_circuit(caplog: pytest.LogCaptureFixture) -> None: + handler = _StatusSequence([500, 500, 500]) + breaker = AsyncCircuitBreaker(failure_threshold=3, _now=_Clock()) + async with _client(handler, breaker=breaker) as client: + for _ in range(3): + with pytest.raises(InternalServerError): + await client.get("https://example.test/x") + # circuit is now OPEN — the 4th call must NOT reach the transport + with pytest.raises(CircuitOpenError) as info: + await client.get("https://example.test/x") + assert handler.calls == 3 # 4th was short-circuited + assert info.value.retry_after is not None + # (the circuit.opened event is asserted in test_open_emits_opened_event_and_rejects) + + +async def test_open_emits_opened_event_and_rejects(caplog: pytest.LogCaptureFixture) -> None: + handler = _StatusSequence([500, 500]) + breaker = AsyncCircuitBreaker(failure_threshold=2, _now=_Clock()) + async with _client(handler, breaker=breaker) as client: + with caplog.at_level(logging.WARNING, logger="httpware.circuit_breaker"): + for _ in range(2): + with pytest.raises(InternalServerError): + await client.get("https://example.test/x") + with pytest.raises(CircuitOpenError): + await client.get("https://example.test/y") + records = [r for r in caplog.records if r.name == "httpware.circuit_breaker"] + opened = [r for r in records if "opened" in r.message] + rejected = [r for r in records if "rejecting" in r.message] + assert len(opened) == 1 + assert opened[0].failure_threshold == 2 # ty: ignore[unresolved-attribute] + assert opened[0].failures == 2 # ty: ignore[unresolved-attribute] + assert len(rejected) == 1 
+ assert rejected[0].retry_after is not None # ty: ignore[unresolved-attribute] + assert rejected[0].method == "GET" # ty: ignore[unresolved-attribute] + + +async def test_success_resets_failure_streak() -> None: + handler = _StatusSequence([500, 500, 200, 500, 500]) + breaker = AsyncCircuitBreaker(failure_threshold=3, _now=_Clock()) + async with _client(handler, breaker=breaker) as client: + for _ in range(2): + with pytest.raises(InternalServerError): + await client.get("https://example.test/x") + await client.get("https://example.test/x") # 200 resets the streak + for _ in range(2): + with pytest.raises(InternalServerError): + await client.get("https://example.test/x") + # only 2 consecutive failures after the reset — still CLOSED + response = await client.get("https://example.test/x") # 6th -> default 200 + assert response.status_code == HTTPStatus.OK + assert handler.calls == 6 + + +async def test_404_and_429_do_not_count_as_failures() -> None: + handler = _StatusSequence([404, 429, 404, 429, 404]) + breaker = AsyncCircuitBreaker(failure_threshold=2, _now=_Clock()) + async with _client(handler, breaker=breaker) as client: + for _ in range(5): + with pytest.raises((NotFoundError, type(None))): # 404 -> NotFoundError; 429 -> RateLimitedError + await client.get("https://example.test/x") + # never opened — all 5 reached the transport + assert handler.calls == 5 + + +async def test_network_error_counts_as_failure() -> None: + def _raise(request: httpx2.Request) -> httpx2.Response: + msg = "connect failed" + raise httpx2.ConnectError(msg) + + breaker = AsyncCircuitBreaker(failure_threshold=2, _now=_Clock()) + async with _client(_raise, breaker=breaker) as client: + for _ in range(2): + with pytest.raises(NetworkError): + await client.get("https://example.test/x") + with pytest.raises(CircuitOpenError): + await client.get("https://example.test/x") + + +async def test_non_counted_exception_propagates_without_state_change() -> None: + """A ValueError from inner 
middleware is neither success nor failure; state unchanged.""" + + class _Boom: + async def __call__(self, request: httpx2.Request, next: object) -> httpx2.Response: + msg = "boom" + raise ValueError(msg) + + handler = _StatusSequence([200]) + breaker = AsyncCircuitBreaker(failure_threshold=1, _now=_Clock()) + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=httpx2.MockTransport(handler)), + middleware=[breaker, _Boom()], + ) + async with client: + for _ in range(3): + with pytest.raises(ValueError, match="boom"): + await client.get("https://example.test/x") + # failure_threshold=1 but ValueError never counted -> still CLOSED, transport reachable + # (remove _Boom by using a fresh client would change state; instead assert no CircuitOpenError raised above) + + +# ───── half-open / reset_timeout ──────────────────────────────────────────── + + +async def test_reset_timeout_admits_probe_then_closes(caplog: pytest.LogCaptureFixture) -> None: + clock = _Clock() + handler = _StatusSequence([500, 500, 200]) # 2 fails -> open; probe (3rd) -> 200 -> close + breaker = AsyncCircuitBreaker(failure_threshold=2, reset_timeout=30.0, success_threshold=1, _now=clock) + async with _client(handler, breaker=breaker) as client: + for _ in range(2): + with pytest.raises(InternalServerError): + await client.get("https://example.test/x") + # OPEN; before reset_timeout -> rejected, transport untouched + with pytest.raises(CircuitOpenError): + await client.get("https://example.test/x") + assert handler.calls == 2 + clock.advance(30.0) + with caplog.at_level(logging.INFO, logger="httpware.circuit_breaker"): + response = await client.get("https://example.test/x") # probe admitted -> 200 -> CLOSED + assert response.status_code == HTTPStatus.OK + assert handler.calls == 3 + messages = [r.message for r in caplog.records if r.name == "httpware.circuit_breaker"] + assert any("half-open" in m for m in messages) + assert any("closed" in m for m in messages) + + +async def 
test_probe_failure_reopens_circuit() -> None: + clock = _Clock() + handler = _StatusSequence([500, 500, 500]) # open after 2; probe (3rd) fails -> reopen + breaker = AsyncCircuitBreaker(failure_threshold=2, reset_timeout=10.0, _now=clock) + async with _client(handler, breaker=breaker) as client: + for _ in range(2): + with pytest.raises(InternalServerError): + await client.get("https://example.test/x") + clock.advance(10.0) + with pytest.raises(InternalServerError): # probe runs, fails + await client.get("https://example.test/x") + # reopened with fresh opened_at; immediate retry is rejected + with pytest.raises(CircuitOpenError): + await client.get("https://example.test/x") + assert handler.calls == 3 + + +async def test_success_threshold_requires_multiple_probes() -> None: + clock = _Clock() + handler = _StatusSequence([500, 500, 200, 200]) # open; then 2 successful probes to close + breaker = AsyncCircuitBreaker(failure_threshold=2, reset_timeout=5.0, success_threshold=2, _now=clock) + async with _client(handler, breaker=breaker) as client: + for _ in range(2): + with pytest.raises(InternalServerError): + await client.get("https://example.test/x") + clock.advance(5.0) + await client.get("https://example.test/x") # probe 1 -> 200 (still HALF_OPEN, 1/2) + await client.get("https://example.test/x") # probe 2 -> 200 -> CLOSED + response = await client.get("https://example.test/x") # default 200, CLOSED + assert response.status_code == HTTPStatus.OK + assert handler.calls == 4 + + +async def test_half_open_second_concurrent_request_rejected_with_none_retry_after() -> None: + """While the single probe is in flight, a concurrent request fast-fails (retry_after=None).""" + clock = _Clock() + probe_started = asyncio.Event() + release_probe = asyncio.Event() + + async def _handler_async(request: httpx2.Request) -> httpx2.Response: + probe_started.set() + await release_probe.wait() + return httpx2.Response(HTTPStatus.OK, request=request) + + breaker = 
AsyncCircuitBreaker(failure_threshold=1, reset_timeout=1.0, _now=clock) + # open the circuit with one 500 + open_handler = _StatusSequence([500]) + async with _client(open_handler, breaker=breaker) as opener: + with pytest.raises(InternalServerError): + await opener.get("https://example.test/x") + clock.advance(1.0) + + client = AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=httpx2.MockTransport(_handler_async)), + middleware=[breaker], + ) + async with client: + probe_task = asyncio.create_task(client.get("https://example.test/probe")) + await probe_started.wait() # probe is now in flight, HALF_OPEN + with pytest.raises(CircuitOpenError) as info: + await client.get("https://example.test/concurrent") + assert info.value.retry_after is None + release_probe.set() + await probe_task + + +# ───── single-event-loop guard ────────────────────────────────────────────── + + +def test_cross_loop_use_raises_runtimeerror() -> None: + breaker = AsyncCircuitBreaker(_now=_Clock()) + handler = _StatusSequence([200]) + + async def _run_once() -> None: + async with _client(handler, breaker=breaker) as client: + await client.get("https://example.test/x") + + asyncio.run(_run_once()) # binds to loop L1 + with pytest.raises(RuntimeError, match="bound to a single event loop"): + asyncio.run(_run_once()) +``` + +> Note for the implementer: in `test_404_and_429_do_not_count_as_failures` import `RateLimitedError` too and use `pytest.raises((NotFoundError, RateLimitedError))`. Replace the `(NotFoundError, type(None))` placeholder with `(NotFoundError, RateLimitedError)` and add `RateLimitedError` to the `from httpware import ...` line. In `test_non_counted_exception_propagates_without_state_change`, assert that no `CircuitOpenError` was raised across the three `ValueError`s (the `pytest.raises(ValueError)` already guarantees each call raised `ValueError`, not `CircuitOpenError`). 
+ +- [ ] **Step 2: Run tests to verify they fail** + +Run: `just test tests/test_circuit_breaker.py` +Expected: FAIL — `ModuleNotFoundError: No module named 'httpware.middleware.resilience.circuit_breaker'`. + +- [ ] **Step 3: Implement `_CircuitBreakerState` + `AsyncCircuitBreaker`** + +Create `src/httpware/middleware/resilience/circuit_breaker.py`: + +```python +"""CircuitBreaker + AsyncCircuitBreaker — classic consecutive-failure circuit breaker. + +See planning/specs/2026-06-13-circuit-breaker-and-timeout-design.md for the contract. + +A counted failure is a NetworkError, an httpware TimeoutError, or a StatusError whose +status_code is in the effective failure set (default: all 5xx). 4xx — including 429 — +count as successes: 429 means healthy-but-throttling, and tripping on it amplifies +incidents. Any other exception propagates without affecting circuit state. + +State machine (classic / consecutive-failure): + CLOSED — forward; count consecutive counted-failures; open at failure_threshold. + OPEN — fast-fail with CircuitOpenError; after reset_timeout the next request + becomes the half-open probe. + HALF_OPEN — admit exactly one probe at a time; success_threshold consecutive probe + successes close the circuit; one probe failure re-opens it. + +The lock-free _CircuitBreakerState holds the transition logic, shared by both wrappers. +AsyncCircuitBreaker relies on asyncio atomicity (no await inside a transition) plus a +single-event-loop guard; CircuitBreaker serializes transitions with a threading.Lock. +Both are sharable across clients (one shared circuit); a sync instance cannot be shared +with an async one. 
+""" + +import asyncio +import enum +import logging +import threading +import time +import typing +from collections.abc import Callable + +import httpx2 + +from httpware._internal.observability import _emit_event +from httpware.errors import CircuitOpenError, NetworkError, StatusError, TimeoutError # noqa: A004 +from httpware.middleware import AsyncNext, Next + + +_FAILURE_THRESHOLD_INVALID = "failure_threshold must be >= 1" +_RESET_TIMEOUT_INVALID = "reset_timeout must be >= 0" +_SUCCESS_THRESHOLD_INVALID = "success_threshold must be >= 1" +_CROSS_LOOP_MSG = ( + "AsyncCircuitBreaker is bound to a single event loop. First seen on {first!r}; " + "current request is on {current!r}. Use one AsyncCircuitBreaker per loop; " + "cross-thread sharing requires the sync CircuitBreaker primitive." +) + +_DEFAULT_FAILURE_STATUS_CODES = frozenset(range(500, 600)) + +_ROLE_CLOSED = "closed" +_ROLE_PROBE = "probe" + +_LOGGER = logging.getLogger("httpware.circuit_breaker") + + +class _CircuitState(enum.Enum): + CLOSED = "closed" + OPEN = "open" + HALF_OPEN = "half_open" + + +class _CircuitBreakerState: + """Lock-free circuit-breaker state machine shared by the sync + async wrappers. + + Every method is synchronous and performs no I/O beyond logging. The async wrapper + calls these directly (atomic under a single event loop because no await occurs + inside a transition); the sync wrapper wraps each call in a threading.Lock. 
+ """ + + def __init__( + self, + *, + failure_threshold: int, + reset_timeout: float, + success_threshold: int, + failure_status_codes: frozenset[int] | None, + now: Callable[[], float], + ) -> None: + if failure_threshold < 1: + raise ValueError(_FAILURE_THRESHOLD_INVALID) + if reset_timeout < 0: + raise ValueError(_RESET_TIMEOUT_INVALID) + if success_threshold < 1: + raise ValueError(_SUCCESS_THRESHOLD_INVALID) + self._failure_threshold = failure_threshold + self._reset_timeout = reset_timeout + self._success_threshold = success_threshold + self._failure_status_codes = ( + failure_status_codes if failure_status_codes is not None else _DEFAULT_FAILURE_STATUS_CODES + ) + self._now = now + self._state = _CircuitState.CLOSED + self._consecutive_failures = 0 + self._consecutive_successes = 0 + self._opened_at = 0.0 + self._probe_in_flight = False + + def is_failure_status(self, status_code: int) -> bool: + return status_code in self._failure_status_codes + + def admit(self, request: httpx2.Request) -> str: + """Decide the request's role, or raise CircuitOpenError. 
No await inside.""" + if self._state is _CircuitState.CLOSED: + return _ROLE_CLOSED + if self._state is _CircuitState.OPEN: + elapsed = self._now() - self._opened_at + if elapsed >= self._reset_timeout: + self._state = _CircuitState.HALF_OPEN + self._probe_in_flight = True + self._emit(request, "circuit.half_open", logging.INFO, "circuit half-open — admitting probe", {}) + return _ROLE_PROBE + retry_after = max(0.0, self._reset_timeout - elapsed) + self._emit( + request, + "circuit.rejected", + logging.WARNING, + "circuit open — rejecting request", + {"retry_after": retry_after}, + ) + raise CircuitOpenError(retry_after=retry_after) + # HALF_OPEN + if self._probe_in_flight: + self._emit( + request, + "circuit.rejected", + logging.WARNING, + "circuit half-open — rejecting request (probe in flight)", + {"retry_after": None}, + ) + raise CircuitOpenError(retry_after=None) + self._probe_in_flight = True + return _ROLE_PROBE + + def on_success(self, role: str, request: httpx2.Request) -> None: + if role == _ROLE_PROBE: + self._probe_in_flight = False + if self._state is _CircuitState.CLOSED: + self._consecutive_failures = 0 + elif self._state is _CircuitState.HALF_OPEN: + self._consecutive_successes += 1 + if self._consecutive_successes >= self._success_threshold: + self._state = _CircuitState.CLOSED + self._consecutive_failures = 0 + self._consecutive_successes = 0 + self._emit(request, "circuit.closed", logging.INFO, "circuit closed — service recovered", {}) + + def on_failure(self, role: str, request: httpx2.Request) -> None: + if role == _ROLE_PROBE: + self._probe_in_flight = False + if self._state is _CircuitState.CLOSED: + self._consecutive_failures += 1 + if self._consecutive_failures >= self._failure_threshold: + self._open(request, failures=self._consecutive_failures) + elif self._state is _CircuitState.HALF_OPEN: + self._consecutive_successes = 0 + self._open(request, failures=1) + + def release_probe(self, role: str) -> None: + """Release the probe slot 
without recording success or failure (non-counted exc).""" + if role == _ROLE_PROBE: + self._probe_in_flight = False + + def _open(self, request: httpx2.Request, *, failures: int) -> None: + self._state = _CircuitState.OPEN + self._opened_at = self._now() + self._emit( + request, + "circuit.opened", + logging.WARNING, + "circuit opened — failure threshold reached", + {"failure_threshold": self._failure_threshold, "failures": failures}, + ) + + def _emit( + self, + request: httpx2.Request, + event_name: str, + level: int, + message: str, + attributes: dict[str, typing.Any], + ) -> None: + _emit_event( + _LOGGER, + event_name, + level=level, + message=message, + attributes={**attributes, "method": request.method, "url": str(request.url)}, + ) + + +class AsyncCircuitBreaker: + """Async classic circuit breaker middleware. See the module docstring for the contract.""" + + def __init__( + self, + *, + failure_threshold: int = 5, + reset_timeout: float = 30.0, + success_threshold: int = 1, + failure_status_codes: frozenset[int] | None = None, + _now: Callable[[], float] = time.monotonic, + ) -> None: + self._state = _CircuitBreakerState( + failure_threshold=failure_threshold, + reset_timeout=reset_timeout, + success_threshold=success_threshold, + failure_status_codes=failure_status_codes, + now=_now, + ) + self._loop: asyncio.AbstractEventLoop | None = None + self._loop_lock = threading.Lock() + + def _check_loop(self) -> None: + current = asyncio.get_running_loop() + cached = self._loop + if cached is current: + return + if cached is not None: + raise RuntimeError(_CROSS_LOOP_MSG.format(first=cached, current=current)) + with self._loop_lock: + if self._loop is None: + self._loop = current + # pragma below: inner double-check-with-lock race arm; only reachable when + # two threads simultaneously pass the outer check, which single-threaded + # tests can't trigger. 
+ elif self._loop is not current: # pragma: no cover + raise RuntimeError(_CROSS_LOOP_MSG.format(first=self._loop, current=current)) + + async def __call__(self, request: httpx2.Request, next: AsyncNext) -> httpx2.Response: # noqa: A002 + """Admit, forward, then record the outcome. Fast-fail when the circuit is not closed.""" + self._check_loop() + role = self._state.admit(request) + try: + response = await next(request) + except StatusError as exc: + if self._state.is_failure_status(exc.response.status_code): + self._state.on_failure(role, request) + else: + self._state.on_success(role, request) + raise + except (NetworkError, TimeoutError): + self._state.on_failure(role, request) + raise + except BaseException: + self._state.release_probe(role) + raise + self._state.on_success(role, request) + return response +``` + +- [ ] **Step 4: Export `AsyncCircuitBreaker`** + +In `src/httpware/middleware/resilience/__init__.py` add the import and `__all__` entry (only the async name in this task; the sync name lands in Task 4): + +```python +from httpware.middleware.resilience.circuit_breaker import AsyncCircuitBreaker +``` +Update docstring to mention CircuitBreaker, and `__all__` — insert `"AsyncCircuitBreaker",` after `"AsyncBulkhead",`. + +In `src/httpware/__init__.py` change the resilience import to include `AsyncCircuitBreaker` (alphabetical: after `AsyncBulkhead`) and add `"AsyncCircuitBreaker",` to `__all__` after `"AsyncBulkhead",`: + +```python +from httpware.middleware.resilience import ( + AsyncBulkhead, + AsyncCircuitBreaker, + AsyncRetry, + AsyncTimeout, + Bulkhead, + Retry, + RetryBudget, +) +``` + +- [ ] **Step 5: Run tests + import check** + +Run: `just test tests/test_circuit_breaker.py` +Expected: PASS (all async breaker tests). +Run: `uv run python -c "import httpware; assert httpware.AsyncCircuitBreaker"` +Expected: exit 0. + +- [ ] **Step 6: Lint + coverage check for this module** + +Run: `just lint` +Expected: clean. 
+Run: `just test tests/test_circuit_breaker.py --cov=httpware.middleware.resilience.circuit_breaker --cov-report=term-missing` +Expected: PASS; note any uncovered lines in `circuit_breaker.py`. The only acceptable uncovered line is the `# pragma: no cover` race arm in `_check_loop`. (Full-suite 100% is verified in Task 4 once the sync class lands.) + +- [ ] **Step 7: Commit** + +```bash +git add src/httpware/middleware/resilience/circuit_breaker.py src/httpware/middleware/resilience/__init__.py src/httpware/__init__.py tests/test_circuit_breaker.py +git commit -m "feat(resilience): add AsyncCircuitBreaker (classic consecutive-failure breaker) + +Co-Authored-By: Claude Fable 5 " +``` + +--- + +## Task 4: `CircuitBreaker` (sync) + +**Files:** +- Modify: `src/httpware/middleware/resilience/circuit_breaker.py` (append `CircuitBreaker`) +- Modify: `src/httpware/middleware/resilience/__init__.py` +- Modify: `src/httpware/__init__.py` +- Test: `tests/test_circuit_breaker_sync.py` + +- [ ] **Step 1: Write the failing tests** + +Create `tests/test_circuit_breaker_sync.py` — the sync mirror. Uses `Client` + `httpx2.Client(transport=...)`. These are plain sync tests (no `async def`). 
+ +```python +"""Tests for the sync CircuitBreaker middleware (mirror of AsyncCircuitBreaker).""" + +import logging +import threading +from collections.abc import Callable +from http import HTTPStatus + +import httpx2 +import pytest + +from httpware import CircuitOpenError, Client, InternalServerError, NetworkError, NotFoundError, RateLimitedError +from httpware.middleware.resilience.circuit_breaker import CircuitBreaker + + +class _Clock: + def __init__(self) -> None: + self.t = 0.0 + + def __call__(self) -> float: + return self.t + + def advance(self, seconds: float) -> None: + self.t += seconds + + +class _StatusSequence: + def __init__(self, statuses: list[int]) -> None: + self._statuses = list(statuses) + self.calls = 0 + + def __call__(self, request: httpx2.Request) -> httpx2.Response: + self.calls += 1 + status = self._statuses.pop(0) if self._statuses else HTTPStatus.OK + return httpx2.Response(status, request=request) + + +def _client(handler: Callable[[httpx2.Request], httpx2.Response], *, breaker: CircuitBreaker) -> Client: + return Client( + httpx2_client=httpx2.Client(transport=httpx2.MockTransport(handler)), + middleware=[breaker], + ) + + +def test_failure_threshold_below_one_rejected() -> None: + with pytest.raises(ValueError, match="failure_threshold must be >= 1"): + CircuitBreaker(failure_threshold=0) + + +def test_negative_reset_timeout_rejected() -> None: + with pytest.raises(ValueError, match="reset_timeout must be >= 0"): + CircuitBreaker(reset_timeout=-1.0) + + +def test_success_threshold_below_one_rejected() -> None: + with pytest.raises(ValueError, match="success_threshold must be >= 1"): + CircuitBreaker(success_threshold=0) + + +def test_closed_passes_through() -> None: + handler = _StatusSequence([HTTPStatus.OK]) + breaker = CircuitBreaker(failure_threshold=3, _now=_Clock()) + with _client(handler, breaker=breaker) as client: + response = client.get("https://example.test/x") + assert response.status_code == HTTPStatus.OK + + +def 
test_open_emits_opened_event_and_rejects(caplog: pytest.LogCaptureFixture) -> None: + handler = _StatusSequence([500, 500]) + breaker = CircuitBreaker(failure_threshold=2, _now=_Clock()) + with _client(handler, breaker=breaker) as client: + with caplog.at_level(logging.WARNING, logger="httpware.circuit_breaker"): + for _ in range(2): + with pytest.raises(InternalServerError): + client.get("https://example.test/x") + with pytest.raises(CircuitOpenError) as info: + client.get("https://example.test/y") + assert info.value.retry_after is not None + assert handler.calls == 2 + records = [r for r in caplog.records if r.name == "httpware.circuit_breaker"] + assert any("opened" in r.message for r in records) + assert any("rejecting" in r.message for r in records) + + +def test_success_resets_failure_streak() -> None: + handler = _StatusSequence([500, 500, 200, 500, 500]) + breaker = CircuitBreaker(failure_threshold=3, _now=_Clock()) + with _client(handler, breaker=breaker) as client: + for _ in range(2): + with pytest.raises(InternalServerError): + client.get("https://example.test/x") + client.get("https://example.test/x") + for _ in range(2): + with pytest.raises(InternalServerError): + client.get("https://example.test/x") + response = client.get("https://example.test/x") + assert response.status_code == HTTPStatus.OK + assert handler.calls == 6 + + +def test_404_and_429_do_not_count_as_failures() -> None: + handler = _StatusSequence([404, 429, 404, 429, 404]) + breaker = CircuitBreaker(failure_threshold=2, _now=_Clock()) + with _client(handler, breaker=breaker) as client: + for _ in range(5): + with pytest.raises((NotFoundError, RateLimitedError)): + client.get("https://example.test/x") + assert handler.calls == 5 + + +def test_network_error_counts_as_failure() -> None: + def _raise(request: httpx2.Request) -> httpx2.Response: + msg = "connect failed" + raise httpx2.ConnectError(msg) + + breaker = CircuitBreaker(failure_threshold=2, _now=_Clock()) + with _client(_raise, 
breaker=breaker) as client: + for _ in range(2): + with pytest.raises(NetworkError): + client.get("https://example.test/x") + with pytest.raises(CircuitOpenError): + client.get("https://example.test/x") + + +def test_non_counted_exception_propagates_without_state_change() -> None: + class _Boom: + def __call__(self, request: httpx2.Request, next: object) -> httpx2.Response: + msg = "boom" + raise ValueError(msg) + + handler = _StatusSequence([200]) + breaker = CircuitBreaker(failure_threshold=1, _now=_Clock()) + client = Client( + httpx2_client=httpx2.Client(transport=httpx2.MockTransport(handler)), + middleware=[breaker, _Boom()], + ) + with client: + for _ in range(3): + with pytest.raises(ValueError, match="boom"): + client.get("https://example.test/x") + + +def test_reset_timeout_admits_probe_then_closes(caplog: pytest.LogCaptureFixture) -> None: + clock = _Clock() + handler = _StatusSequence([500, 500, 200]) + breaker = CircuitBreaker(failure_threshold=2, reset_timeout=30.0, success_threshold=1, _now=clock) + with _client(handler, breaker=breaker) as client: + for _ in range(2): + with pytest.raises(InternalServerError): + client.get("https://example.test/x") + with pytest.raises(CircuitOpenError): + client.get("https://example.test/x") + assert handler.calls == 2 + clock.advance(30.0) + with caplog.at_level(logging.INFO, logger="httpware.circuit_breaker"): + response = client.get("https://example.test/x") + assert response.status_code == HTTPStatus.OK + assert handler.calls == 3 + messages = [r.message for r in caplog.records if r.name == "httpware.circuit_breaker"] + assert any("half-open" in m for m in messages) + assert any("closed" in m for m in messages) + + +def test_probe_failure_reopens_circuit() -> None: + clock = _Clock() + handler = _StatusSequence([500, 500, 500]) + breaker = CircuitBreaker(failure_threshold=2, reset_timeout=10.0, _now=clock) + with _client(handler, breaker=breaker) as client: + for _ in range(2): + with 
pytest.raises(InternalServerError): + client.get("https://example.test/x") + clock.advance(10.0) + with pytest.raises(InternalServerError): + client.get("https://example.test/x") + with pytest.raises(CircuitOpenError): + client.get("https://example.test/x") + assert handler.calls == 3 + + +def test_success_threshold_requires_multiple_probes() -> None: + clock = _Clock() + handler = _StatusSequence([500, 500, 200, 200]) + breaker = CircuitBreaker(failure_threshold=2, reset_timeout=5.0, success_threshold=2, _now=clock) + with _client(handler, breaker=breaker) as client: + for _ in range(2): + with pytest.raises(InternalServerError): + client.get("https://example.test/x") + clock.advance(5.0) + client.get("https://example.test/x") + client.get("https://example.test/x") + response = client.get("https://example.test/x") + assert response.status_code == HTTPStatus.OK + assert handler.calls == 4 + + +def test_half_open_second_concurrent_request_rejected_with_none_retry_after() -> None: + """Two threads hit a half-open breaker; exactly one is the probe, the other is rejected.""" + clock = _Clock() + probe_started = threading.Event() + release_probe = threading.Event() + + def _handler(request: httpx2.Request) -> httpx2.Response: + probe_started.set() + release_probe.wait(timeout=5.0) + return httpx2.Response(HTTPStatus.OK, request=request) + + breaker = CircuitBreaker(failure_threshold=1, reset_timeout=1.0, _now=clock) + open_handler = _StatusSequence([500]) + with _client(open_handler, breaker=breaker) as opener: + with pytest.raises(InternalServerError): + opener.get("https://example.test/x") + clock.advance(1.0) + + client = Client( + httpx2_client=httpx2.Client(transport=httpx2.MockTransport(_handler)), + middleware=[breaker], + ) + rejected: list[CircuitOpenError] = [] + + def _probe() -> None: + client.get("https://example.test/probe") + + with client: + t = threading.Thread(target=_probe) + t.start() + assert probe_started.wait(timeout=5.0) + with 
pytest.raises(CircuitOpenError) as info: + client.get("https://example.test/concurrent") + rejected.append(info.value) + release_probe.set() + t.join(timeout=5.0) + + assert rejected[0].retry_after is None +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `just test tests/test_circuit_breaker_sync.py` +Expected: FAIL — `ImportError: cannot import name 'CircuitBreaker' from 'httpware.middleware.resilience.circuit_breaker'`. + +- [ ] **Step 3: Implement `CircuitBreaker` (sync)** + +Append to `src/httpware/middleware/resilience/circuit_breaker.py`: + +```python +class CircuitBreaker: + """Sync classic circuit breaker middleware. Mirror of AsyncCircuitBreaker. + + Serializes every state transition with a threading.Lock. Sharable across Clients + (one shared circuit); a sync instance cannot be shared with an AsyncClient. + """ + + def __init__( + self, + *, + failure_threshold: int = 5, + reset_timeout: float = 30.0, + success_threshold: int = 1, + failure_status_codes: frozenset[int] | None = None, + _now: Callable[[], float] = time.monotonic, + ) -> None: + self._state = _CircuitBreakerState( + failure_threshold=failure_threshold, + reset_timeout=reset_timeout, + success_threshold=success_threshold, + failure_status_codes=failure_status_codes, + now=_now, + ) + self._lock = threading.Lock() + + def __call__(self, request: httpx2.Request, next: Next) -> httpx2.Response: # noqa: A002 + """Admit, forward, then record the outcome. 
Fast-fail when the circuit is not closed.""" + with self._lock: + role = self._state.admit(request) + try: + response = next(request) + except StatusError as exc: + with self._lock: + if self._state.is_failure_status(exc.response.status_code): + self._state.on_failure(role, request) + else: + self._state.on_success(role, request) + raise + except (NetworkError, TimeoutError): + with self._lock: + self._state.on_failure(role, request) + raise + except BaseException: + with self._lock: + self._state.release_probe(role) + raise + with self._lock: + self._state.on_success(role, request) + return response +``` + +- [ ] **Step 4: Export `CircuitBreaker`** + +In `src/httpware/middleware/resilience/__init__.py`: +```python +from httpware.middleware.resilience.circuit_breaker import AsyncCircuitBreaker, CircuitBreaker +``` +`__all__` — insert `"CircuitBreaker",` after `"Bulkhead",`. + +In `src/httpware/__init__.py` add `CircuitBreaker` to the resilience import (after `Bulkhead`) and `"CircuitBreaker",` to `__all__`. Alphabetical order around it: `BulkheadFullError`, **`CircuitBreaker`**, `CircuitOpenError`, `Client` — so insert `"CircuitBreaker",` after `"BulkheadFullError",` and before `"CircuitOpenError",` (added in Task 1). + +```python +from httpware.middleware.resilience import ( + AsyncBulkhead, + AsyncCircuitBreaker, + AsyncRetry, + AsyncTimeout, + Bulkhead, + CircuitBreaker, + Retry, + RetryBudget, +) +``` + +- [ ] **Step 5: Run sync tests + import check** + +Run: `just test tests/test_circuit_breaker_sync.py` +Expected: PASS. +Run: `uv run python -c "import httpware; assert httpware.CircuitBreaker"` +Expected: exit 0. + +- [ ] **Step 6: Full suite + 100% coverage** + +Run: `just test` +Expected: PASS, coverage 100% (`--cov-fail-under=100` does not fail). If any line in `circuit_breaker.py` is uncovered (other than the `_check_loop` race `# pragma: no cover`), add the missing test before committing. + +- [ ] **Step 7: Lint** + +Run: `just lint` +Expected: clean. 
+ +- [ ] **Step 8: Commit** + +```bash +git add src/httpware/middleware/resilience/circuit_breaker.py src/httpware/middleware/resilience/__init__.py src/httpware/__init__.py tests/test_circuit_breaker_sync.py +git commit -m "feat(resilience): add sync CircuitBreaker + +Co-Authored-By: Claude Fable 5 " +``` + +--- + +## Task 5: Property test — OPEN-state invariant + +**Files:** +- Test: `tests/test_circuit_breaker_props.py` + +- [ ] **Step 1: Write the property test** + +Create `tests/test_circuit_breaker_props.py`. Invariant: while OPEN and before `reset_timeout` elapses, `next` is never called (the transport call-count does not advance). Exercises the state machine directly (no HTTP) for speed and determinism. + +```python +"""Property test: while OPEN and before reset_timeout, the breaker never forwards. + +Drives the state machine directly via the public AsyncCircuitBreaker with a stub +`next` that records calls. Hypothesis generates random advance/outcome sequences. +""" + +import httpx2 +import pytest +from hypothesis import given, strategies as st + +from httpware import CircuitOpenError, InternalServerError +from httpware.middleware.resilience.circuit_breaker import AsyncCircuitBreaker + + +class _Clock: + def __init__(self) -> None: + self.t = 0.0 + + def __call__(self) -> float: + return self.t + + +def _request() -> httpx2.Request: + return httpx2.Request("GET", "https://example.test/x") + + +@given( + failure_threshold=st.integers(min_value=1, max_value=5), + reset_timeout=st.floats(min_value=1.0, max_value=100.0), + advances=st.lists(st.floats(min_value=0.0, max_value=0.5), min_size=1, max_size=20), +) +async def test_open_circuit_never_forwards_before_reset_timeout( + failure_threshold: int, + reset_timeout: float, + advances: list[float], +) -> None: + clock = _Clock() + breaker = AsyncCircuitBreaker( + failure_threshold=failure_threshold, + reset_timeout=reset_timeout, + _now=clock, + ) + forwarded = 0 + + async def _ok(request: httpx2.Request) -> 
httpx2.Response: + nonlocal forwarded + forwarded += 1 + return httpx2.Response(500, request=request) + + # Open the circuit: failure_threshold consecutive 500s (500 -> InternalServerError -> failure). + async def _five_hundred(request: httpx2.Request) -> httpx2.Response: + raise InternalServerError(httpx2.Response(500, request=request)) + + for _ in range(failure_threshold): + with pytest.raises(InternalServerError): + await breaker(_request(), _five_hundred) + + # Now OPEN. The advances may sum to 10, so total < reset_timeout is NOT guaranteed by the strategy; + # clamp by stopping before the running total reaches reset_timeout. + calls_before = forwarded + total = 0.0 + for step in advances: + if total + step >= reset_timeout: + break + total += step + clock.t = total + with pytest.raises(CircuitOpenError): + await breaker(_request(), _ok) + + assert forwarded == calls_before # next was never called while OPEN pre-timeout +``` + +- [ ] **Step 2: Run the property test to verify it passes** + +Run: `just test tests/test_circuit_breaker_props.py` +Expected: PASS (hypothesis explores many sequences; no `next` call while OPEN pre-timeout). + +- [ ] **Step 3: Lint** + +Run: `just lint` +Expected: clean (confirm `InternalServerError` is imported at module top, no `PLC0415`). + +- [ ] **Step 4: Commit** + +```bash +git add tests/test_circuit_breaker_props.py +git commit -m "test(circuit-breaker): property test — OPEN never forwards pre-timeout + +Co-Authored-By: Claude Fable 5 " +``` + +--- + +## Task 6: Docs + release notes + +**Files:** +- Modify: `docs/resilience.md` +- Modify: `README.md` +- Create: `planning/releases/0.10.0.md` + +- [ ] **Step 1: Read the existing docs to match tone/structure** + +Read `docs/resilience.md` (existing Retry + Bulkhead sections) and the resilience paragraph in `README.md`. Match heading depth, code-fence style, and the "why" framing used for Bulkhead. 
+ +- [ ] **Step 2: Add the CircuitBreaker section to `docs/resilience.md`** + +Add a `## Circuit breaker` section covering: what it does (classic consecutive-failure breaker), the three states, the constructor knobs (`failure_threshold`, `reset_timeout`, `success_threshold`, `failure_status_codes`), the failure classification (5xx + network + timeout; **429/4xx count as successes** — explain why tripping on 429 amplifies incidents), `CircuitOpenError` (and its `retry_after`), the observability events, and that the instance is sharable across clients (one shared circuit) but a sync instance can't be shared with an async one. Include a short async example and a sync example using `CircuitBreaker`. + +- [ ] **Step 3: Add the AsyncTimeout section to `docs/resilience.md`** + +Add a `## Overall timeout (async only)` section: what it bounds (total wall-clock across the inner pipeline, especially across an `AsyncRetry` loop), the `cm.expired()` distinction (our deadline → `httpware.TimeoutError`; inner timeout → propagates), and the two "why" notes: **why there is no sync `Timeout`** (no cancellation in sync Python) and **why it does not duplicate httpx2's per-call timeouts**. + +- [ ] **Step 4: Add the recommended ordering note** + +Add a short subsection (or extend the existing ordering guidance) documenting the recommended chain order and that it is **not enforced**: + +``` +AsyncTimeout → AsyncCircuitBreaker → AsyncBulkhead → AsyncRetry → terminal +``` + +Explain the consequence of breaker-outside-retry: an open circuit short-circuits the whole retry loop, and the breaker counts one outcome per fully-exhausted retry sequence. + +- [ ] **Step 5: Update the README resilience paragraph** + +In `README.md`, extend the resilience paragraph from "Retry + Bulkhead" to also list CircuitBreaker and AsyncTimeout (one phrase each), consistent with the existing style. 
+ +- [ ] **Step 6: Write `planning/releases/0.10.0.md`** + +Create `planning/releases/0.10.0.md` following the format of the most recent release note in `planning/releases/`. Cover: new public names (`CircuitBreaker`, `AsyncCircuitBreaker`, `AsyncTimeout`, `CircuitOpenError`), the new observability events (`circuit.opened`, `circuit.rejected`, `circuit.half_open`, `circuit.closed`, `timeout.exceeded`), the additive/non-breaking nature, and the recommended ordering. Read an existing release note first to match structure. + +- [ ] **Step 7: Build docs locally (if the project supports it) or sanity-check links** + +Run: `uv run mkdocs build --strict` (if `mkdocs` is configured; otherwise skip) +Expected: builds without warnings. If `mkdocs` is not available, manually verify no broken internal links in the edited sections. + +- [ ] **Step 8: Commit** + +```bash +git add docs/resilience.md README.md planning/releases/0.10.0.md +git commit -m "docs(resilience): document CircuitBreaker + AsyncTimeout (0.10.0) + +Co-Authored-By: Claude Fable 5 " +``` + +--- + +## Final verification + +- [ ] **Run the full gate** + +Run: `just lint-ci && just test` +Expected: lint clean (no auto-fix needed), all tests pass, coverage 100%. + +- [ ] **Confirm exports** + +Run: `uv run python -c "import httpware; [getattr(httpware, n) for n in ('CircuitBreaker','AsyncCircuitBreaker','AsyncTimeout','CircuitOpenError')]; print('ok')"` +Expected: `ok`. + +- [ ] **Architecture invariants** + +Run: `grep -rE 'httpx2\._' src/httpware/ || echo "no private httpx2 access"` +Expected: `no private httpx2 access`. +Run: `grep -rn 'from __future__ import annotations' src/httpware/ || echo "clean"` +Expected: `clean`. + +The branch is ready for `requesting-code-review` → `finishing-a-development-branch`. Release tagging (`0.10.0`) happens via the existing release flow: the tag name *is* the version (`uv version $GITHUB_REF_NAME`); do **not** bump `pyproject.toml`. 
+ +--- + +## Self-Review notes (author) + +- **Spec coverage:** CircuitOpenError (T1), AsyncTimeout incl. `cm.expired()` inner-vs-deadline (T2), breaker state machine + classification + concurrency + loop guard (T3/T4), events (asserted in T2–T4 via caplog), property invariant (T5), docs/ordering/release (T6). All spec sections map to a task. +- **`test_observability.py`:** spec mentioned a centralized assertion "if asserted centrally" — there is none, so events are asserted in the feature files. Noted at the top of File Structure. +- **Type/name consistency:** `_CircuitBreakerState` methods (`admit`, `on_success`, `on_failure`, `release_probe`, `is_failure_status`, `_open`, `_emit`) are referenced identically in both wrappers. Role constants `_ROLE_CLOSED`/`_ROLE_PROBE`. Logger `httpware.circuit_breaker`. Event names match the spec table exactly. +- **Shared-core deviation** from bulkhead/retry duplication is called out in Task 3 with rationale; preserves the three spec-required properties. diff --git a/planning/releases/0.10.0.md b/planning/releases/0.10.0.md new file mode 100644 index 0000000..5656244 --- /dev/null +++ b/planning/releases/0.10.0.md @@ -0,0 +1,44 @@ +# httpware 0.10.0 — circuit breaker + async timeout + +**Minor release. Additive only — no breaking changes.** + +## New public names + +```python +from httpware.middleware.resilience import AsyncCircuitBreaker # async +from httpware.middleware.resilience import CircuitBreaker # sync +from httpware.middleware.resilience import AsyncTimeout +from httpware import CircuitOpenError +``` + +## `AsyncCircuitBreaker` / `CircuitBreaker` + +Classic consecutive-failure circuit breaker. Tracks consecutive counted failures (5xx, `NetworkError`, `TimeoutError`) and fast-fails with `CircuitOpenError` once `failure_threshold` of them occur in a row. Recovers via a HALF_OPEN probe after `reset_timeout` seconds; closes when `success_threshold` consecutive probe successes are seen. 
+ +4xx responses — including 429 — count as successes. A 429 means healthy-but-throttling; tripping the circuit on it would amplify incidents. + +`CircuitOpenError` (a `ClientError` subclass) carries `retry_after: float | None` — the seconds until the next probe window (`None` when HALF_OPEN with a probe already in flight). + +Sharable across multiple clients (one shared circuit). A sync `CircuitBreaker` instance cannot be shared with an `AsyncClient`. + +## `AsyncTimeout` + +Bounds total wall-clock across the inner pipeline — including retries and backoff sleeps. Raises `httpware.TimeoutError` on expiry. Async-only: sync Python has no cancellation primitive that can interrupt a blocking call mid-flight. + +## New observability events + +| Logger | Event | When | +|---|---|---| +| `httpware.circuit_breaker` | `circuit.opened` | Failure threshold reached | +| `httpware.circuit_breaker` | `circuit.rejected` | Request fast-failed (OPEN or HALF_OPEN probe taken) | +| `httpware.circuit_breaker` | `circuit.half_open` | Reset timeout elapsed; probe admitted | +| `httpware.circuit_breaker` | `circuit.closed` | Success threshold reached; service recovered | +| `httpware.timeout` | `timeout.exceeded` | Overall timeout expired | + +## Recommended chain ordering + +``` +AsyncTimeout → AsyncCircuitBreaker → AsyncBulkhead → AsyncRetry → terminal +``` + +Breaker outside retry: an open circuit short-circuits the whole retry loop; the breaker counts one outcome per fully-exhausted retry sequence. 
diff --git a/planning/specs/2026-06-13-circuit-breaker-and-timeout-design.md b/planning/specs/2026-06-13-circuit-breaker-and-timeout-design.md new file mode 100644 index 0000000..16aef0f --- /dev/null +++ b/planning/specs/2026-06-13-circuit-breaker-and-timeout-design.md @@ -0,0 +1,309 @@ +# Spec: CircuitBreaker + AsyncTimeout — completing the resilience suite + +**Date:** 2026-06-13 +**Topic slug:** `circuit-breaker-and-timeout` +**Status:** drafted, awaiting user review +**Target release:** `0.10.0` (minor — purely additive API; no deprecations, no contract changes) +**Branch:** `feat/circuit-breaker-timeout` off `main` + +## Purpose + +`httpware`'s resilience suite ships `Retry`/`AsyncRetry` (+ `RetryBudget`) and `Bulkhead`/`AsyncBulkhead` — two of the ~5 strategies that define the Polly / Resilience4j shape. This spec adds the two that let the suite be honestly described as a composable resilience pipeline: + +1. **`CircuitBreaker` + `AsyncCircuitBreaker`** — a *classic* consecutive-failure circuit breaker (Polly's pre-v8 default). +2. **`AsyncTimeout`** — an overall wall-clock deadline across the whole inner pipeline (async only). + +Both are pure stdlib (`asyncio.timeout`, `time.monotonic`, `threading.Lock`, `enum`). No new optional extra. They slot into the existing middleware chain (Seam A) and reuse the existing observability helper (`_emit_event`) and error conventions (`ClientError` subclass + module-level `_reconstruct_*` + `__reduce__`). + +## Resolved design decisions (settled in brainstorming — not open for re-litigation) + +1. **Timeout is async-only.** `AsyncTimeout` bounds total wall-clock across everything `next` wraps — most importantly across an `AsyncRetry` loop, whose attempts and backoff sleeps `httpx2` cannot bound. It does **not** duplicate `httpx2`'s per-call connect/read/write/pool timeouts. 
**No sync `Timeout` ships:** a sync total-deadline cannot interrupt a blocking call mid-flight (sync Python has no cancellation), and `httpx2` already covers sync per-call timeouts. This is the one deliberate break from sync/async parity in the project; the docstring states why. +2. **The breaker v1 trips on consecutive failures** (Polly *classic* breaker): open after `failure_threshold` consecutive counted failures → probe after `reset_timeout` → close after `success_threshold` consecutive half-open successes. Rolling-window / failure-rate (Resilience4j / Polly-v8 default) is **deferred to v2**; the config is shaped so adding a `window` mode later is purely additive. +3. **Failure classification = 5xx + network + timeout, excluding 429.** A *counted failure* is `NetworkError`, httpware `TimeoutError`, or a `StatusError` whose `status_code` is in the effective failure set (default = all 5xx, 500–599). 4xx including 429 do **not** trip the breaker (429 = healthy-but-throttling; tripping amplifies the incident) and count as breaker *successes*. Any other exception type (e.g. `BulkheadFullError`, `ValueError`) propagates unchanged and does **not** affect circuit state. +4. **Control surface is events-only (YAGNI).** No public `state` property, no `reset()` / `isolate()`. Monitoring goes through the observability events. State introspection and manual control can be added additively in a later release if a concrete consumer demand surfaces. +5. **Recommended ordering is breaker-outside-retry.** Documented (not enforced): `AsyncTimeout → AsyncCircuitBreaker → AsyncBulkhead → AsyncRetry → terminal` (corrected during implementation: AsyncBulkhead sits outside AsyncRetry to keep one slot per logical call, consistent with the existing `test_bulkhead_outside_retry_holds_one_slot_across_attempts` guidance). 
With the breaker outside retry, an open circuit short-circuits the *entire* retry loop (don't hammer a service that's already down), and the breaker counts one outcome per fully-exhausted retry sequence rather than per attempt. + +## Non-goals + +- **No sync `Timeout`.** See decision 1. Sync callers configure `httpx2`'s timeouts directly. +- **No rolling-window / failure-rate breaker.** Deferred to v2. Config shaped to make it additive (a future `window`/`failure_rate` mode coexists with the consecutive-failure default). +- **No public state introspection or manual control** (`state`, `reset()`, `isolate()`). See decision 4. +- **No per-call retry coupling.** The breaker does not know about `AsyncRetry`; it is plain middleware. Ordering is the user's choice; we only *recommend* one. +- **No new optional extra.** Pure stdlib. +- **No change to `Retry`, `Bulkhead`, `RetryBudget`, the clients, decoders, or any existing error.** Purely additive. The only edits to existing files are the appended `CircuitOpenError` in `errors.py` (with its tests in `tests/test_errors.py`), export lists, and docs; event names are asserted in the new feature test files, not in `test_observability.py`. +- **No enforced ordering.** The chain composition in `client.py` is untouched; we do not validate or reorder middleware. 
+ +## Architecture + +### File layout + +``` +src/httpware/errors.py # + CircuitOpenError (append) +src/httpware/middleware/resilience/timeout.py # NEW — AsyncTimeout +src/httpware/middleware/resilience/circuit_breaker.py # NEW — CircuitBreaker + AsyncCircuitBreaker +src/httpware/middleware/resilience/__init__.py # + 3 new names (imports + __all__) +src/httpware/__init__.py # + 3 new names (imports + __all__) +tests/test_timeout.py # NEW +tests/test_circuit_breaker.py # NEW — async +tests/test_circuit_breaker_sync.py # NEW — sync mirror +tests/test_circuit_breaker_props.py # NEW — hypothesis invariant (optional but specified) +tests/test_errors.py # + CircuitOpenError fields/pickle +# (event names asserted in the feature test files above, not test_observability.py) +docs/resilience.md # + CircuitBreaker + AsyncTimeout sections +README.md # resilience paragraph +planning/releases/0.10.0.md # NEW +``` + +### Piece 1 — `CircuitOpenError` + +Appended to `errors.py`, mirroring `BulkheadFullError`'s module-level `_reconstruct_*` + `__reduce__` exactly (`errors.py:188-214`). A `ClientError` subclass with one keyword-only field `retry_after: float | None` — seconds until the circuit will next admit a probe, or `None` when a concurrent probe already holds the half-open slot. + +```python +def _reconstruct_circuit_open( + cls: "type[CircuitOpenError]", + retry_after: float | None, +) -> "CircuitOpenError": + return cls(retry_after=retry_after) + + +class CircuitOpenError(ClientError): + """Raised when a CircuitBreaker refuses a request because the circuit is not closed. + + Fires when the circuit is OPEN, or when it is HALF_OPEN and the single probe + slot is already taken. The request is never forwarded to ``next``. ``retry_after`` + carries the seconds until the circuit will next admit a probe, when known + (``None`` when a concurrent probe is already in flight). 
+ """ + + retry_after: float | None + + def __init__(self, *, retry_after: float | None) -> None: + self.retry_after = retry_after + if retry_after is None: + super().__init__("circuit open (a probe request is already in flight)") + else: + super().__init__(f"circuit open (retry_after={retry_after:.3f}s)") + + def __reduce__(self) -> tuple[Any, ...]: + return (_reconstruct_circuit_open, (type(self), self.retry_after)) +``` + +`Any` is already imported in `errors.py` (`from typing import Any`). Exported from `httpware/__init__.py` (imports block + `__all__`, alphabetical). + +This is a non-status `ClientError` that defines `__init__` with a keyword-only field — consistent with `BulkheadFullError`, `RetryBudgetExhaustedError`, `DecodeError`, `MissingDecoderError`. The "no `__init__` override" rule scopes only to `StatusError` subclasses (CLAUDE.md §Exception construction). + +### Piece 2 — `AsyncTimeout` (`resilience/timeout.py`) + +Async-only. Bounds total wall-clock for `next(request)` to complete. Drop-in from the brief; the correctness hinge is `cm.expired()`: + +- `asyncio.timeout(self._timeout)` raises `TimeoutError` on expiry. +- httpware's `TimeoutError` **subclasses `builtins.TimeoutError`** (`errors.py:51`), so an inner per-call timeout surfacing through `next` (e.g. an `httpx2` read timeout, possibly via a retry) is *also* a `TimeoutError`. We must not re-label it as our overall deadline. +- `cm.expired()` is the discriminator: `True` ⇒ our deadline fired (re-wrap as httpware `TimeoutError`, emit `timeout.exceeded`); `False` ⇒ inner timeout (re-raise unchanged). + +```python +"""AsyncTimeout middleware — overall wall-clock deadline across the inner pipeline. + +See planning/specs/2026-06-13-circuit-breaker-and-timeout-design.md for the contract. + +This is NOT a per-call timeout — httpx2's connect/read/write/pool timeouts are the +right tool for bounding a single outbound call, and AsyncTimeout does not duplicate +them. 
What httpx2 cannot bound is the total wall-clock across the whole middleware +pipeline (most importantly across an AsyncRetry loop, whose attempts and backoff +sleeps it knows nothing about). Place AsyncTimeout outermost to enforce +"this whole operation must finish within `timeout` seconds, even across retries." + +Async-only by design: a sync total-deadline cannot interrupt a blocking httpx2 call +mid-flight (sync Python has no cancellation), and httpx2 already covers sync per-call +timeouts. Sync callers configure httpx2's timeouts directly; there is no sync Timeout. +""" + +import asyncio +import logging + +import httpx2 + +from httpware._internal.observability import _emit_event +from httpware.errors import TimeoutError as HttpwareTimeoutError # noqa: A004 +from httpware.middleware import AsyncNext + + +_TIMEOUT_INVALID = "timeout must be > 0" + +_LOGGER = logging.getLogger("httpware.timeout") + + +class AsyncTimeout: + """Bounds total wall-clock time spent in the inner pipeline. + + Parameters + ---------- + timeout + Required. Overall deadline in seconds for ``next(request)`` to complete, + including everything it wraps (retries, backoff sleeps, the call itself). + Must be ``> 0``. On expiry the middleware raises ``httpware.TimeoutError``. + + Place outermost in the chain for an overall-operation deadline. For bounding a + single outbound call (connect/read/write/pool), configure ``httpx2`` instead. + """ + + def __init__(self, *, timeout: float) -> None: + if timeout <= 0: + raise ValueError(_TIMEOUT_INVALID) + self._timeout = timeout + + async def __call__(self, request: httpx2.Request, next: AsyncNext) -> httpx2.Response: # noqa: A002 + """Invoke next under an asyncio.timeout; raise httpware.TimeoutError on expiry. + + Only a deadline THIS middleware imposed is re-wrapped: ``cm.expired()`` + distinguishes our own expiry from an inner ``TimeoutError`` (e.g. an httpx2 + per-call timeout surfacing through a retry), which propagates unchanged. 
+ """ + try: + async with asyncio.timeout(self._timeout) as cm: + return await next(request) + except TimeoutError as exc: + if not cm.expired(): + raise # inner TimeoutError, not our deadline — leave it untouched + _emit_event( + _LOGGER, + "timeout.exceeded", + level=logging.WARNING, + message="overall timeout exceeded", + attributes={ + "timeout": self._timeout, + "method": request.method, + "url": str(request.url), + }, + ) + raise HttpwareTimeoutError(f"overall timeout of {self._timeout}s exceeded") from exc +``` + +### Piece 3 — `CircuitBreaker` + `AsyncCircuitBreaker` (`resilience/circuit_breaker.py`) + +Mirrors `bulkhead.py`'s shape: a shared, sharable instance (pass the same one to multiple clients = one shared circuit); the async class carries the `_check_loop` single-event-loop guard verbatim (`bulkhead.py:78-96`); the sync class guards all state under a `threading.Lock` (mirroring sync `Bulkhead`). Module-level message constants for `ValueError`s. + +#### Constructor (identical signature, both classes) + +```python +class AsyncCircuitBreaker: + def __init__( + self, + *, + failure_threshold: int = 5, # consecutive failures that open; >= 1 + reset_timeout: float = 30.0, # seconds OPEN before a probe; >= 0 + success_threshold: int = 1, # consecutive half-open successes to close; >= 1 + failure_status_codes: frozenset[int] | None = None, # None -> all 5xx (500-599) + _now: Callable[[], float] = time.monotonic, # seam for deterministic tests + ) -> None: ... +``` + +Validation (module-level message constants, like `bulkhead.py:32-33`): +- `failure_threshold >= 1` else `ValueError(_FAILURE_THRESHOLD_INVALID)` +- `reset_timeout >= 0` else `ValueError(_RESET_TIMEOUT_INVALID)` +- `success_threshold >= 1` else `ValueError(_SUCCESS_THRESHOLD_INVALID)` + +**Spec-author choice — `failure_status_codes` normalization:** at construction, `None` is normalized to `frozenset(range(500, 600))`. 
There is then exactly one classification code path (set membership); no per-request None-branch. The stored attribute is always a `frozenset[int]`. + +#### Internal state representation + +**Spec-author choice:** a private `enum.Enum` named `_CircuitState` with members `CLOSED`, `OPEN`, `HALF_OPEN`. Not exported (decision 4 — events only). Per-instance mutable fields: `_state`, `_consecutive_failures`, `_consecutive_successes`, `_opened_at: float`, `_probe_in_flight: bool`. + +#### State machine + +State transitions are synchronous (no `await` between read and mutate), so under asyncio they are atomic. The sync class wraps every read+transition (the admit decision and the record-outcome step) in the `threading.Lock`. The async class relies on cooperative scheduling plus `_check_loop` to reject cross-loop misuse. + +The request flow is two synchronous critical sections around the single `await next(request)` / `next(request)`: + +**Admit (before `next`)** — decide the request's role or reject: +- **CLOSED** → role `closed`; call `next`. +- **OPEN**: + - if `_now() - _opened_at >= reset_timeout` → transition to **HALF_OPEN**, set `_probe_in_flight = True`, emit `circuit.half_open` (INFO; attrs `method`, `url`), role `probe`; call `next`. + - else → raise `CircuitOpenError(retry_after = max(0.0, reset_timeout - (_now() - _opened_at)))`, emit `circuit.rejected` (WARNING; attrs `retry_after`, `method`, `url`). `next` is **not** called. +- **HALF_OPEN**: + - if `_probe_in_flight` → raise `CircuitOpenError(retry_after=None)`, emit `circuit.rejected`. `next` is **not** called. + - else (a prior probe succeeded but `success_threshold` not yet met) → set `_probe_in_flight = True`, role `probe`; call `next`. + +**Record (after `next`), in `finally` + result/except handling:** +- Classify the outcome: + - **counted failure** = `next` raised `NetworkError`, httpware `TimeoutError`, or a `StatusError` with `status_code` in the effective failure set. 
+ - **success** = `next` returned a response, OR raised a `StatusError` whose `status_code` is *not* in the failure set (e.g. 404, 429). + - **non-counted** = any other exception type → clears `_probe_in_flight` (if probe), leaves all state unchanged, re-raises. +- On **success**: + - CLOSED → `_consecutive_failures = 0`. + - HALF_OPEN → `_consecutive_successes += 1`; if `>= success_threshold` → **CLOSED**, reset all counters, emit `circuit.closed` (INFO; attrs `method`, `url`). Else stay HALF_OPEN (the next request becomes the next probe). +- On **counted failure**: + - CLOSED → `_consecutive_failures += 1`; if `>= failure_threshold` → **OPEN**, `_opened_at = _now()`, emit `circuit.opened` (WARNING; attrs `failure_threshold`, `failures`, `method`, `url`). Re-raise the original exception unwrapped. + - HALF_OPEN → **OPEN**, `_opened_at = _now()`, `_consecutive_successes = 0`, emit `circuit.opened` (re-open; `failures` reported as `1` — the single probe failure that re-opened the circuit). Re-raise unwrapped. +- `_probe_in_flight` is cleared in `finally` whenever the request held the probe slot, regardless of outcome (success, counted failure, non-counted exception, cancellation). + +Counted failures **re-raise the original exception unwrapped** (matching `AsyncRetry`'s treatment of `StatusError`). The breaker never wraps a downstream error; it only *adds* `CircuitOpenError` when it refuses to forward. + +#### Concurrency + +- **Async:** carry `_check_loop` (cached-loop fast path + `threading.Lock` double-checked write, including the `# pragma: no cover` inner race arm) verbatim from `AsyncBulkhead`. The admit/record critical sections contain no `await`, so they are atomic under a single event loop. The probe gate is the synchronous `_probe_in_flight` flag set inside admit before the `await` and cleared in `finally`. +- **Sync:** a `threading.Lock` guards every state read + transition and the probe flag (mirror sync `Bulkhead`). Sharable across `Client`s. 
A sync instance **cannot** be shared with an async one (documented, like `Bulkhead`). + +#### Exports + +`resilience/__init__.py` and `httpware/__init__.py`: add `AsyncCircuitBreaker`, `AsyncTimeout`, `CircuitBreaker` to imports + `__all__` (keep alphabetical). Update the `resilience/__init__.py` module docstring to mention the new primitives. + +### Observability (public, stable once shipped) + +Loggers: `httpware.circuit_breaker`, `httpware.timeout`. Events: + +| Event | Level | Attributes | +|-------|-------|------------| +| `circuit.opened` | WARNING | `failure_threshold`, `failures`, `method`, `url` | +| `circuit.rejected` | WARNING | `retry_after`, `method`, `url` | +| `circuit.half_open` | INFO | `method`, `url` | +| `circuit.closed` | INFO | `method`, `url` | +| `timeout.exceeded` | WARNING | `timeout`, `method`, `url` | + +These names join `retry.*` / `bulkhead.*` as the stable observability surface; renames are breaking changes. + +## Testing + +TDD, 100% branch coverage enforced (`--cov-fail-under=100`). `httpx2.MockTransport` injected via `AsyncClient(httpx2_client=httpx2.AsyncClient(transport=mock))` / `Client(httpx2_client=httpx2.Client(transport=mock))`. Drive time with an injected `_now` (a small advancing-clock helper). Reuse the `_ResponseSequence` + `_client` pattern from `tests/test_retry.py`. + +**`tests/test_circuit_breaker.py` (async) + `tests/test_circuit_breaker_sync.py` (sync mirror)** — every branch: +- closed passes through; `N` consecutive counted-failures → OPEN (assert `circuit.opened`). +- OPEN fast-fails with `CircuitOpenError`, `next` not called, `retry_after` set and clamped ≥ 0 (assert `circuit.rejected`). +- after `reset_timeout` (advance `_now`) → HALF_OPEN admits one probe (assert `circuit.half_open`). +- probe counted-failure → OPEN again, `_opened_at` reset, success counter zeroed. +- probe success × `success_threshold` → CLOSED (assert `circuit.closed`); cover `success_threshold > 1` (stay HALF_OPEN between probes). 
+- 429 and 404 do **not** count as failures (CLOSED counter resets; treated as success). +- non-counted exception (`ValueError`) propagates unchanged, no state change, probe flag cleared. +- a success mid-streak resets the consecutive-failure counter. +- half-open second concurrent request fast-fails with `retry_after=None` (assert `circuit.rejected`). +- ctor validation for all three numeric params (one test each). +- custom `failure_status_codes` set: a code in the set trips; a 5xx *not* in the set is a success. +- **async only:** `_check_loop` raises `RuntimeError` on cross-loop use (mirror the bulkhead cross-loop test). + +**`tests/test_circuit_breaker_props.py` (hypothesis):** invariant "while OPEN and before `reset_timeout` elapses, `next` is never called" — generate random failure/advance sequences, assert the transport's call count does not increase while the circuit is OPEN pre-timeout. Mirror the `test_*_props.py` convention. + +**`tests/test_timeout.py`:** +- pass-through returns the response when under budget. +- expiry raises `httpware.TimeoutError` chained from `builtins.TimeoutError` (assert `__cause__`) and emits `timeout.exceeded`. +- an inner `TimeoutError` raised by `next` propagates unchanged (cm not expired) — assert it is the *inner* exception, not the overall-deadline message. +- `timeout <= 0` raises `ValueError` at construction. +- Deterministic: inject a `next` that `await asyncio.sleep(...)`s under a controllable clock; no wall-clock dependence. + +**`tests/test_errors.py`:** `CircuitOpenError` is a `ClientError`, stores `retry_after`, summary string for both `None` and a float, pickle round-trip via `__reduce__` (mirror `test_bulkhead_full_error_pickleable`). 
+ +**Event-name assertions:** there is no central event-name registry test (`test_observability.py` only unit-tests the `_emit_event` helper), so the 5 new event names are asserted in their own feature test files via `caplog` — `circuit.opened`/`circuit.rejected` in `test_open_emits_opened_event_and_rejects`, `circuit.half_open`/`circuit.closed` in `test_reset_timeout_admits_probe_then_closes`, `timeout.exceeded` in `test_expiry_raises_httpware_timeout_chained_from_builtin`. + +`# pragma: no cover` only for genuinely-unreachable invariant arms (e.g. the `_check_loop` inner race arm), matching the existing style. + +## Docs + release + +- **`docs/resilience.md`:** a CircuitBreaker section and an AsyncTimeout section; the recommended ordering `AsyncTimeout → AsyncCircuitBreaker → AsyncBulkhead → AsyncRetry → terminal` (documented, not enforced); the rationale notes ("why no sync Timeout", "why not duplicate httpx2 per-call timeouts", "429/4xx count as successes, not failures"). +- **`README.md`:** extend the resilience paragraph from "Retry + Bulkhead" to include CircuitBreaker + AsyncTimeout. +- **`planning/releases/0.10.0.md`:** new release notes (additive minor; new public names; new observability events). + +## Green gate + +`just lint` clean (eof-fixer + ruff format + ruff check --fix + ty). `just test` → 100% coverage, all pass. Commit per piece (`CircuitOpenError` → `AsyncTimeout` → breaker → exports → docs/release). Minor bump `0.10.0` (additive API, pre-1.0). The tag name *is* the version (`uv version $GITHUB_REF_NAME`); `pyproject.toml` version is not bumped (see release-mechanics convention). + +## Open questions + +None. All design decisions resolved in brainstorming (see "Resolved design decisions" above). 
def _reconstruct_circuit_open(
    cls: "type[CircuitOpenError]",
    retry_after: float | None,
) -> "CircuitOpenError":
    """Rebuild a ``CircuitOpenError`` (or subclass) from its pickled ``retry_after``."""
    rebuilt = cls(retry_after=retry_after)
    return rebuilt


class CircuitOpenError(ClientError):
    """Signals that a CircuitBreaker turned a request away without forwarding it.

    Raised both when the circuit is OPEN and when it is HALF_OPEN with the single
    probe slot already occupied; in neither case is ``next`` invoked.

    ``retry_after`` is the number of seconds until the circuit will next admit a
    probe when that is known, and ``None`` when the rejection happened because a
    probe is currently in flight.
    """

    retry_after: float | None

    def __init__(self, *, retry_after: float | None) -> None:
        self.retry_after = retry_after
        # Pick the summary up front so the base class is initialised in one place.
        summary = (
            "circuit open (a probe request is already in flight)"
            if retry_after is None
            else f"circuit open (retry_after={retry_after:.3f}s)"
        )
        super().__init__(summary)

    def __reduce__(self) -> tuple[Any, ...]:
        # __init__ is keyword-only, so pickling goes through a module-level helper
        # that re-supplies ``retry_after`` by name.
        state = (type(self), self.retry_after)
        return (_reconstruct_circuit_open, state)
+ +See planning/specs/2026-06-13-circuit-breaker-and-timeout-design.md for the contract. + +A counted failure is a NetworkError, an httpware TimeoutError, or a StatusError whose +status_code is in the effective failure set (default: all 5xx). 4xx — including 429 — +count as successes: 429 means healthy-but-throttling, and tripping on it amplifies +incidents. Any other exception propagates without affecting circuit state. + +State machine (classic / consecutive-failure): + CLOSED — forward; count consecutive counted-failures; open at failure_threshold. + OPEN — fast-fail with CircuitOpenError; after reset_timeout the next request + becomes the half-open probe. + HALF_OPEN — admit exactly one probe at a time; success_threshold consecutive probe + successes close the circuit; one probe failure re-opens it. + +The lock-free _CircuitBreakerState holds the transition logic, shared by both wrappers. +AsyncCircuitBreaker relies on asyncio atomicity (no await inside a transition) plus a +single-event-loop guard; CircuitBreaker (sync) serializes transitions with a +threading.Lock. Both are sharable across clients (one shared circuit); a sync instance +cannot be shared with an async one. +""" + +import asyncio +import enum +import logging +import threading +import time +import typing +from collections.abc import Callable, Collection + +import httpx2 + +from httpware._internal.observability import _emit_event +from httpware.errors import CircuitOpenError, NetworkError, StatusError, TimeoutError # noqa: A004 +from httpware.middleware import AsyncNext, Next + + +_FAILURE_THRESHOLD_INVALID = "failure_threshold must be >= 1" +_RESET_TIMEOUT_INVALID = "reset_timeout must be >= 0" +_SUCCESS_THRESHOLD_INVALID = "success_threshold must be >= 1" +_CROSS_LOOP_MSG = ( + "AsyncCircuitBreaker is bound to a single event loop. First seen on {first!r}; " + "current request is on {current!r}. 
class _CircuitState(enum.Enum):
    """The three states of the classic circuit-breaker state machine."""

    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class _CircuitBreakerState:
    """Lock-free circuit-breaker state machine shared by the sync + async wrappers.

    Every method is synchronous and performs no I/O beyond logging. The async wrapper
    calls these directly (atomic under a single event loop because no await occurs
    inside a transition); the sync wrapper wraps each call in a threading.Lock.

    Outcome attribution: each admitted request carries the role it was given by
    ``admit``. While the circuit is HALF_OPEN only outcomes of a *probe* request
    drive a transition. A request admitted while CLOSED may still be in flight
    after the circuit has opened and gone half-open; its late success/failure says
    nothing about recovery and is ignored, so it can neither close the circuit
    without a successful probe nor re-open it underneath an in-flight probe.
    """

    def __init__(
        self,
        *,
        failure_threshold: int,
        reset_timeout: float,
        success_threshold: int,
        failure_status_codes: Collection[int] | None,
        now: Callable[[], float],
    ) -> None:
        """Validate the thresholds and start in the CLOSED state.

        Raises ``ValueError`` when ``failure_threshold < 1``, ``reset_timeout < 0``
        or ``success_threshold < 1``. ``failure_status_codes=None`` selects the
        default failure set; ``now`` is the clock used for ``reset_timeout``.
        """
        if failure_threshold < 1:
            raise ValueError(_FAILURE_THRESHOLD_INVALID)
        if reset_timeout < 0:
            raise ValueError(_RESET_TIMEOUT_INVALID)
        if success_threshold < 1:
            raise ValueError(_SUCCESS_THRESHOLD_INVALID)
        self._failure_threshold = failure_threshold
        self._reset_timeout = reset_timeout
        self._success_threshold = success_threshold
        # Accept any Collection (set, frozenset, list, ...) and freeze it so callers
        # aren't forced to construct a frozenset just to satisfy the type checker.
        self._failure_status_codes = (
            frozenset(failure_status_codes) if failure_status_codes is not None else _DEFAULT_FAILURE_STATUS_CODES
        )
        self._now = now
        self._state = _CircuitState.CLOSED
        self._consecutive_failures = 0
        self._consecutive_successes = 0
        self._opened_at = 0.0
        self._probe_in_flight = False

    def is_failure_status(self, status_code: int) -> bool:
        """Return True when ``status_code`` is in the effective failure set."""
        return status_code in self._failure_status_codes

    def admit(self, request: httpx2.Request) -> str:
        """Decide the request's role, or raise CircuitOpenError. No await inside.

        Returns ``_ROLE_CLOSED`` while CLOSED and ``_ROLE_PROBE`` when the request
        takes the half-open probe slot. Raises ``CircuitOpenError`` while OPEN
        before ``reset_timeout`` has elapsed (``retry_after`` = remaining seconds)
        or while HALF_OPEN with a probe already in flight (``retry_after=None``).
        """
        if self._state is _CircuitState.CLOSED:
            return _ROLE_CLOSED
        if self._state is _CircuitState.OPEN:
            elapsed = self._now() - self._opened_at
            if elapsed >= self._reset_timeout:
                # The first request after the cool-down becomes the probe.
                self._state = _CircuitState.HALF_OPEN
                self._probe_in_flight = True
                self._emit(request, "circuit.half_open", logging.INFO, "circuit half-open — admitting probe", {})
                return _ROLE_PROBE
            retry_after = max(0.0, self._reset_timeout - elapsed)
            self._emit(
                request,
                "circuit.rejected",
                logging.WARNING,
                "circuit open — rejecting request",
                {"retry_after": retry_after},
            )
            raise CircuitOpenError(retry_after=retry_after)
        # HALF_OPEN
        if self._probe_in_flight:
            self._emit(
                request,
                "circuit.rejected",
                logging.WARNING,
                "circuit half-open — rejecting request (probe in flight)",
                {"retry_after": None},
            )
            raise CircuitOpenError(retry_after=None)
        self._probe_in_flight = True
        return _ROLE_PROBE

    def on_success(self, role: str, request: httpx2.Request) -> None:
        """Record a successful outcome for a request admitted with ``role``."""
        is_probe = role == _ROLE_PROBE
        if is_probe:
            self._probe_in_flight = False
        if self._state is _CircuitState.CLOSED:
            self._consecutive_failures = 0
        elif self._state is _CircuitState.HALF_OPEN and is_probe:
            # Only probe successes count toward closing; a late success from a
            # request admitted while CLOSED is not evidence of recovery.
            self._consecutive_successes += 1
            if self._consecutive_successes >= self._success_threshold:
                self._state = _CircuitState.CLOSED
                self._consecutive_failures = 0
                self._consecutive_successes = 0
                self._emit(request, "circuit.closed", logging.INFO, "circuit closed — service recovered", {})

    def on_failure(self, role: str, request: httpx2.Request) -> None:
        """Record a counted failure for a request admitted with ``role``."""
        is_probe = role == _ROLE_PROBE
        if is_probe:
            self._probe_in_flight = False
        if self._state is _CircuitState.CLOSED:
            self._consecutive_failures += 1
            if self._consecutive_failures >= self._failure_threshold:
                self._open(request, failures=self._consecutive_failures)
        elif self._state is _CircuitState.HALF_OPEN and is_probe:
            # Only the probe's own failure re-opens; a late failure from a request
            # admitted while CLOSED must not re-open underneath an in-flight probe.
            self._open(request, failures=1)  # 1 = the single probe failure that re-opened the circuit

    def release_probe(self, role: str) -> None:
        """Release the probe slot without recording success or failure (non-counted exc)."""
        if role == _ROLE_PROBE:
            self._probe_in_flight = False

    def _open(self, request: httpx2.Request, *, failures: int) -> None:
        """Move to OPEN, stamp the open time, reset both counters, emit ``circuit.opened``."""
        self._state = _CircuitState.OPEN
        self._opened_at = self._now()
        self._consecutive_failures = 0
        self._consecutive_successes = 0
        self._emit(
            request,
            "circuit.opened",
            logging.WARNING,
            "circuit opened — failure threshold reached",
            {"failure_threshold": self._failure_threshold, "failures": failures},
        )

    def _emit(
        self,
        request: httpx2.Request,
        event_name: str,
        level: int,
        message: str,
        attributes: dict[str, typing.Any],
    ) -> None:
        """Emit an event on the circuit-breaker logger, adding the request method and URL."""
        _emit_event(
            _LOGGER,
            event_name,
            level=level,
            message=message,
            attributes={**attributes, "method": request.method, "url": str(request.url)},
        )


class AsyncCircuitBreaker:
    """Async classic circuit breaker middleware. See the module docstring for the contract.

    Bound to the first event loop it is called on; use from a different loop
    raises ``RuntimeError``.
    """

    def __init__(
        self,
        *,
        failure_threshold: int = 5,
        reset_timeout: float = 30.0,
        success_threshold: int = 1,
        failure_status_codes: Collection[int] | None = None,
        _now: Callable[[], float] = time.monotonic,
    ) -> None:
        """Build the shared state machine; ``ValueError`` on invalid thresholds.

        ``_now`` is the clock used for ``reset_timeout`` (injectable for tests).
        """
        self._state = _CircuitBreakerState(
            failure_threshold=failure_threshold,
            reset_timeout=reset_timeout,
            success_threshold=success_threshold,
            failure_status_codes=failure_status_codes,
            now=_now,
        )
        self._loop: asyncio.AbstractEventLoop | None = None
        self._loop_lock = threading.Lock()

    def _check_loop(self) -> None:
        """Bind to the running loop on first use; raise ``RuntimeError`` on any other loop."""
        current = asyncio.get_running_loop()
        cached = self._loop
        if cached is current:
            return
        if cached is not None:
            raise RuntimeError(_CROSS_LOOP_MSG.format(first=cached, current=current))
        with self._loop_lock:
            if self._loop is None:
                self._loop = current
            # pragma below: inner double-check-with-lock race arm; only reachable when
            # two threads simultaneously pass the outer check, which single-threaded
            # tests can't trigger.
            elif self._loop is not current:  # pragma: no cover
                raise RuntimeError(_CROSS_LOOP_MSG.format(first=self._loop, current=current))

    async def __call__(self, request: httpx2.Request, next: AsyncNext) -> httpx2.Response:  # noqa: A002
        """Admit, forward, then record the outcome. Fast-fail when the circuit is not closed.

        A ``StatusError`` counts as a failure only when its status code is in the
        failure set, otherwise as a success; ``NetworkError`` and ``TimeoutError``
        always count as failures. Any other exception only releases the probe
        slot. Every exception is re-raised unchanged.
        """
        self._check_loop()
        role = self._state.admit(request)
        try:
            response = await next(request)
        except StatusError as exc:
            if self._state.is_failure_status(exc.response.status_code):
                self._state.on_failure(role, request)
            else:
                self._state.on_success(role, request)
            raise
        except (NetworkError, TimeoutError):
            self._state.on_failure(role, request)
            raise
        except BaseException:
            # Not a verdict on the service: free the probe slot, leave the state alone.
            self._state.release_probe(role)
            raise
        self._state.on_success(role, request)
        return response


class CircuitBreaker:
    """Sync classic circuit breaker middleware. Mirror of AsyncCircuitBreaker.

    Serializes every state transition with a threading.Lock. Sharable across Clients
    (one shared circuit); a sync instance cannot be shared with an AsyncClient.
    """

    def __init__(
        self,
        *,
        failure_threshold: int = 5,
        reset_timeout: float = 30.0,
        success_threshold: int = 1,
        failure_status_codes: Collection[int] | None = None,
        _now: Callable[[], float] = time.monotonic,
    ) -> None:
        """Build the shared state machine; ``ValueError`` on invalid thresholds.

        ``_now`` is the clock used for ``reset_timeout`` (injectable for tests).
        """
        self._state = _CircuitBreakerState(
            failure_threshold=failure_threshold,
            reset_timeout=reset_timeout,
            success_threshold=success_threshold,
            failure_status_codes=failure_status_codes,
            now=_now,
        )
        self._lock = threading.Lock()

    def __call__(self, request: httpx2.Request, next: Next) -> httpx2.Response:  # noqa: A002
        """Admit, forward, then record the outcome. Fast-fail when the circuit is not closed.

        Outcome classification matches ``AsyncCircuitBreaker.__call__``. The lock is
        held only around each state transition, never across ``next(request)``.
        """
        with self._lock:
            role = self._state.admit(request)
        try:
            response = next(request)
        except StatusError as exc:
            with self._lock:
                if self._state.is_failure_status(exc.response.status_code):
                    self._state.on_failure(role, request)
                else:
                    self._state.on_success(role, request)
            raise
        except (NetworkError, TimeoutError):
            with self._lock:
                self._state.on_failure(role, request)
            raise
        except BaseException:
            # Not a verdict on the service: free the probe slot, leave the state alone.
            with self._lock:
                self._state.release_probe(role)
            raise
        with self._lock:
            self._state.on_success(role, request)
        return response
class AsyncTimeout:
    """Middleware that caps the total wall-clock time of everything it wraps.

    Parameters
    ----------
    timeout
        Required. Number of seconds ``next(request)`` may take in total, counting
        whatever sits below this middleware (retries, backoff sleeps, the call
        itself). Must be ``> 0``. When the deadline passes, ``httpware.TimeoutError``
        is raised.

    Put it outermost in the chain to get an overall-operation deadline. To bound a
    single outbound call (connect/read/write/pool), configure ``httpx2`` instead.

    """

    def __init__(self, *, timeout: float) -> None:
        if timeout <= 0:
            raise ValueError(_TIMEOUT_INVALID)
        self._timeout = timeout

    async def __call__(self, request: httpx2.Request, next: AsyncNext) -> httpx2.Response:  # noqa: A002
        """Run ``next`` inside ``asyncio.timeout`` and translate our own expiry.

        ``deadline.expired()`` tells a deadline imposed HERE apart from a
        ``TimeoutError`` that originated further in (e.g. an httpx2 per-call
        timeout surfacing through a retry). Only the former is converted to
        ``httpware.TimeoutError``; the latter is re-raised untouched.
        """
        try:
            async with asyncio.timeout(self._timeout) as deadline:
                response = await next(request)
        except TimeoutError as inner:
            if deadline.expired():
                details = {
                    "timeout": self._timeout,
                    "method": request.method,
                    "url": str(request.url),
                }
                _emit_event(
                    _LOGGER,
                    "timeout.exceeded",
                    level=logging.WARNING,
                    message="overall timeout exceeded",
                    attributes=details,
                )
                msg = f"overall timeout of {self._timeout}s exceeded"
                raise HttpwareTimeoutError(msg) from inner
            # Someone else's TimeoutError, not our deadline: pass it through as-is.
            raise
        return response
def _client(
    handler: Callable[[httpx2.Request], httpx2.Response],
    *,
    breaker: AsyncCircuitBreaker,
) -> AsyncClient:
    """Build an AsyncClient over a MockTransport with ``breaker`` as its only middleware."""
    return AsyncClient(
        httpx2_client=httpx2.AsyncClient(transport=httpx2.MockTransport(handler)),
        middleware=[breaker],
    )


# ── construction validation ──


def test_failure_threshold_below_one_rejected() -> None:
    """failure_threshold=0 is rejected at construction."""
    with pytest.raises(ValueError, match="failure_threshold must be >= 1"):
        AsyncCircuitBreaker(failure_threshold=0)


def test_negative_reset_timeout_rejected() -> None:
    """A negative reset_timeout is rejected at construction."""
    with pytest.raises(ValueError, match="reset_timeout must be >= 0"):
        AsyncCircuitBreaker(reset_timeout=-1.0)


def test_success_threshold_below_one_rejected() -> None:
    """success_threshold=0 is rejected at construction."""
    with pytest.raises(ValueError, match="success_threshold must be >= 1"):
        AsyncCircuitBreaker(success_threshold=0)


# ── closed-state behavior ──


async def test_closed_passes_through() -> None:
    """A CLOSED circuit forwards the request and returns the response."""
    handler = _StatusSequence([HTTPStatus.OK])
    breaker = AsyncCircuitBreaker(failure_threshold=3, _now=_Clock())
    async with _client(handler, breaker=breaker) as client:
        response = await client.get("https://example.test/x")
    assert response.status_code == HTTPStatus.OK
    assert handler.calls == 1


async def test_consecutive_failures_open_the_circuit() -> None:
    """failure_threshold consecutive 500s open the circuit; the next call is short-circuited."""
    handler = _StatusSequence([500, 500, 500])
    breaker = AsyncCircuitBreaker(failure_threshold=3, _now=_Clock())
    async with _client(handler, breaker=breaker) as client:
        for _ in range(3):
            with pytest.raises(InternalServerError):
                await client.get("https://example.test/x")
        with pytest.raises(CircuitOpenError) as info:
            await client.get("https://example.test/x")
    assert handler.calls == 3  # noqa: PLR2004 — 4th was short-circuited
    assert info.value.retry_after is not None
    # (circuit.opened event asserted in test_open_emits_opened_event_and_rejects)


async def test_open_emits_opened_event_and_rejects(caplog: pytest.LogCaptureFixture) -> None:
    """Opening logs one circuit.opened record; the rejected call logs one circuit.rejected record."""
    handler = _StatusSequence([500, 500])
    breaker = AsyncCircuitBreaker(failure_threshold=2, _now=_Clock())
    async with _client(handler, breaker=breaker) as client:
        with caplog.at_level(logging.WARNING, logger="httpware.circuit_breaker"):
            for _ in range(2):
                with pytest.raises(InternalServerError):
                    await client.get("https://example.test/x")
            with pytest.raises(CircuitOpenError):
                await client.get("https://example.test/y")
    records = [r for r in caplog.records if r.name == "httpware.circuit_breaker"]
    # Records are told apart by message text: "opened" vs "rejecting".
    opened = [r for r in records if "opened" in r.message]
    rejected = [r for r in records if "rejecting" in r.message]
    assert len(opened) == 1
    assert opened[0].failure_threshold == 2  # noqa: PLR2004  # ty: ignore[unresolved-attribute]
    assert opened[0].failures == 2  # noqa: PLR2004  # ty: ignore[unresolved-attribute]
    assert len(rejected) == 1
    assert rejected[0].retry_after is not None  # ty: ignore[unresolved-attribute]
    assert rejected[0].method == "GET"  # ty: ignore[unresolved-attribute]


async def test_success_resets_failure_streak() -> None:
    """A success between failures resets the consecutive-failure counter."""
    handler = _StatusSequence([500, 500, 200, 500, 500])
    breaker = AsyncCircuitBreaker(failure_threshold=3, _now=_Clock())
    async with _client(handler, breaker=breaker) as client:
        for _ in range(2):
            with pytest.raises(InternalServerError):
                await client.get("https://example.test/x")
        await client.get("https://example.test/x")  # 200 resets the streak
        for _ in range(2):
            with pytest.raises(InternalServerError):
                await client.get("https://example.test/x")
        response = await client.get("https://example.test/x")  # 6th -> default 200
    assert response.status_code == HTTPStatus.OK
    assert handler.calls == 6  # noqa: PLR2004 — 2 failures + 1 success + 2 failures + 1 success


async def test_404_and_429_do_not_count_as_failures() -> None:
    """404 and 429 responses never trip the breaker, even past failure_threshold."""
    handler = _StatusSequence([404, 429, 404, 429, 404])
    breaker = AsyncCircuitBreaker(failure_threshold=2, _now=_Clock())
    async with _client(handler, breaker=breaker) as client:
        for _ in range(5):
            with pytest.raises((NotFoundError, RateLimitedError)):
                await client.get("https://example.test/x")
    assert handler.calls == 5  # noqa: PLR2004 — never opened, all five reached the transport


async def test_network_error_counts_as_failure() -> None:
    """A transport ConnectError (surfaced as NetworkError) counts toward opening the circuit."""

    def _raise(request: httpx2.Request) -> httpx2.Response:  # noqa: ARG001
        msg = "connect failed"
        raise httpx2.ConnectError(msg)

    breaker = AsyncCircuitBreaker(failure_threshold=2, _now=_Clock())
    async with _client(_raise, breaker=breaker) as client:
        for _ in range(2):
            with pytest.raises(NetworkError):
                await client.get("https://example.test/x")
        with pytest.raises(CircuitOpenError):
            await client.get("https://example.test/x")


async def test_custom_failure_status_codes_trips_on_member() -> None:
    """A status code in a custom failure set trips the breaker (plain set accepted)."""
    handler = _StatusSequence([503, 503])
    breaker = AsyncCircuitBreaker(
        failure_threshold=2,
        failure_status_codes={503},  # a plain set — any Collection[int] is accepted
        _now=_Clock(),
    )
    async with _client(handler, breaker=breaker) as client:
        for _ in range(2):
            with pytest.raises(ServiceUnavailableError):
                await client.get("https://example.test/x")
        with pytest.raises(CircuitOpenError):
            await client.get("https://example.test/x")
    assert handler.calls == 2  # noqa: PLR2004


async def test_custom_failure_status_codes_excludes_other_5xx() -> None:
    """With a custom set of {503}, a 500 response is NOT a failure — it counts as success."""
    handler = _StatusSequence([500, 500, 500, 500])
    breaker = AsyncCircuitBreaker(
        failure_threshold=2,
        failure_status_codes=[503],  # a list, too — frozen internally
        _now=_Clock(),
    )
    async with _client(handler, breaker=breaker) as client:
        for _ in range(4):
            with pytest.raises(InternalServerError):
                await client.get("https://example.test/x")
    assert handler.calls == 4  # noqa: PLR2004  # 500 not in custom set -> never opened


async def test_non_counted_exception_propagates_without_state_change() -> None:
    """A ValueError from inner middleware is neither success nor failure; state unchanged."""

    class _Boom:
        async def __call__(self, request: httpx2.Request, next: object) -> httpx2.Response:  # noqa: A002,ARG002
            msg = "boom"
            raise ValueError(msg)

    handler = _StatusSequence([200])
    breaker = AsyncCircuitBreaker(failure_threshold=1, _now=_Clock())
    client = AsyncClient(
        httpx2_client=httpx2.AsyncClient(transport=httpx2.MockTransport(handler)),
        middleware=[breaker, _Boom()],
    )
    async with client:
        # failure_threshold=1, but ValueError is never counted -> circuit stays CLOSED.
        # Each call raises ValueError (NOT CircuitOpenError), proving no state change.
        for _ in range(3):
            with pytest.raises(ValueError, match="boom"):
                await client.get("https://example.test/x")


async def test_non_counted_exception_in_probe_releases_slot() -> None:
    """A non-counted exception during the probe releases the probe slot.

    Admitting the probe has already moved the circuit from OPEN to HALF_OPEN. The
    ValueError is neither a probe success nor a probe failure, so the circuit
    stays HALF_OPEN with the slot free, and the very next request is admitted as
    a new probe (no further clock advance needed).
    """
    clock = _Clock()

    class _Boom:
        async def __call__(self, request: httpx2.Request, next: object) -> httpx2.Response:  # noqa: A002,ARG002
            msg = "boom"
            raise ValueError(msg)

    open_handler = _StatusSequence([500])
    breaker = AsyncCircuitBreaker(failure_threshold=1, reset_timeout=5.0, _now=clock)
    async with _client(open_handler, breaker=breaker) as opener:
        with pytest.raises(InternalServerError):
            await opener.get("https://example.test/x")
    # Circuit is OPEN. Advance time to allow a probe.
    clock.advance(5.0)

    boom_client = AsyncClient(
        httpx2_client=httpx2.AsyncClient(transport=httpx2.MockTransport(open_handler)),
        middleware=[breaker, _Boom()],
    )
    async with boom_client:
        # First call after timeout: circuit goes HALF_OPEN and the probe slot is
        # taken, but _Boom raises ValueError.
        # release_probe is called, clearing probe_in_flight.
        with pytest.raises(ValueError, match="boom"):
            await boom_client.get("https://example.test/probe")
    # Circuit is now HALF_OPEN with a free probe slot (probe neither succeeded nor failed).
    # Try a second probe — this time without the boom middleware.
    # Use a fresh good client to verify the probe slot was released.
    good_handler = _StatusSequence([200])
    good_client = AsyncClient(
        httpx2_client=httpx2.AsyncClient(transport=httpx2.MockTransport(good_handler)),
        middleware=[breaker],
    )
    async with good_client:
        # The clock has not advanced further; none is needed in HALF_OPEN.
        # After probe slot was released, admit should allow a new probe.
        response = await good_client.get("https://example.test/x")
    assert response.status_code == HTTPStatus.OK


# ── half-open / reset_timeout ──


async def test_reset_timeout_admits_probe_then_closes(caplog: pytest.LogCaptureFixture) -> None:
    """After reset_timeout one probe is admitted; its success closes the circuit and logs both events."""
    clock = _Clock()
    handler = _StatusSequence([500, 500, 200])  # 2 fails -> open; probe (3rd) -> 200 -> close
    breaker = AsyncCircuitBreaker(failure_threshold=2, reset_timeout=30.0, success_threshold=1, _now=clock)
    async with _client(handler, breaker=breaker) as client:
        for _ in range(2):
            with pytest.raises(InternalServerError):
                await client.get("https://example.test/x")
        with pytest.raises(CircuitOpenError):  # OPEN, before reset_timeout -> rejected
            await client.get("https://example.test/x")
        assert handler.calls == 2  # noqa: PLR2004 — 2 failures, 3rd rejected
        clock.advance(30.0)
        with caplog.at_level(logging.INFO, logger="httpware.circuit_breaker"):
            response = await client.get("https://example.test/x")  # probe -> 200 -> CLOSED
    assert response.status_code == HTTPStatus.OK
    assert handler.calls == 3  # noqa: PLR2004 — 2 failures + 1 probe
    messages = [r.message for r in caplog.records if r.name == "httpware.circuit_breaker"]
    assert any("half-open" in m for m in messages)
    assert any("closed" in m for m in messages)


async def test_probe_failure_reopens_circuit() -> None:
    """A failing probe re-opens the circuit, so an immediate retry is rejected."""
    clock = _Clock()
    handler = _StatusSequence([500, 500, 500])  # open after 2; probe (3rd) fails -> reopen
    breaker = AsyncCircuitBreaker(failure_threshold=2, reset_timeout=10.0, _now=clock)
    async with _client(handler, breaker=breaker) as client:
        for _ in range(2):
            with pytest.raises(InternalServerError):
                await client.get("https://example.test/x")
        clock.advance(10.0)
        with pytest.raises(InternalServerError):  # probe runs, fails
            await client.get("https://example.test/x")
        with pytest.raises(CircuitOpenError):  # reopened; immediate retry rejected
            await client.get("https://example.test/x")
    assert handler.calls == 3  # noqa: PLR2004 — 2 failures + 1 probe-failure


async def test_success_threshold_requires_multiple_probes() -> None:
    """With success_threshold=2 the circuit closes only after two successful probes."""
    clock = _Clock()
    handler = _StatusSequence([500, 500, 200, 200])  # open; then 2 successful probes to close
    breaker = AsyncCircuitBreaker(failure_threshold=2, reset_timeout=5.0, success_threshold=2, _now=clock)
    async with _client(handler, breaker=breaker) as client:
        for _ in range(2):
            with pytest.raises(InternalServerError):
                await client.get("https://example.test/x")
        clock.advance(5.0)
        await client.get("https://example.test/x")  # probe 1 -> 200 (still HALF_OPEN, 1/2)
        await client.get("https://example.test/x")  # probe 2 -> 200 -> CLOSED
        response = await client.get("https://example.test/x")  # default 200, CLOSED
    assert response.status_code == HTTPStatus.OK
    assert handler.calls == 5  # noqa: PLR2004 — 2 failures + 2 probes + 1 closed call


async def test_half_open_second_concurrent_request_rejected_with_none_retry_after() -> None:
    """While the single probe is in flight, a concurrent request fast-fails (retry_after=None)."""
    clock = _Clock()
    # probe_started: set by the handler once the probe reaches the transport.
    # release_probe: set by the test to let the blocked probe finish.
    probe_started = asyncio.Event()
    release_probe = asyncio.Event()

    async def _handler_async(request: httpx2.Request) -> httpx2.Response:
        probe_started.set()
        await release_probe.wait()
        return httpx2.Response(HTTPStatus.OK, request=request)

    breaker = AsyncCircuitBreaker(failure_threshold=1, reset_timeout=1.0, _now=clock)
    open_handler = _StatusSequence([500])
    async with _client(open_handler, breaker=breaker) as opener:
        with pytest.raises(InternalServerError):
            await opener.get("https://example.test/x")
    clock.advance(1.0)

    client = AsyncClient(
        httpx2_client=httpx2.AsyncClient(transport=httpx2.MockTransport(_handler_async)),
        middleware=[breaker],
    )
    async with client:
        probe_task = asyncio.create_task(client.get("https://example.test/probe"))
        await probe_started.wait()  # probe in flight, HALF_OPEN
        with pytest.raises(CircuitOpenError) as info:
            await client.get("https://example.test/concurrent")
        assert info.value.retry_after is None
        release_probe.set()
        await probe_task


# ── single-event-loop guard ──


def test_cross_loop_use_raises_runtimeerror() -> None:
    """Using one breaker from a second event loop raises RuntimeError."""
    breaker = AsyncCircuitBreaker(_now=_Clock())
    handler = _StatusSequence([200])

    async def _run_once() -> None:
        async with _client(handler, breaker=breaker) as client:
            await client.get("https://example.test/x")

    asyncio.run(_run_once())  # binds to loop L1
    # Each asyncio.run creates a fresh loop, so this second run is on a different one.
    with pytest.raises(RuntimeError, match="bound to a single event loop"):
        asyncio.run(_run_once())
+""" + +import httpx2 +import pytest +from hypothesis import given, settings +from hypothesis import strategies as st + +from httpware import CircuitOpenError, InternalServerError +from httpware.middleware.resilience.circuit_breaker import AsyncCircuitBreaker + + +class _Clock: + def __init__(self) -> None: + self.t = 0.0 + + def __call__(self) -> float: + return self.t + + +def _request() -> httpx2.Request: + return httpx2.Request("GET", "https://example.test/x") + + +@given( + failure_threshold=st.integers(min_value=1, max_value=5), + reset_timeout=st.floats(min_value=1.0, max_value=100.0), + fractions=st.lists(st.floats(min_value=0.0, max_value=0.99), min_size=1, max_size=20), +) +@settings(max_examples=50, deadline=None) +async def test_open_circuit_never_forwards_before_reset_timeout( + failure_threshold: int, + reset_timeout: float, + fractions: list[float], +) -> None: + clock = _Clock() + breaker = AsyncCircuitBreaker( + failure_threshold=failure_threshold, + reset_timeout=reset_timeout, + _now=clock, + ) + forwarded = 0 + + async def _ok(request: httpx2.Request) -> httpx2.Response: + nonlocal forwarded + forwarded += 1 # pragma: no cover — invariant: OPEN never forwards, so this never runs + return httpx2.Response(200, request=request) # pragma: no cover + + async def _five_hundred(request: httpx2.Request) -> httpx2.Response: + raise InternalServerError(httpx2.Response(500, request=request)) + + # Open the circuit while the clock reads 0.0, so opened_at == 0.0. + for _ in range(failure_threshold): + with pytest.raises(InternalServerError): + await breaker(_request(), _five_hundred) + + # Now OPEN. Probe at clock times strictly below reset_timeout (fraction <= 0.99), so + # elapsed = clock.t - 0.0 < reset_timeout and every request is rejected without ever + # forwarding to _ok. No conditional branch here — coverage is deterministic across examples. 
+ for fraction in fractions: + clock.t = fraction * reset_timeout + with pytest.raises(CircuitOpenError): + await breaker(_request(), _ok) + + assert forwarded == 0 # `next` (_ok) was never called while OPEN pre-timeout diff --git a/tests/test_circuit_breaker_sync.py b/tests/test_circuit_breaker_sync.py new file mode 100644 index 0000000..a87f722 --- /dev/null +++ b/tests/test_circuit_breaker_sync.py @@ -0,0 +1,271 @@ +"""Tests for the sync CircuitBreaker middleware (mirror of AsyncCircuitBreaker).""" + +import logging +import threading +from collections.abc import Callable +from http import HTTPStatus + +import httpx2 +import pytest + +from httpware import ( + CircuitOpenError, + Client, + InternalServerError, + NetworkError, + NotFoundError, + RateLimitedError, + ServiceUnavailableError, +) +from httpware.middleware.resilience.circuit_breaker import CircuitBreaker + + +class _Clock: + def __init__(self) -> None: + self.t = 0.0 + + def __call__(self) -> float: + return self.t + + def advance(self, seconds: float) -> None: + self.t += seconds + + +class _StatusSequence: + def __init__(self, statuses: list[int]) -> None: + self._statuses = list(statuses) + self.calls = 0 + + def __call__(self, request: httpx2.Request) -> httpx2.Response: + self.calls += 1 + status = self._statuses.pop(0) if self._statuses else HTTPStatus.OK + return httpx2.Response(status, request=request) + + +def _client(handler: Callable[[httpx2.Request], httpx2.Response], *, breaker: CircuitBreaker) -> Client: + return Client( + httpx2_client=httpx2.Client(transport=httpx2.MockTransport(handler)), + middleware=[breaker], + ) + + +def test_failure_threshold_below_one_rejected() -> None: + with pytest.raises(ValueError, match="failure_threshold must be >= 1"): + CircuitBreaker(failure_threshold=0) + + +def test_negative_reset_timeout_rejected() -> None: + with pytest.raises(ValueError, match="reset_timeout must be >= 0"): + CircuitBreaker(reset_timeout=-1.0) + + +def 
test_success_threshold_below_one_rejected() -> None: + with pytest.raises(ValueError, match="success_threshold must be >= 1"): + CircuitBreaker(success_threshold=0) + + +def test_closed_passes_through() -> None: + handler = _StatusSequence([HTTPStatus.OK]) + breaker = CircuitBreaker(failure_threshold=3, _now=_Clock()) + with _client(handler, breaker=breaker) as client: + response = client.get("https://example.test/x") + assert response.status_code == HTTPStatus.OK + assert handler.calls == 1 + + +def test_open_emits_opened_event_and_rejects(caplog: pytest.LogCaptureFixture) -> None: + handler = _StatusSequence([500, 500]) + breaker = CircuitBreaker(failure_threshold=2, _now=_Clock()) + with ( + _client(handler, breaker=breaker) as client, + caplog.at_level(logging.WARNING, logger="httpware.circuit_breaker"), + ): + for _ in range(2): + with pytest.raises(InternalServerError): + client.get("https://example.test/x") + with pytest.raises(CircuitOpenError) as info: + client.get("https://example.test/y") + assert info.value.retry_after is not None + assert handler.calls == 2 # noqa: PLR2004 + records = [r for r in caplog.records if r.name == "httpware.circuit_breaker"] + opened = [r for r in records if "opened" in r.message] + rejected = [r for r in records if "rejecting" in r.message] + assert len(opened) == 1 + assert opened[0].failure_threshold == 2 # noqa: PLR2004 # ty: ignore[unresolved-attribute] + assert opened[0].failures == 2 # noqa: PLR2004 # ty: ignore[unresolved-attribute] + assert len(rejected) == 1 + assert rejected[0].retry_after is not None # ty: ignore[unresolved-attribute] + assert rejected[0].method == "GET" # ty: ignore[unresolved-attribute] + + +def test_success_resets_failure_streak() -> None: + handler = _StatusSequence([500, 500, 200, 500, 500]) + breaker = CircuitBreaker(failure_threshold=3, _now=_Clock()) + with _client(handler, breaker=breaker) as client: + for _ in range(2): + with pytest.raises(InternalServerError): + 
client.get("https://example.test/x") + client.get("https://example.test/x") + for _ in range(2): + with pytest.raises(InternalServerError): + client.get("https://example.test/x") + response = client.get("https://example.test/x") + assert response.status_code == HTTPStatus.OK + assert handler.calls == 6 # noqa: PLR2004 — 2 failures + 1 success + 2 failures + 1 success + + +def test_404_and_429_do_not_count_as_failures() -> None: + handler = _StatusSequence([404, 429, 404, 429, 404]) + breaker = CircuitBreaker(failure_threshold=2, _now=_Clock()) + with _client(handler, breaker=breaker) as client: + for _ in range(5): + with pytest.raises((NotFoundError, RateLimitedError)): + client.get("https://example.test/x") + assert handler.calls == 5 # noqa: PLR2004 — never opened, all five reached the transport + + +def test_network_error_counts_as_failure() -> None: + def _raise(request: httpx2.Request) -> httpx2.Response: # noqa: ARG001 + msg = "connect failed" + raise httpx2.ConnectError(msg) + + breaker = CircuitBreaker(failure_threshold=2, _now=_Clock()) + with _client(_raise, breaker=breaker) as client: + for _ in range(2): + with pytest.raises(NetworkError): + client.get("https://example.test/x") + with pytest.raises(CircuitOpenError): + client.get("https://example.test/x") + + +def test_custom_failure_status_codes_trips_on_member() -> None: + handler = _StatusSequence([503, 503]) + breaker = CircuitBreaker(failure_threshold=2, failure_status_codes={503}, _now=_Clock()) # plain set accepted + with _client(handler, breaker=breaker) as client: + for _ in range(2): + with pytest.raises(ServiceUnavailableError): + client.get("https://example.test/x") + with pytest.raises(CircuitOpenError): + client.get("https://example.test/x") + assert handler.calls == 2 # noqa: PLR2004 + + +def test_custom_failure_status_codes_excludes_other_5xx() -> None: + handler = _StatusSequence([500, 500, 500, 500]) + breaker = CircuitBreaker(failure_threshold=2, failure_status_codes=[503], 
_now=_Clock()) # list accepted too + with _client(handler, breaker=breaker) as client: + for _ in range(4): + with pytest.raises(InternalServerError): + client.get("https://example.test/x") + assert handler.calls == 4 # noqa: PLR2004 # 500 not in custom set -> never opened + + +def test_non_counted_exception_propagates_without_state_change() -> None: + class _Boom: + def __call__(self, request: httpx2.Request, next: object) -> httpx2.Response: # noqa: A002,ARG002 + msg = "boom" + raise ValueError(msg) + + handler = _StatusSequence([200]) + breaker = CircuitBreaker(failure_threshold=1, _now=_Clock()) + client = Client( + httpx2_client=httpx2.Client(transport=httpx2.MockTransport(handler)), + middleware=[breaker, _Boom()], + ) + with client: + for _ in range(3): + with pytest.raises(ValueError, match="boom"): + client.get("https://example.test/x") + + +def test_reset_timeout_admits_probe_then_closes(caplog: pytest.LogCaptureFixture) -> None: + clock = _Clock() + handler = _StatusSequence([500, 500, 200]) + breaker = CircuitBreaker(failure_threshold=2, reset_timeout=30.0, success_threshold=1, _now=clock) + with _client(handler, breaker=breaker) as client: + for _ in range(2): + with pytest.raises(InternalServerError): + client.get("https://example.test/x") + with pytest.raises(CircuitOpenError): + client.get("https://example.test/x") + assert handler.calls == 2 # noqa: PLR2004 + clock.advance(30.0) + with caplog.at_level(logging.INFO, logger="httpware.circuit_breaker"): + response = client.get("https://example.test/x") + assert response.status_code == HTTPStatus.OK + assert handler.calls == 3 # noqa: PLR2004 + messages = [r.message for r in caplog.records if r.name == "httpware.circuit_breaker"] + assert any("half-open" in m for m in messages) + assert any("closed" in m for m in messages) + + +def test_probe_failure_reopens_circuit() -> None: + clock = _Clock() + handler = _StatusSequence([500, 500, 500]) + breaker = CircuitBreaker(failure_threshold=2, 
reset_timeout=10.0, _now=clock) + with _client(handler, breaker=breaker) as client: + for _ in range(2): + with pytest.raises(InternalServerError): + client.get("https://example.test/x") + clock.advance(10.0) + with pytest.raises(InternalServerError): + client.get("https://example.test/x") + with pytest.raises(CircuitOpenError): + client.get("https://example.test/x") + assert handler.calls == 3 # noqa: PLR2004 + + +def test_success_threshold_requires_multiple_probes() -> None: + clock = _Clock() + handler = _StatusSequence([500, 500, 200, 200]) + breaker = CircuitBreaker(failure_threshold=2, reset_timeout=5.0, success_threshold=2, _now=clock) + with _client(handler, breaker=breaker) as client: + for _ in range(2): + with pytest.raises(InternalServerError): + client.get("https://example.test/x") + clock.advance(5.0) + client.get("https://example.test/x") # probe 1 -> 200 (HALF_OPEN 1/2) + client.get("https://example.test/x") # probe 2 -> 200 -> CLOSED + response = client.get("https://example.test/x") # default 200, CLOSED + assert response.status_code == HTTPStatus.OK + assert handler.calls == 5 # noqa: PLR2004 — 2 failures + 2 probes + 1 CLOSED + + +def test_half_open_second_concurrent_request_rejected_with_none_retry_after() -> None: + """Two threads hit a half-open breaker; exactly one is the probe, the other is rejected.""" + clock = _Clock() + probe_started = threading.Event() + release_probe = threading.Event() + + def _handler(request: httpx2.Request) -> httpx2.Response: + probe_started.set() + release_probe.wait(timeout=5.0) + return httpx2.Response(HTTPStatus.OK, request=request) + + breaker = CircuitBreaker(failure_threshold=1, reset_timeout=1.0, _now=clock) + open_handler = _StatusSequence([500]) + with _client(open_handler, breaker=breaker) as opener, pytest.raises(InternalServerError): + opener.get("https://example.test/x") + clock.advance(1.0) + + client = Client( + httpx2_client=httpx2.Client(transport=httpx2.MockTransport(_handler)), + 
middleware=[breaker], + ) + rejected: list[CircuitOpenError] = [] + + def _probe() -> None: + client.get("https://example.test/probe") + + with client: + thread = threading.Thread(target=_probe) + thread.start() + assert probe_started.wait(timeout=5.0) + with pytest.raises(CircuitOpenError) as info: + client.get("https://example.test/concurrent") + rejected.append(info.value) + release_probe.set() + thread.join(timeout=5.0) + assert not thread.is_alive() + + assert rejected[0].retry_after is None diff --git a/tests/test_errors.py b/tests/test_errors.py index 4a48b0f..2e706d5 100644 --- a/tests/test_errors.py +++ b/tests/test_errors.py @@ -12,6 +12,7 @@ STATUS_TO_EXCEPTION, BadRequestError, BulkheadFullError, + CircuitOpenError, ClientError, ClientStatusError, ConflictError, @@ -106,6 +107,7 @@ def test_status_error_repr_strips_userinfo() -> None: _NOT_FOUND = 404 +_RETRY_AFTER_2_5 = 2.5 _RETRY_ATTEMPTS_3 = 3 _RETRY_ATTEMPTS_2 = 2 _RETRY_ATTEMPTS_5 = 5 @@ -357,3 +359,38 @@ def test_missing_decoder_error_pickle_roundtrip() -> None: assert isinstance(revived, MissingDecoderError) assert revived.model is _Foo assert revived.registered_names == ("PydanticDecoder", "MsgspecDecoder") + + +def test_circuit_open_error_is_client_error() -> None: + exc = CircuitOpenError(retry_after=_RETRY_AFTER_2_5) + assert isinstance(exc, ClientError) + assert exc.retry_after == _RETRY_AFTER_2_5 + + +def test_circuit_open_error_accepts_none_retry_after() -> None: + exc = CircuitOpenError(retry_after=None) + assert exc.retry_after is None + + +def test_circuit_open_error_summary_with_retry_after() -> None: + exc = CircuitOpenError(retry_after=_RETRY_AFTER_2_5) + assert str(exc) == "circuit open (retry_after=2.500s)" + + +def test_circuit_open_error_summary_with_none_retry_after() -> None: + exc = CircuitOpenError(retry_after=None) + assert str(exc) == "circuit open (a probe request is already in flight)" + + +def test_circuit_open_error_pickleable_with_float() -> None: + exc = 
CircuitOpenError(retry_after=_RETRY_AFTER_2_5) + restored = pickle.loads(pickle.dumps(exc)) # noqa: S301 + assert isinstance(restored, CircuitOpenError) + assert restored.retry_after == _RETRY_AFTER_2_5 + + +def test_circuit_open_error_pickleable_with_none() -> None: + exc = CircuitOpenError(retry_after=None) + restored = pickle.loads(pickle.dumps(exc)) # noqa: S301 + assert isinstance(restored, CircuitOpenError) + assert restored.retry_after is None diff --git a/tests/test_public_api.py b/tests/test_public_api.py index 1f1426e..97a8b9a 100644 --- a/tests/test_public_api.py +++ b/tests/test_public_api.py @@ -29,13 +29,17 @@ def test_no_removed_symbols_leaked() -> None: def test_expected_exports() -> None: expected = { "AsyncBulkhead", + "AsyncCircuitBreaker", "AsyncClient", "AsyncMiddleware", "AsyncNext", "AsyncRetry", + "AsyncTimeout", "BadRequestError", "Bulkhead", "BulkheadFullError", + "CircuitBreaker", + "CircuitOpenError", "Client", "ClientError", "ClientStatusError", diff --git a/tests/test_timeout.py b/tests/test_timeout.py new file mode 100644 index 0000000..51bd707 --- /dev/null +++ b/tests/test_timeout.py @@ -0,0 +1,94 @@ +"""Tests for the AsyncTimeout middleware. + +Calls the middleware directly with an injected `next` callable. Expiry tests use a +tiny timeout against a long sleep (large margin -> not wall-clock flaky); the +inner-timeout test raises immediately so no real time passes. 
+""" + +import asyncio +import builtins +import logging + +import httpx2 +import pytest + +from httpware.errors import TimeoutError as HttpwareTimeoutError +from httpware.middleware.resilience.timeout import AsyncTimeout + + +def _request() -> httpx2.Request: + return httpx2.Request("GET", "https://example.test/x") + + +async def test_passes_through_response_when_under_budget() -> None: + async def _next(request: httpx2.Request) -> httpx2.Response: + return httpx2.Response(200, request=request) + + middleware = AsyncTimeout(timeout=10.0) + response = await middleware(_request(), _next) + assert response.status_code == 200 # noqa: PLR2004 + + +async def test_expiry_raises_httpware_timeout_chained_from_builtin( + caplog: pytest.LogCaptureFixture, +) -> None: + async def _next(request: httpx2.Request) -> httpx2.Response: + await asyncio.sleep(5.0) + return httpx2.Response(200, request=request) # pragma: no cover — deadline fires first + + middleware = AsyncTimeout(timeout=0.01) + with ( + caplog.at_level(logging.WARNING, logger="httpware.timeout"), + pytest.raises(HttpwareTimeoutError) as info, + ): + await middleware(_request(), _next) + + assert "overall timeout of 0.01s exceeded" in str(info.value) + assert isinstance(info.value.__cause__, builtins.TimeoutError) + + records = [r for r in caplog.records if r.name == "httpware.timeout"] + assert len(records) == 1 + assert records[0].levelno == logging.WARNING + assert records[0].timeout == 0.01 # noqa: PLR2004 # ty: ignore[unresolved-attribute] + assert records[0].method == "GET" # ty: ignore[unresolved-attribute] + assert "example.test/x" in records[0].url # ty: ignore[unresolved-attribute] + + +async def test_inner_timeout_propagates_unchanged() -> None: + """A TimeoutError from next (not our deadline) is re-raised untouched.""" + + async def _next(_request: httpx2.Request) -> httpx2.Response: + msg = "inner read timeout" + raise HttpwareTimeoutError(msg) + + middleware = AsyncTimeout(timeout=10.0) + with 
pytest.raises(HttpwareTimeoutError) as info: + await middleware(_request(), _next) + + assert "inner read timeout" in str(info.value) + assert "overall timeout" not in str(info.value) + + +async def test_raw_builtin_timeout_from_next_propagates_by_identity() -> None: + """A raw builtins.TimeoutError from next (e.g. a nested asyncio.timeout) is re-raised as-is.""" + inner = builtins.TimeoutError("nested asyncio timeout") + + async def _next(_request: httpx2.Request) -> httpx2.Response: + raise inner + + middleware = AsyncTimeout(timeout=10.0) + with pytest.raises(builtins.TimeoutError) as info: + await middleware(_request(), _next) + + assert info.value is inner # propagated by identity, not re-wrapped + assert "overall timeout" not in str(info.value) + + +def test_zero_timeout_rejected() -> None: + with pytest.raises(ValueError, match="timeout must be > 0"): + AsyncTimeout(timeout=0) + + +def test_negative_timeout_rejected() -> None: + with pytest.raises(ValueError, match="timeout must be > 0"): + AsyncTimeout(timeout=-1.0)