diff --git a/README.md b/README.md index a927116..f5ce6d9 100644 --- a/README.md +++ b/README.md @@ -112,7 +112,7 @@ It does NOT pass through the middleware chain: `AsyncRetry`, `AsyncBulkhead`, an ## Errors -All 4xx/5xx responses raise typed exceptions automatically: `NotFoundError`, `ServiceUnavailableError`, `RateLimitedError`, etc. — all subclasses of `httpware.StatusError`. Transport-layer transient failures raise `NetworkError`; the resilience middleware raise `RetryBudgetExhaustedError` and `BulkheadFullError`. Everything inherits `httpware.ClientError`. +All 4xx/5xx responses raise typed exceptions automatically: `NotFoundError`, `ServiceUnavailableError`, `RateLimitedError`, etc. — all subclasses of `httpware.StatusError`. Transport-layer transient failures raise `NetworkError`; the resilience middleware raise `RetryBudgetExhaustedError`, `BulkheadFullError`, and `CircuitOpenError`. Everything inherits `httpware.ClientError`. ## Observability @@ -126,7 +126,7 @@ import logging # Enable visibility into resilience operational events logging.getLogger("httpware.retry").setLevel(logging.WARNING) logging.getLogger("httpware.bulkhead").setLevel(logging.WARNING) -logging.getLogger("httpware.circuit_breaker").setLevel(logging.WARNING) +logging.getLogger("httpware.circuit_breaker").setLevel(logging.INFO) # INFO: includes recovery events (half_open, closed) logging.getLogger("httpware.timeout").setLevel(logging.WARNING) ``` diff --git a/docs/index.md b/docs/index.md index 5ff246e..34d1a75 100644 --- a/docs/index.md +++ b/docs/index.md @@ -138,23 +138,34 @@ All errors inherit `httpware.ClientError`. The categories: - **Status errors** (4xx/5xx responses) — raised automatically, no `raise_for_status()` needed: `NotFoundError`, `RateLimitedError`, `ServiceUnavailableError`, and the rest. All subclass `StatusError`. - **Transport errors** — connection / network / protocol failures before a response arrived. `NetworkError` (transient) subclasses `TransportError`. -- **Resilience refusals** — `RetryBudgetExhaustedError` and `BulkheadFullError`, raised by the resilience middleware. +- **Resilience refusals** — `RetryBudgetExhaustedError`, `BulkheadFullError`, and `CircuitOpenError`, raised by the resilience middleware. - **Decode errors** — `DecodeError`, raised when `response_model=` decoding fails (HTTP call itself succeeded). `MissingDecoderError`, raised when no registered decoder claims the `response_model=` type — fires *before* the HTTP call. See the [Errors reference](errors.md) for the full tree and catching strategies. ## Observability -`AsyncRetry`/`Retry` and `AsyncBulkhead`/`Bulkhead` emit operational events via two channels — stdlib `logging` records (always on) and OpenTelemetry span events (when `opentelemetry-api` is installed). Event names and payloads are identical across sync and async; dashboards built against one class apply unchanged to the other. +All resilience middleware emit operational events via two channels — stdlib `logging` records (always on) and OpenTelemetry span events (when `opentelemetry-api` is installed). Event names and payloads are identical across sync and async; dashboards built against one class apply unchanged to the other. -Logger names (`httpware.retry`, `httpware.bulkhead`) and event names (`retry.giving_up`, `retry.budget_refused`, `retry.streaming_refused`, `bulkhead.rejected`) are the stable public contract. +Logger names and event names are the stable public contract: + +| Logger | Events | +|---|---| +| `httpware.retry` | `retry.giving_up`, `retry.budget_refused`, `retry.streaming_refused` | +| `httpware.bulkhead` | `bulkhead.rejected` | +| `httpware.circuit_breaker` | `circuit.opened` (WARNING), `circuit.rejected` (WARNING), `circuit.half_open` (INFO), `circuit.closed` (INFO) | +| `httpware.timeout` | `timeout.exceeded` (WARNING) | + +Each log record carries an `event` field with the event-name string (e.g. `event="circuit.opened"`), usable for log-aggregator filtering. See [resilience.md](resilience.md) for the full event tables per middleware. ```python import logging -# Enable visibility into retry / bulkhead operational events +# Enable visibility into resilience operational events logging.getLogger("httpware.retry").setLevel(logging.WARNING) logging.getLogger("httpware.bulkhead").setLevel(logging.WARNING) +logging.getLogger("httpware.circuit_breaker").setLevel(logging.INFO) # INFO for recovery events +logging.getLogger("httpware.timeout").setLevel(logging.WARNING) ``` For OTel attribute enrichment on the active span — install the extra: diff --git a/docs/resilience.md b/docs/resilience.md index 94eca70..04e413b 100644 --- a/docs/resilience.md +++ b/docs/resilience.md @@ -24,9 +24,7 @@ from httpware.middleware.resilience import AsyncRetry | `respect_retry_after` | `True` | When the response carries a `Retry-After` header on a retryable status, sleep for the header value instead of the jittered backoff. If the header value exceeds `max_delay`, AsyncRetry gives up and re-raises the underlying `StatusError` with a PEP 678 note `httpware: Retry-After (Ns) exceeded max_delay (Ms); giving up`. Set `max_delay` higher (or `respect_retry_after=False`) to opt out. | | `budget` | `RetryBudget()` (default-configured) | The token bucket. Pass a shared `RetryBudget` instance to apply one budget across multiple clients. | -For a whole-attempt wall-clock bound, use `httpx2.Timeout` on the client or -pass `timeout=` per request. `httpware` does not own a structured-cancellation -timeout knob. +For a whole-operation wall-clock bound across all retry attempts, compose `AsyncTimeout` outermost — see [AsyncTimeout](#asynctimeout) below. For a per-request bound, use `httpx2.Timeout` on the client or pass `timeout=` per request. ### Retry-After parsing @@ -156,7 +154,7 @@ Classic consecutive-failure circuit breaker. Counts failures and prevents reques ### States - **CLOSED** — normal operation. Each counted failure increments the consecutive-failure counter. Once `failure_threshold` consecutive counted failures accumulate, the circuit opens. -- **OPEN** — fast-fail. All requests are rejected immediately with `CircuitOpenError` (carrying `retry_after` seconds until the next probe window). After `reset_timeout` seconds the circuit moves to HALF_OPEN. +- **OPEN** — fast-fail. While elapsed time is below `reset_timeout`, requests are rejected immediately with `CircuitOpenError` (carrying `retry_after` seconds until the next probe window). The first request after `reset_timeout` elapses transitions the circuit to HALF_OPEN and becomes the probe. - **HALF_OPEN** — exactly one probe is admitted. If `success_threshold` consecutive probe successes are observed, the circuit closes. A single probe failure re-opens the circuit. ### Constructor @@ -317,7 +315,7 @@ from httpware.middleware.resilience import Retry `Retry` uses `time.sleep` between attempts. `Retry-After`, streaming-body refusal, exhaustion behavior, and `RetryBudgetExhaustedError` semantics are identical to `AsyncRetry`. -For a whole-attempt wall-clock bound, use `httpx2.Timeout` on the wrapped client or pass `timeout=` per request. `httpware` does not own a structured-cancellation timeout knob. +For a whole-attempt wall-clock bound, use `httpx2.Timeout` on the wrapped client or pass `timeout=` per request. No sync `Timeout` middleware exists — sync Python has no cancellation primitive that can interrupt a blocking call mid-flight. ### `Bulkhead` diff --git a/planning/audit/2026-06-13-delta-audit.md b/planning/audit/2026-06-13-delta-audit.md new file mode 100644 index 0000000..3e37f03 --- /dev/null +++ b/planning/audit/2026-06-13-delta-audit.md @@ -0,0 +1,107 @@ +# httpware delta audit — 2026-06-13 (0.10.0 circuit-breaker + async-timeout) + +**Status:** complete +**Baseline:** 0.9.1 → 0.10.0 (`2a2b541`) +**Scope:** `circuit_breaker.py`, `timeout.py`, `CircuitOpenError`, their tests, and the 0.10.0 docs. +**Method:** six adversarial finders across dimensions (concurrency, state machine, exception classification, AsyncTimeout, API/docs, test quality), then per-finding triage; the two headline production findings were reproduced directly. + +## Summary + +- Blockers: 0 +- High: 1 +- Medium: 6 +- Low: 8 +- Nits / informational: 2 + +No blockers. The state machine, async atomicity claim, sync lock coverage, cross-loop guard, `cm.expired()` discriminator, exception-clause ordering, exception chaining, pickle round-trip, export symmetry, and the property/concurrent tests' soundness were all verified **correct** (see Negative results). The headline is a **test-contract gap**, echoing the 0.9.0 audit's pattern: the five observability event-name strings are the documented stable public surface, yet **no test asserts any event name** — every test discriminates events by substring on the human-readable `message`, and `_emit_event` never puts `event_name` on the log record (it goes only to OTel `add_event`). Renaming `circuit.opened` → anything passes 100% of tests. The two real production bugs are both small and bounded: `AsyncTimeout` accepts `nan`/`inf` (the `timeout <= 0` guard is false for both), and a probe-slot/state leak if an observability emit raises mid-transition (no `finally` around the half-open/open mutation). + +## Findings + +### High + +#### H1 — Observability event names are never asserted; a rename of the stable public surface passes silently +*(test quality — verified)* + +`tests/test_circuit_breaker.py`, `tests/test_circuit_breaker_sync.py`, `tests/test_timeout.py`; root cause `src/httpware/_internal/observability.py:40`. + +`_emit_event` does `logger.log(level, message, extra=attributes)` — the `event_name` (`circuit.opened`, `circuit.rejected`, `circuit.half_open`, `circuit.closed`, `timeout.exceeded`) is **not** on the log record; it is passed only to OTel's `add_event`. Every circuit/timeout test discriminates events with substrings on `message` (`"opened" in r.message`, `any("half-open" in m …)`). The spec calls these five strings "the stable observability surface; renames are breaking changes" and claims they are asserted in the feature tests — they are not. Renaming any event string in source would not fail a single test. Verified: `event_name` is absent from the record; only `test_observability.py` ever exercises `add_event`, with a synthetic `"test.event"`. + +**Direction:** assert each event name — either mock `trace.get_current_span().add_event` and assert the name (mirroring `test_observability.py`), or add `event_name` to the structured log record's extras and assert it (the latter also lets users filter logs by event, but changes the emission shape — decide deliberately). + +### Medium + +#### M1 — `AsyncTimeout` accepts `nan` and `inf` (`timeout <= 0` guard is false for both) +*(production — reproduced)* + +`src/httpware/middleware/resilience/timeout.py:47`. `float('nan') <= 0` and `float('inf') <= 0` are both `False`, so the constructor accepts them. Reproduced: `AsyncTimeout(timeout=float('nan'))` and `(float('inf'))` are both ACCEPTED. `asyncio.timeout(nan)` fires nondeterministically (NaN breaks the timer heap ordering); `asyncio.timeout(inf)` never fires (silent no-op). A caller passing `math.inf` to mean "no limit", or `nan` from a bad config parse, gets silent misbehavior instead of a clear error. + +**Direction:** `if not math.isfinite(timeout) or timeout <= 0: raise ValueError(...)` (add `import math`). Add a test asserting `nan`/`inf` are rejected (folds in Low L8). + +#### M2 — Probe-slot / state leak if an observability emit raises mid-transition → permanent HALF_OPEN wedge +*(production — verified by trace; low probability)* + +`src/httpware/middleware/resilience/circuit_breaker.py` `admit` (OPEN→HALF_OPEN branch) and `_open`. In `admit`, `self._probe_in_flight = True` is set and then `self._emit(...)` is called — all inside `admit`, **before** `__call__` enters its `try` block, and there is no `finally` clearing the flag. `_emit` → `_emit_event` calls `trace.get_current_span().add_event(...)` outside any guard (only the OTel *import* is guarded). If `add_event` raises (a recording span with a broken exporter / attribute validation), the exception propagates out of `admit` with `_state` already HALF_OPEN and `_probe_in_flight` already `True` and nothing to reset them. The circuit then wedges permanently in HALF_OPEN — every later request takes the probe-in-flight arm and raises `CircuitOpenError(retry_after=None)` forever, even after the service recovers. The same emit-after-mutate shape in `_open` would instead mask the original failure exception. Affects both sync and async. Requires OTel installed + a recording span whose `add_event` raises, so probability is low, but the failure mode (silent permanent wedge) is severe. + +**Direction:** make state-mutating transitions resilient to observability failure — e.g. emit *before* mutating state where possible, or move the flag-set/emit so that an emit failure cannot strand the slot, or harden `_emit_event` to never propagate exceptions from `add_event` (note: shared helper — also used by retry/bulkhead). + +#### M3 — `retry_after` value and the `max(0.0, …)` clamp are never asserted +*(test quality)* + +`tests/test_circuit_breaker.py:105,126`, sync `:88,97` — `retry_after` is only checked `is not None`. With the injected `_Clock` the value is fully deterministic (open at t=0, `reset_timeout=30`, reject at t=10 ⇒ `retry_after == 20.0`). The clamp `max(0.0, reset_timeout - elapsed)` could be deleted or return wrong/negative arithmetic and every test still passes. + +**Direction:** advance the clock a known amount before the reject and assert the exact `retry_after`; add a case pinning the `0.0` floor. + +#### M4 — "429/4xx resets the failure streak" is proven only via the `200` branch +*(test quality)* + +`test_404_and_429_do_not_count_as_failures` interleaves only 404/429, so the failure counter is always already 0 — the `on_success` reset runs 0→0, a no-op. The streak-reset is proven by `test_success_resets_failure_streak` via a `200` (response-returned branch), not the StatusError-not-in-set branch. A bug routing 429 to `release_probe`/no-op instead of `on_success` would survive. Sequence `[500, 429, 500, 500]` at `failure_threshold=2` should stay CLOSED (429 resets) but would open under such a bug. + +**Direction:** add `[500, 429, 500, 500]` (threshold=2), assert it never opens (`handler.calls == 4`). + +#### M5 — `docs/index.md` Observability section omits the new loggers/events +*(docs)* + +`docs/index.md:148–158` still lists only `httpware.retry` / `httpware.bulkhead` and the `retry.*` / `bulkhead.*` events as the stable contract. `httpware.circuit_breaker` (4 events) and `httpware.timeout` (1 event) are absent. README was updated; the docs landing page was not — undercutting the "stable public contract" claim on the most-read page. + +**Direction:** extend `docs/index.md` to list all four families, or cross-reference `docs/resilience.md`. + +#### M6 — README logging example suppresses the INFO recovery events +*(docs)* + +`README.md:129` sets `httpware.circuit_breaker` to `WARNING`. `circuit.half_open` and `circuit.closed` are emitted at `INFO` (`circuit_breaker.py` `admit`/`on_success`). Users copying the snippet silently miss exactly the events that show the circuit probing and recovering — only `opened`/`rejected` (WARNING) appear. + +**Direction:** use `INFO` for `httpware.circuit_breaker` in the snippet, or comment that recovery events fire at INFO. + +### Low + +- **L1 — `docs/resilience.md` AsyncRetry section is now false** (`:28–29`): "httpware does not own a structured-cancellation timeout knob" — `AsyncTimeout` is exactly that, documented in the same file. (The sync `Retry` occurrence at `:320` remains accurate.) *(docs)* +- **L2 — `docs/index.md:141` + `README.md:115` Errors lists omit `CircuitOpenError`** from the resilience-refusal enumeration. `docs/errors.md` correctly includes it. *(docs)* +- **L3 — `docs/resilience.md:159` OPEN-state wording imprecise**: "All requests are rejected" — the first request after `reset_timeout` is admitted as the probe (lazy transition), not rejected. *(docs)* +- **L4 — `src/httpware/_internal/observability.py:5–6` docstring** lists only `httpware.retry`/`httpware.bulkhead` as stable loggers; add the two new ones. *(docs)* +- **L5 — No sync mirror of `test_non_counted_exception_in_probe_releases_slot`**: the sync wrapper's `except BaseException → release_probe(role=probe)` arm is behaviorally untested (branch coverage is satisfied only via the shared `_CircuitBreakerState` exercised by the async test). *(test quality)* +- **L6 — `success_threshold > 1` with a probe failure mid-streak is untested**: no test where probe-1 succeeds (counter→1), probe-2 fails → reopen, and a later close must re-accumulate from 0. A failure to zero `_consecutive_successes` on reopen would close one probe early, undetected. *(test quality)* +- **L7 — Reachable boundary configs untested**: `reset_timeout=0` (OPEN immediately admits a probe; the `retry_after` reject branch is unreachable) and empty `failure_status_codes` (normalizes to `frozenset()`; no status ever trips — only network/timeout do). Both are accepted at construction. *(test quality)* +- **L8 — `failures=1` on probe-reopen + reopen `circuit.opened` event unasserted**, and (after M2's fix) `nan`/`inf` rejection needs a test. *(test quality)* + +### Nits / informational + +- **N1 — Spec/impl type divergence (informational, no action):** the spec declares `failure_status_codes: frozenset[int] | None`; the impl uses `Collection[int] | None` (frozen internally). This is the deliberate ergonomics fix from the PR-review round (a code comment documents it; tests pass a plain set and a list). The spec is the outlier; no user-facing inconsistency. +- **N2 — `TransportError` (non-`NetworkError`, e.g. `httpx2.InvalidURL`) is treated as a foreign exception by the breaker** (no state change) — correct (it's a programming error, not a transient failure), but the module docstring doesn't say so. A one-line note would prevent surprise. + +## Negative results (attacked, found sound) + +- **Async atomicity:** no `await` exists inside `admit`/`on_success`/`on_failure` or between `admit()` returning and the `try` block; transitions are atomic under one event loop. No double-probe, no interleave leak. `CancelledError` cannot fire mid-`admit` (no await point). +- **Sync lock coverage:** every shared-state read/mutation is under `self._lock`; `next()` is correctly outside it; the TOCTOU window between admit and record is benign (the `_probe_in_flight` flag, set under lock, serializes the probe). +- **Cross-loop guard:** `_check_loop` is verbatim from `AsyncBulkhead` (incl. the `# pragma: no cover` race arm) and runs before any mutation. +- **Exception classification:** `StatusError` / `(NetworkError, TimeoutError)` / `BaseException` clauses are genuinely disjoint under the hierarchy; `StatusError`-not-in-set correctly routes to `on_success`; `CancelledError`/`KeyboardInterrupt`/`SystemExit` and foreign suite exceptions (`BulkheadFullError`, `RetryBudgetExhaustedError`, `DecodeError`, `MissingDecoderError`) all release the probe and re-raise with no state change; counted failures re-raise unwrapped; `CircuitOpenError` never chains a downstream error. +- **`AsyncTimeout` `cm.expired()` discriminator:** airtight — a near-simultaneous inner `asyncio.timeout`, an external task cancel, and our own deadline are all distinguished correctly (confirmed experimentally); `__cause__` is a `builtins.TimeoutError`; the middleware is stateless and safe to share. +- **State machine:** threshold boundaries (`>= N`, opens on exactly N), `reset_timeout` boundary (`>=`, inclusive), counter resets on open/close, probe-flag lifecycle, and `success_threshold>1` multi-probe all correct. +- **Tests that ARE sound:** the property test is non-vacuous (deterministic coverage, real invariant); both concurrent-probe tests genuinely force the probe-in-flight rejection; the timeout tests assert exact attributes + `__cause__`; the `CircuitOpenError` pickle/field/summary tests are substantive; `handler.calls`-based assertions genuinely distinguish opened-vs-closed. +- **Conventions / exports:** no `from __future__ import annotations`, no `# type: ignore`, no `print()`, no `basicConfig`, no `httpx2._`, no `__all__` in submodules; the four new names are symmetric across both `__init__.py`s and `test_public_api.py`; defaults match docs. + +## Closure note + +All findings are bounded and well-understood. Suggested grouping for closure (process-weight-matched, not one PR per finding): +1. **Production fixes** (M1 `nan`/`inf` guard + test; M2 emit-safety / no-wedge) — one small PR. +2. **Test hardening** (H1 event-name assertions; M3 `retry_after` value; M4 429-resets-streak; L5–L8) — one PR. +3. **Docs sweep** (M5, M6, L1–L4, N2) — one docs PR. diff --git a/planning/releases/0.10.1.md b/planning/releases/0.10.1.md new file mode 100644 index 0000000..422f38f --- /dev/null +++ b/planning/releases/0.10.1.md @@ -0,0 +1,25 @@ +# httpware 0.10.1 — 0.10.0 delta-audit closure + +**Patch release. Bug fixes + hardening from the 0.10.0 delta audit. No breaking changes** (one additive observability field; see below). + +Audit report: [`planning/audit/2026-06-13-delta-audit.md`](../audit/2026-06-13-delta-audit.md). + +## Fixes + +- **`AsyncTimeout` now rejects non-finite values.** `AsyncTimeout(timeout=float("nan"))` and `float("inf")` were silently accepted — `timeout <= 0` is `False` for both under IEEE-754 — producing a nondeterministic (`nan`) or never-firing (`inf`) deadline. The constructor now requires a finite positive number. +- **Observability failures can no longer break the request path.** `_emit_event` previously called OpenTelemetry's `span.add_event(...)` unguarded; a recording span whose `add_event` raised would propagate into the caller. Because the circuit breaker emits inside its state transitions, this could strand the circuit permanently in HALF_OPEN. The OTel emission now degrades silently on failure (the structured log record has already fired), at the root for all middleware. + +## Observability (additive) + +- **Each event log record now carries an `event` field** holding the event-name string (e.g. `event="circuit.opened"`). Previously the event name reached only OpenTelemetry span events; it is now also a first-class, filterable field on the stdlib log record. Purely additive — no existing attribute changed. + +## Docs + +- `docs/index.md` observability section now lists all four middleware loggers (`httpware.retry`, `httpware.bulkhead`, `httpware.circuit_breaker`, `httpware.timeout`) and their events with levels. +- `CircuitOpenError` added to the resilience-refusal error lists in `README.md` and `docs/index.md`. +- README logging example now sets `httpware.circuit_breaker` to `INFO` so the `circuit.half_open` / `circuit.closed` recovery events are visible. +- Corrected the AsyncRetry timeout-knob note (now points to `AsyncTimeout`), the OPEN-state wording (the first request after `reset_timeout` becomes the probe), and the observability/breaker docstrings. + +## Tests + +Hardened to assert the stable event-name strings, the exact `retry_after` value, the 429-resets-the-failure-streak path, `success_threshold > 1` with a mid-streak probe failure, and `reset_timeout=0` / empty `failure_status_codes` boundaries. No production behavior change from the test work. diff --git a/src/httpware/_internal/observability.py b/src/httpware/_internal/observability.py index 36551e2..310be57 100644 --- a/src/httpware/_internal/observability.py +++ b/src/httpware/_internal/observability.py @@ -2,11 +2,13 @@ See planning/specs/2026-06-05-observability-design.md for the contract. -Logger names (``httpware.retry``, ``httpware.bulkhead``) and event names -(``retry.giving_up``, ``bulkhead.rejected``, etc.) are the public observability +Logger names (``httpware.retry``, ``httpware.bulkhead``, ``httpware.circuit_breaker``, +``httpware.timeout``) and event names (``retry.giving_up``, ``bulkhead.rejected``, +``circuit.opened``, ``timeout.exceeded``, etc.) are the public observability surface. They are stable: renames are breaking changes. """ +import contextlib import logging import typing @@ -37,7 +39,7 @@ def _emit_event( the optional-extras isolation invariant: ``import httpware`` must not pull ``opentelemetry`` into ``sys.modules`` when the extra is absent. """ - logger.log(level, message, extra=attributes) + logger.log(level, message, extra={**attributes, "event": event_name}) if import_checker.is_otel_installed: try: from opentelemetry import trace # noqa: PLC0415 — lazy by design (optional-extras isolation) @@ -45,4 +47,9 @@ def _emit_event( # opentelemetry namespace exists but the api package is broken or missing — # degrade to log-only emission. The structured log record above has already fired. return - trace.get_current_span().add_event(event_name, attributes=attributes) + # Observability must never break the request path — suppress any failure from + # add_event (e.g. a recording span with a broken exporter or attribute validation). + # The structured log record above has already fired; CancelledError/KeyboardInterrupt + # are not Exception subclasses and will still propagate. + with contextlib.suppress(Exception): + trace.get_current_span().add_event(event_name, attributes=attributes) diff --git a/src/httpware/middleware/resilience/circuit_breaker.py b/src/httpware/middleware/resilience/circuit_breaker.py index 8b1b42c..795a001 100644 --- a/src/httpware/middleware/resilience/circuit_breaker.py +++ b/src/httpware/middleware/resilience/circuit_breaker.py @@ -5,7 +5,10 @@ A counted failure is a NetworkError, an httpware TimeoutError, or a StatusError whose status_code is in the effective failure set (default: all 5xx). 4xx — including 429 — count as successes: 429 means healthy-but-throttling, and tripping on it amplifies -incidents. Any other exception propagates without affecting circuit state. +incidents. Any other exception propagates without affecting circuit state. In +particular, non-NetworkError transport problems — e.g. httpx2.InvalidURL from a +malformed URL — are foreign: they propagate unchanged and do not increment the +failure counter, so programming errors cannot trip the breaker. State machine (classic / consecutive-failure): CLOSED — forward; count consecutive counted-failures; open at failure_threshold. diff --git a/src/httpware/middleware/resilience/timeout.py b/src/httpware/middleware/resilience/timeout.py index b404f80..17730e0 100644 --- a/src/httpware/middleware/resilience/timeout.py +++ b/src/httpware/middleware/resilience/timeout.py @@ -14,6 +14,7 @@ import asyncio import logging +import math import httpx2 @@ -22,7 +23,7 @@ from httpware.middleware import AsyncNext -_TIMEOUT_INVALID = "timeout must be > 0" +_TIMEOUT_INVALID = "timeout must be a finite number > 0" _LOGGER = logging.getLogger("httpware.timeout") @@ -43,7 +44,7 @@ class AsyncTimeout: """ def __init__(self, *, timeout: float) -> None: - if timeout <= 0: + if not math.isfinite(timeout) or timeout <= 0: raise ValueError(_TIMEOUT_INVALID) self._timeout = timeout diff --git a/tests/test_circuit_breaker.py b/tests/test_circuit_breaker.py index f343ca2..800b6b1 100644 --- a/tests/test_circuit_breaker.py +++ b/tests/test_circuit_breaker.py @@ -120,9 +120,11 @@ async def test_open_emits_opened_event_and_rejects(caplog: pytest.LogCaptureFixt opened = [r for r in records if "opened" in r.message] rejected = [r for r in records if "rejecting" in r.message] assert len(opened) == 1 + assert opened[0].event == "circuit.opened" # ty: ignore[unresolved-attribute] assert opened[0].failure_threshold == 2 # noqa: PLR2004 # ty: ignore[unresolved-attribute] assert opened[0].failures == 2 # noqa: PLR2004 # ty: ignore[unresolved-attribute] assert len(rejected) == 1 + assert rejected[0].event == "circuit.rejected" # ty: ignore[unresolved-attribute] assert rejected[0].retry_after is not None # ty: ignore[unresolved-attribute] assert rejected[0].method == "GET" # ty: ignore[unresolved-attribute] @@ -285,25 +287,143 @@ async def test_reset_timeout_admits_probe_then_closes(caplog: pytest.LogCaptureF response = await client.get("https://example.test/x") # probe -> 200 -> CLOSED assert response.status_code == HTTPStatus.OK assert handler.calls == 3 # noqa: PLR2004 — 2 failures + 1 probe - messages = [r.message for r in caplog.records if r.name == "httpware.circuit_breaker"] + records = [r for r in caplog.records if r.name == "httpware.circuit_breaker"] + assert any(r.event == "circuit.half_open" for r in records) # ty: ignore[unresolved-attribute] + assert any(r.event == "circuit.closed" for r in records) # ty: ignore[unresolved-attribute] + messages = [r.message for r in records] assert any("half-open" in m for m in messages) assert any("closed" in m for m in messages) -async def test_probe_failure_reopens_circuit() -> None: +async def test_probe_failure_reopens_circuit(caplog: pytest.LogCaptureFixture) -> None: clock = _Clock() handler = _StatusSequence([500, 500, 500]) # open after 2; probe (3rd) fails -> reopen breaker = AsyncCircuitBreaker(failure_threshold=2, reset_timeout=10.0, _now=clock) + async with _client(handler, breaker=breaker) as client: + with caplog.at_level(logging.WARNING, logger="httpware.circuit_breaker"): + for _ in range(2): + with pytest.raises(InternalServerError): + await client.get("https://example.test/x") + clock.advance(10.0) + with pytest.raises(InternalServerError): # probe runs, fails + await client.get("https://example.test/x") + with pytest.raises(CircuitOpenError): # reopened; immediate retry rejected + await client.get("https://example.test/x") + assert handler.calls == 3 # noqa: PLR2004 — 2 failures + 1 probe-failure + # Probe failure emits circuit.opened with failures=1 (the single probe that reopened it). + reopen_records = [ + r + for r in caplog.records + if r.name == "httpware.circuit_breaker" + and r.event == "circuit.opened" # ty: ignore[unresolved-attribute] + and r.failures == 1 # ty: ignore[unresolved-attribute] + ] + assert len(reopen_records) == 1 + + +async def test_open_reject_retry_after_value() -> None: + """retry_after is exactly reset_timeout - elapsed (not just non-None).""" + clock = _Clock() + handler = _StatusSequence([500, 500]) + breaker = AsyncCircuitBreaker(failure_threshold=2, reset_timeout=30.0, _now=clock) + async with _client(handler, breaker=breaker) as client: + for _ in range(2): + with pytest.raises(InternalServerError): + await client.get("https://example.test/x") + clock.advance(10.0) # 10s into a 30s open window + with pytest.raises(CircuitOpenError) as info: + await client.get("https://example.test/x") + assert info.value.retry_after == 20.0 # noqa: PLR2004 # 30 - 10, exact + # The max(0.0, …) floor is only reachable if elapsed > reset_timeout while still + # OPEN, but the lazy OPEN→HALF_OPEN transition in admit() fires as soon as + # elapsed >= reset_timeout — so the circuit is never both OPEN and elapsed > + # reset_timeout. The floor is defensive dead code; no separate floor test needed. + + +async def test_429_resets_failure_streak() -> None: + """A 429 response is treated as success, resetting the failure streak.""" + handler = _StatusSequence([500, 429, 500, 500]) + breaker = AsyncCircuitBreaker(failure_threshold=2, _now=_Clock()) + async with _client(handler, breaker=breaker) as client: + with pytest.raises(InternalServerError): + await client.get("https://example.test/x") # streak=1 + with pytest.raises(RateLimitedError): + await client.get("https://example.test/x") # 429 -> success, resets streak to 0 + with pytest.raises(InternalServerError): + await client.get("https://example.test/x") # streak=1 again + with pytest.raises(InternalServerError): + await client.get("https://example.test/x") # streak=2 -> opens + assert handler.calls == 4 # noqa: PLR2004 # all four reached the transport; never short-circuited + + +async def test_success_threshold_probe_failure_mid_streak_reopens() -> None: + """A probe failure mid-streak resets consecutive_successes — the next close needs two FRESH successes. + + Discriminating: if the success counter were NOT reset on reopen, the circuit would + close after a single post-reopen success and the final request would reach the + transport instead of being rejected. + """ + clock = _Clock() + # 2x500 open; probe-1=200 (s=1); probe-2=500 (reopen, s->0); probe-3=200 (s=1, NOT 2); + # probe-4=500 -> half-open probe failure -> reopen -> next request rejected. + handler = _StatusSequence([500, 500, 200, 500, 200, 500]) + breaker = AsyncCircuitBreaker(failure_threshold=2, success_threshold=2, reset_timeout=5.0, _now=clock) async with _client(handler, breaker=breaker) as client: for _ in range(2): with pytest.raises(InternalServerError): await client.get("https://example.test/x") - clock.advance(10.0) - with pytest.raises(InternalServerError): # probe runs, fails + clock.advance(5.0) + await client.get("https://example.test/x") # probe-1: 200 -> HALF_OPEN s=1 + with pytest.raises(InternalServerError): # probe-2: 500 -> reopen, s reset to 0 await client.get("https://example.test/x") - with pytest.raises(CircuitOpenError): # reopened; immediate retry rejected + clock.advance(5.0) + await client.get("https://example.test/x") # probe-3: 200 -> s=1 (would be 2->CLOSED if not reset) + with pytest.raises(InternalServerError): # probe-4: 500 -> half-open probe failure -> reopen await client.get("https://example.test/x") - assert handler.calls == 3 # noqa: PLR2004 — 2 failures + 1 probe-failure + # OPEN now (no clock advance): a missing-reset bug would have CLOSED the circuit + # after probe-3, so this request would reach the transport instead of being rejected. + with pytest.raises(CircuitOpenError): + await client.get("https://example.test/x") + assert handler.calls == 6 # noqa: PLR2004 # the final request was short-circuited (not the 7th transport hit) + + +async def test_reset_timeout_zero_admits_probe_immediately() -> None: + """With reset_timeout=0, the circuit admits a probe immediately (elapsed >= 0 always).""" + handler = _StatusSequence([500, 200]) + breaker = AsyncCircuitBreaker(failure_threshold=1, reset_timeout=0.0, _now=_Clock()) + async with _client(handler, breaker=breaker) as client: + with pytest.raises(InternalServerError): + await client.get("https://example.test/x") # opens + # No clock advance needed — reset_timeout=0, so elapsed >= 0 is immediately true. + response = await client.get("https://example.test/x") # admitted as probe + assert response.status_code == HTTPStatus.OK + assert handler.calls == 2 # noqa: PLR2004 # both reached the transport + + +async def test_empty_failure_status_codes_ignores_5xx_trips_on_network_error() -> None: + """With failure_status_codes=[], no status ever counts; only NetworkError trips the breaker.""" + handler = _StatusSequence([500, 500, 500]) + breaker = AsyncCircuitBreaker(failure_threshold=2, failure_status_codes=[], _now=_Clock()) + async with _client(handler, breaker=breaker) as client: + for _ in range(3): + with pytest.raises(InternalServerError): + await client.get("https://example.test/x") + assert handler.calls == 3 # noqa: PLR2004 # never opened — 500 not in empty set + + def _raise(request: httpx2.Request) -> httpx2.Response: # noqa: ARG001 + msg = "connect failed" + raise httpx2.ConnectError(msg) + + breaker2 = AsyncCircuitBreaker(failure_threshold=2, failure_status_codes=[], _now=_Clock()) + async with AsyncClient( + httpx2_client=httpx2.AsyncClient(transport=httpx2.MockTransport(_raise)), + middleware=[breaker2], + ) as client2: + for _ in range(2): + with pytest.raises(NetworkError): + await client2.get("https://example.test/x") + with pytest.raises(CircuitOpenError): + await client2.get("https://example.test/x") async def test_success_threshold_requires_multiple_probes() -> None: diff --git a/tests/test_circuit_breaker_sync.py b/tests/test_circuit_breaker_sync.py index a87f722..b9e116f 100644 --- a/tests/test_circuit_breaker_sync.py +++ b/tests/test_circuit_breaker_sync.py @@ -91,9 +91,11 @@ def test_open_emits_opened_event_and_rejects(caplog: pytest.LogCaptureFixture) - opened = [r for r in records if "opened" in r.message] rejected = [r for r in records if "rejecting" in r.message] assert len(opened) == 1 + assert opened[0].event == "circuit.opened" # ty: ignore[unresolved-attribute] assert opened[0].failure_threshold == 2 # noqa: PLR2004 # ty: ignore[unresolved-attribute] assert opened[0].failures == 2 # noqa: PLR2004 # ty: ignore[unresolved-attribute] assert len(rejected) == 1 + assert rejected[0].event == "circuit.rejected" # ty: ignore[unresolved-attribute] assert rejected[0].retry_after is not None # ty: ignore[unresolved-attribute] assert rejected[0].method == "GET" # ty: ignore[unresolved-attribute] @@ -194,16 +196,22 @@ def test_reset_timeout_admits_probe_then_closes(caplog: pytest.LogCaptureFixture response = client.get("https://example.test/x") assert response.status_code == HTTPStatus.OK assert handler.calls == 3 # noqa: PLR2004 - messages = [r.message for r in caplog.records if r.name == "httpware.circuit_breaker"] + records = [r for r in caplog.records if r.name == "httpware.circuit_breaker"] + assert any(r.event == "circuit.half_open" for r in records) # ty: ignore[unresolved-attribute] + assert any(r.event == "circuit.closed" for r in records) # ty: ignore[unresolved-attribute] + messages = [r.message for r in records] assert any("half-open" in m for m in messages) assert any("closed" in m for m in messages) -def test_probe_failure_reopens_circuit() -> None: +def test_probe_failure_reopens_circuit(caplog: pytest.LogCaptureFixture) -> None: clock = _Clock() handler = _StatusSequence([500, 500, 500]) breaker = CircuitBreaker(failure_threshold=2, reset_timeout=10.0, _now=clock) - with _client(handler, breaker=breaker) as client: + with ( + _client(handler, breaker=breaker) as client, + caplog.at_level(logging.WARNING, logger="httpware.circuit_breaker"), + ): for _ in range(2): with pytest.raises(InternalServerError): client.get("https://example.test/x") @@ -213,6 +221,159 @@ def test_probe_failure_reopens_circuit() -> None: with pytest.raises(CircuitOpenError): client.get("https://example.test/x") assert handler.calls == 3 # noqa: PLR2004 + # Probe failure emits circuit.opened with failures=1 (the single probe that reopened it). + reopen_records = [ + r + for r in caplog.records + if r.name == "httpware.circuit_breaker" + and r.event == "circuit.opened" # ty: ignore[unresolved-attribute] + and r.failures == 1 # ty: ignore[unresolved-attribute] + ] + assert len(reopen_records) == 1 + + +def test_open_reject_retry_after_value() -> None: + """retry_after is exactly reset_timeout - elapsed (not just non-None).""" + clock = _Clock() + handler = _StatusSequence([500, 500]) + breaker = CircuitBreaker(failure_threshold=2, reset_timeout=30.0, _now=clock) + with _client(handler, breaker=breaker) as client: + for _ in range(2): + with pytest.raises(InternalServerError): + client.get("https://example.test/x") + clock.advance(10.0) # 10s into a 30s open window + with pytest.raises(CircuitOpenError) as info: + client.get("https://example.test/x") + assert info.value.retry_after == 20.0 # noqa: PLR2004 # 30 - 10, exact + # The max(0.0, …) floor is only reachable if elapsed > reset_timeout while still + # OPEN, but the lazy OPEN→HALF_OPEN transition fires as soon as elapsed >= reset_timeout. + # The floor is defensive dead code; no separate floor test needed. + + +def test_429_resets_failure_streak() -> None: + """A 429 response is treated as success, resetting the failure streak.""" + handler = _StatusSequence([500, 429, 500, 500]) + breaker = CircuitBreaker(failure_threshold=2, _now=_Clock()) + with _client(handler, breaker=breaker) as client: + with pytest.raises(InternalServerError): + client.get("https://example.test/x") # streak=1 + with pytest.raises(RateLimitedError): + client.get("https://example.test/x") # 429 -> success, resets streak to 0 + with pytest.raises(InternalServerError): + client.get("https://example.test/x") # streak=1 again + with pytest.raises(InternalServerError): + client.get("https://example.test/x") # streak=2 -> opens + assert handler.calls == 4 # noqa: PLR2004 # all four reached the transport; never short-circuited + + +def test_non_counted_exception_in_probe_releases_slot() -> None: + """A non-counted exception during the probe releases the probe slot (sync mirror). + + The circuit stays OPEN (probe neither succeeded nor failed), and the next request + after reset_timeout can take the probe slot again. + """ + clock = _Clock() + + class _Boom: + def __call__(self, request: httpx2.Request, next: object) -> httpx2.Response: # noqa: A002,ARG002 + msg = "boom" + raise ValueError(msg) + + open_handler = _StatusSequence([500]) + breaker = CircuitBreaker(failure_threshold=1, reset_timeout=5.0, _now=clock) + with _client(open_handler, breaker=breaker) as opener, pytest.raises(InternalServerError): + opener.get("https://example.test/x") + # Circuit is OPEN. Advance time to allow a probe. + clock.advance(5.0) + + boom_client = Client( + httpx2_client=httpx2.Client(transport=httpx2.MockTransport(open_handler)), + middleware=[breaker, _Boom()], + ) + with boom_client, pytest.raises(ValueError, match="boom"): + # Probe slot taken, but _Boom raises ValueError — probe slot must be released. + boom_client.get("https://example.test/probe") + + # After the ValueError, probe_in_flight is False again. The next request should + # be admitted as a new probe (not rejected with retry_after=None). + good_handler = _StatusSequence([200]) + good_client = Client( + httpx2_client=httpx2.Client(transport=httpx2.MockTransport(good_handler)), + middleware=[breaker], + ) + with good_client: + response = good_client.get("https://example.test/x") + assert response.status_code == HTTPStatus.OK + + +def test_success_threshold_probe_failure_mid_streak_reopens() -> None: + """A probe failure mid-streak resets consecutive_successes — the next close needs two FRESH successes. + + Discriminating: if the success counter were NOT reset on reopen, the circuit would + close after a single post-reopen success and the final request would reach the + transport instead of being rejected. + """ + clock = _Clock() + # 2x500 open; probe-1=200 (s=1); probe-2=500 (reopen, s->0); probe-3=200 (s=1, NOT 2); + # probe-4=500 -> half-open probe failure -> reopen -> next request rejected. + handler = _StatusSequence([500, 500, 200, 500, 200, 500]) + breaker = CircuitBreaker(failure_threshold=2, success_threshold=2, reset_timeout=5.0, _now=clock) + with _client(handler, breaker=breaker) as client: + for _ in range(2): + with pytest.raises(InternalServerError): + client.get("https://example.test/x") + clock.advance(5.0) + client.get("https://example.test/x") # probe-1: 200 -> HALF_OPEN s=1 + with pytest.raises(InternalServerError): # probe-2: 500 -> reopen, s reset to 0 + client.get("https://example.test/x") + clock.advance(5.0) + client.get("https://example.test/x") # probe-3: 200 -> s=1 (would be 2->CLOSED if not reset) + with pytest.raises(InternalServerError): # probe-4: 500 -> half-open probe failure -> reopen + client.get("https://example.test/x") + # OPEN now (no clock advance): a missing-reset bug would have CLOSED the circuit + # after probe-3, so this request would reach the transport instead of being rejected. + with pytest.raises(CircuitOpenError): + client.get("https://example.test/x") + assert handler.calls == 6 # noqa: PLR2004 # the final request was short-circuited (not the 7th transport hit) + + +def test_reset_timeout_zero_admits_probe_immediately() -> None: + """With reset_timeout=0, the circuit admits a probe immediately (elapsed >= 0 always).""" + handler = _StatusSequence([500, 200]) + breaker = CircuitBreaker(failure_threshold=1, reset_timeout=0.0, _now=_Clock()) + with _client(handler, breaker=breaker) as client: + with pytest.raises(InternalServerError): + client.get("https://example.test/x") # opens + # No clock advance needed — reset_timeout=0, so elapsed >= 0 is immediately true. + response = client.get("https://example.test/x") # admitted as probe + assert response.status_code == HTTPStatus.OK + assert handler.calls == 2 # noqa: PLR2004 # both reached the transport + + +def test_empty_failure_status_codes_ignores_5xx_trips_on_network_error() -> None: + """With failure_status_codes=[], no status ever counts; only NetworkError trips the breaker.""" + handler = _StatusSequence([500, 500, 500]) + breaker = CircuitBreaker(failure_threshold=2, failure_status_codes=[], _now=_Clock()) + with _client(handler, breaker=breaker) as client: + for _ in range(3): + with pytest.raises(InternalServerError): + client.get("https://example.test/x") + assert handler.calls == 3 # noqa: PLR2004 # never opened — 500 not in empty set + + def _raise(request: httpx2.Request) -> httpx2.Response: # noqa: ARG001 + msg = "connect failed" + raise httpx2.ConnectError(msg) + + breaker2 = CircuitBreaker(failure_threshold=2, failure_status_codes=[], _now=_Clock()) + with Client( + httpx2_client=httpx2.Client(transport=httpx2.MockTransport(_raise)), + middleware=[breaker2], + ) as client2: + for _ in range(2): + with pytest.raises(NetworkError): + client2.get("https://example.test/x") + with pytest.raises(CircuitOpenError): + client2.get("https://example.test/x") def test_success_threshold_requires_multiple_probes() -> None: diff --git a/tests/test_observability.py b/tests/test_observability.py index c8f4f32..c176ef7 100644 --- a/tests/test_observability.py +++ b/tests/test_observability.py @@ -29,6 +29,7 @@ def test_emit_event_logs_at_warning_with_extra_fields(caplog: pytest.LogCaptureF assert record.message == "something interesting happened" assert record.foo == 1 # ty: ignore[unresolved-attribute] assert record.bar == "x" # ty: ignore[unresolved-attribute] + assert record.event == "test.event" # ty: ignore[unresolved-attribute] def test_emit_event_respects_level_parameter(caplog: pytest.LogCaptureFixture) -> None: @@ -95,3 +96,22 @@ def test_emit_event_works_when_otel_installed_but_no_active_span() -> None: attributes={"a": 1}, ) # No assertion needed — the absence of an exception IS the assertion. + + +def test_emit_event_swallows_add_event_failure() -> None: + """A failing OTel add_event must not break the caller; the log record still fires.""" + mock_span = MagicMock(name="MockSpan") + mock_span.add_event.side_effect = RuntimeError("exporter boom") + with ( + patch("httpware._internal.import_checker.is_otel_installed", True), + patch("opentelemetry.trace.get_current_span", return_value=mock_span), + ): + # must not raise + _emit_event( + _TEST_LOGGER, + "test.event", + level=logging.WARNING, + message="resilient", + attributes={"k": "v"}, + ) + mock_span.add_event.assert_called_once() diff --git a/tests/test_timeout.py b/tests/test_timeout.py index 51bd707..04a1aac 100644 --- a/tests/test_timeout.py +++ b/tests/test_timeout.py @@ -49,6 +49,7 @@ async def _next(request: httpx2.Request) -> httpx2.Response: records = [r for r in caplog.records if r.name == "httpware.timeout"] assert len(records) == 1 assert records[0].levelno == logging.WARNING + assert records[0].event == "timeout.exceeded" # ty: ignore[unresolved-attribute] assert records[0].timeout == 0.01 # noqa: PLR2004 # ty: ignore[unresolved-attribute] assert records[0].method == "GET" # ty: ignore[unresolved-attribute] assert "example.test/x" in records[0].url # ty: ignore[unresolved-attribute] @@ -85,10 +86,20 @@ async def _next(_request: httpx2.Request) -> httpx2.Response: def test_zero_timeout_rejected() -> None: - with pytest.raises(ValueError, match="timeout must be > 0"): + with pytest.raises(ValueError, match="timeout must be a finite number > 0"): AsyncTimeout(timeout=0) def test_negative_timeout_rejected() -> None: - with pytest.raises(ValueError, match="timeout must be > 0"): + with pytest.raises(ValueError, match="timeout must be a finite number > 0"): AsyncTimeout(timeout=-1.0) + + +def test_nan_timeout_rejected() -> None: + with pytest.raises(ValueError, match="timeout must be a finite number > 0"): + AsyncTimeout(timeout=float("nan")) + + +def test_inf_timeout_rejected() -> None: + with pytest.raises(ValueError, match="timeout must be a finite number > 0"): + AsyncTimeout(timeout=float("inf"))