diff --git a/docs/index.md b/docs/index.md
index c183a8f..3b0b8f8 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -52,6 +52,10 @@ process, no Kafka.
   `opentelemetry`), Postgres setup.
 - [Basic usage](usage/basic.md) — declare the table, create the broker,
   publish a row, register a subscriber.
+- [Tutorial: Your first outbox app](tutorials/first-outbox-app.md) —
+  build a working publisher / subscriber from scratch.
+- [Tutorial: Add a Kafka relay](tutorials/add-kafka-relay.md) — extend
+  the first app to forward outbox rows to Kafka.
 
 ### Concepts
 
@@ -61,6 +65,8 @@ process, no Kafka.
 - [Comparison](concepts/comparison.md) — vs writing your own, vs CDC, vs
   Kafka transactions, vs `LISTEN/NOTIFY`, vs Celery, vs FastStream
   foreign-broker direct.
+- [Instrumentation seams](concepts/instrumentation-seams.md) — the
+  recorder seam vs native middleware, and why both exist.
 
 ### Guides
 
@@ -74,6 +80,8 @@ process, no Kafka.
   loop-driven modes.
 - [Schema validation](usage/schema-validation.md) — opt-in Alembic-driven
   check for `/health` and CI.
+- [Setup Prometheus and OpenTelemetry](usage/setup-prometheus-opentelemetry.md)
+  — wire the native middleware and recorder adapters end-to-end.
 
 ### Reference
 
@@ -87,3 +95,12 @@ process, no Kafka.
   via a single CTE, `dlq_written` metric, retention patterns.
 - [Observability](usage/observability.md) — recorder seam plus native
   Prometheus / OpenTelemetry middleware.
+
+### Operations
+
+- [Production checklist](operations/checklist.md) — connection budget,
+  lease TTL sizing, and deploy-safety items before going live.
+- [Troubleshooting](operations/troubleshooting.md) — common symptoms
+  (idle latency, `lease_lost` spikes, connection exhaustion) and fixes.
+- [Alembic migrations](operations/alembic.md) — autogenerate the outbox
+  table and its partial indexes.
diff --git a/docs/introduction/how-it-works.md b/docs/introduction/how-it-works.md
index 0213bcb..fdeb29c 100644
--- a/docs/introduction/how-it-works.md
+++ b/docs/introduction/how-it-works.md
@@ -148,11 +148,14 @@ and terminal-by-failure rows are copied into a sibling audit table in
 the same Postgres statement as the `DELETE`:
 
 ```python
+from sqlalchemy import MetaData
+from sqlalchemy.ext.asyncio import create_async_engine
 from faststream_outbox import OutboxBroker, make_dlq_table, make_outbox_table
 
 metadata = MetaData()
 outbox_table = make_outbox_table(metadata, table_name="outbox")
 dlq_table = make_dlq_table(metadata, table_name="outbox_dlq")
+engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox")
 broker = OutboxBroker(engine, outbox_table=outbox_table, dlq_table=dlq_table)
 ```
 
@@ -180,7 +183,7 @@ you add).
 
 ## Relay to Kafka / RabbitMQ / NATS / Redis
 
-> **Relay outbox rows to Kafka / RabbitMQ / NATS / Redis with a single decorator → [Relay tutorial](../usage/relay.md).**
+> **Relay outbox rows to Kafka / RabbitMQ / NATS / Redis with a single decorator → [Relay tutorial](../tutorials/add-kafka-relay.md).**
 
 ## Acknowledgements
 
diff --git a/docs/introduction/installation.md b/docs/introduction/installation.md
index fc936e3..1881c7c 100644
--- a/docs/introduction/installation.md
+++ b/docs/introduction/installation.md
@@ -59,3 +59,10 @@ Combine extras with commas:
 ```bash
 pip install 'faststream-outbox[asyncpg,fastapi,prometheus]'
 ```
+
+Or use the `all` extra to pull in every optional extra at once
+(`asyncpg`, `validate`, `fastapi`, `prometheus`, `opentelemetry`):
+
+```bash
+pip install 'faststream-outbox[all]'
+```
diff --git a/docs/operations/alembic.md b/docs/operations/alembic.md
index b3b36f5..b0afe71 100644
--- a/docs/operations/alembic.md
+++ b/docs/operations/alembic.md
@@ -148,6 +148,14 @@ If you also pass `dlq_table=make_dlq_table(metadata)` when constructing
 the broker, `validate_schema()` checks both tables in one
 call and surfaces drift on either one.
 
+Both autogenerate and `validate_schema()` run with
+`compare_server_default=False`, so **server-default drift is not
+detected** — neither the autogenerated migration nor the drift gate will
+flag a column whose `server_default` is missing or wrong. The
+consequence that bites is a missing `server_default=now()` on
+`next_attempt_at`; see the server-defaults caveat in
+[Schema validation](../usage/schema-validation.md).
+
 ## DLQ retention via partition drop { #dlq-retention-via-partition-drop }
 
 Plain `DELETE FROM outbox_dlq WHERE failed_at < now() - interval '90
diff --git a/docs/operations/checklist.md b/docs/operations/checklist.md
index aafec3b..7ae610b 100644
--- a/docs/operations/checklist.md
+++ b/docs/operations/checklist.md
@@ -30,9 +30,9 @@ story.
   unbounded; pair with a non-`NoRetry()` retry strategy or wedge-prone
   handlers can replay forever.
 - [ ] **Retry strategy chosen.** Default
-  `ExponentialRetry(initial=1, multiplier=2, max=300, attempts=10,
-  jitter=0.2)` is fine for most. Opt into `NoRetry()` explicitly for
-  an audit feed.
+  `ExponentialRetry(initial_delay_seconds=1.0, multiplier=2.0,
+  max_delay_seconds=300.0, max_attempts=10, jitter_factor=0.2)` is fine
+  for most. Opt into `NoRetry()` explicitly for an audit feed.
 
 ## DLQ
 
diff --git a/docs/tutorials/add-kafka-relay.md b/docs/tutorials/add-kafka-relay.md
index 9ef88d9..e6ffeb2 100644
--- a/docs/tutorials/add-kafka-relay.md
+++ b/docs/tutorials/add-kafka-relay.md
@@ -87,10 +87,10 @@ You should see:
 outbox-kafka | [2026-06-12 05:22:33,782] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)
 ```
 
-## Step 2: Install `faststream[kafka]`
+## Step 2: Install `faststream[cli,kafka]`
 
 ```bash
-uv add 'faststream[kafka]'
+uv add 'faststream[cli,kafka]'
 ```
 
 You should see:
diff --git a/docs/tutorials/first-outbox-app.md b/docs/tutorials/first-outbox-app.md
index 44bc70c..e3e1925 100644
--- a/docs/tutorials/first-outbox-app.md
+++ b/docs/tutorials/first-outbox-app.md
@@ -317,5 +317,10 @@ docker stop outbox-postgres
   `OutboxPublisher` decorator, chained publishing.
 - [FastAPI integration](../usage/fastapi.md) — wire the outbox into a
   real HTTP service with `Depends(get_session)`.
+- [Schema validation](../usage/schema-validation.md) — this tutorial
+  installed the `validate` extra; call `validate_schema()` from a
+  startup hook or `/health` check to catch a table that drifted from
+  what the broker expects (e.g. a missing partial index after a
+  migration).
 - [Tutorial: Add a Kafka relay](./add-kafka-relay.md) — extend this app
   to forward each row into Kafka with one stacked decorator.
diff --git a/docs/usage/basic.md b/docs/usage/basic.md
index e797691..b894ff4 100644
--- a/docs/usage/basic.md
+++ b/docs/usage/basic.md
@@ -99,7 +99,7 @@ async def publish_one() -> None:
         await broker.publish(1, queue="orders", session=session)
 ```
 
-Run with `faststream run app:app`.
+Save this module as `app.py`, then run with `faststream run app:app`.
 
 ## Connection ownership
 
diff --git a/docs/usage/fastapi.md b/docs/usage/fastapi.md
index 4a3d915..38e4529 100644
--- a/docs/usage/fastapi.md
+++ b/docs/usage/fastapi.md
@@ -21,13 +21,30 @@ pip install 'faststream-outbox[fastapi]'
 from collections.abc import AsyncIterator
 
 from fastapi import Depends, FastAPI
+from pydantic import BaseModel
 from sqlalchemy import MetaData
 from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
+from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
 
 from faststream_outbox import make_outbox_table
 from faststream_outbox.fastapi import OutboxRouter
 
 
+class OrderIn(BaseModel):
+    item: str
+
+
+class Base(DeclarativeBase):
+    pass
+
+
+class Order(Base):
+    __tablename__ = "orders"
+
+    id: Mapped[int] = mapped_column(primary_key=True)
+    item: Mapped[str]
+
+
 metadata = MetaData()
 outbox_table = make_outbox_table(metadata, table_name="outbox")
 engine = create_async_engine("postgresql+asyncpg://localhost/app")
@@ -55,8 +72,10 @@ async def create_order(
     order: OrderIn,
     session: AsyncSession = Depends(get_session),
 ) -> dict:
-    session.add(Order(...))
-    await router.broker.publish({"order_id": ...}, queue="orders", session=session)
+    db_order = Order(item=order.item)
+    session.add(db_order)
+    await session.flush()  # populate db_order.id
+    await router.broker.publish({"order_id": db_order.id}, queue="orders", session=session)
     return {"ok": True}
 
 
@@ -77,9 +96,10 @@ the same way it would in an HTTP endpoint: a fresh `AsyncSession` per
 delivery, opened in a `session.begin()` block, committed on handler
 return, rolled back on exception.
 
-A handler can therefore receive the **same** `AsyncSession` it would in an
-HTTP route — and `OutboxResponse(session=...)` commits the follow-on row
-with the handler's domain writes. See [Chained
+A handler's `AsyncSession` is therefore resolved exactly as in an HTTP
+route — a fresh session per delivery, not a shared instance — and
+`OutboxResponse(session=...)` commits the follow-on row with the handler's
+domain writes. See [Chained
 publishing](./publisher.md#chained-publishing).
 
 ## Annotated context shortcuts
@@ -112,15 +132,24 @@ there resolves as a request field and fails with a 422.
 
 ## What's intentionally not exposed
 
-`apply_types` and the broker's FastStream `dependencies` argument are
-intentionally **not exposed** on `OutboxRouter.__init__`:
-
-- `StreamRouter` forces `apply_types=False` because FastAPI's FastDepends
-  takes over the parameter resolution. Letting the user flip it would
-  produce weird half-resolved handlers.
-- `dependencies` on the router signature means FastAPI `Depends(...)` only
-  — the broker's FastStream `Dependant` list is the wrong shape for this
-  flow.
+Several `OutboxBroker.__init__` arguments are intentionally **not exposed**
+on `OutboxRouter.__init__`:
+
+- `apply_types` — `StreamRouter` forces `apply_types=False` because
+  FastAPI's FastDepends takes over the parameter resolution. Letting the
+  user flip it would produce weird half-resolved handlers.
+- `dependencies` — on the router signature this means FastAPI
+  `Depends(...)` only; the broker's FastStream `Dependant` list is the
+  wrong shape for this flow.
+- `dlq_table`, `metrics_recorder`, and `routers` — simply not forwarded
+  through the router today.
+
+The router builds the broker for you and gives you no handle to a
+pre-constructed `OutboxBroker`, and all three of those arguments are
+constructor-only on the broker (no setters). So a FastAPI user **cannot**
+enable the [DLQ](./dlq.md) or the [metrics-recorder
+seam](./observability.md) through `OutboxRouter` today — there is no
+router-based workaround. Use the standalone `OutboxBroker` for those.
 
 If you need broker-level FastStream middlewares, pass them to
 `OutboxRouter(middlewares=[...])` — the router builds the broker for you, so
diff --git a/docs/usage/observability.md b/docs/usage/observability.md
index 5784b82..77d842e 100644
--- a/docs/usage/observability.md
+++ b/docs/usage/observability.md
@@ -51,7 +51,7 @@ broken recorder never poisons the dispatch loop.
 
 | Event | Tags (always present) | Tags (situational) | Fired by |
 |---|---|---|---|
-| `fetched` | `queue`, `subscriber`, `count` | | Fetch loop, every cycle (including empty) |
+| `fetched` | `queue`, `subscriber`, `count` | | Fetch loop, once per fetch attempt (`count=0` on an empty fetch) — **skipped** when the in-flight queue is full (no fetch is issued). `queue` is tagged with the subscriber's **first** queue only; multi-queue subscribers should break down by queue using the row-level events instead |
 | `dispatched` | `queue`, `subscriber`, `deliveries_count`, `size_bytes` | | Worker loop, before handler runs |
 | `acked` | `queue`, `subscriber`, `deliveries_count`, `duration_seconds` | | Handler returned successfully |
 | `nacked_retried` | `queue`, `subscriber`, `deliveries_count`, `duration_seconds`, `next_delay_seconds` | `exception_type` | Retry scheduled |
@@ -89,12 +89,14 @@ faststream_received_messages_in_process{broker="outbox"}
 # Operator playbook: lease_ttl_seconds is too low for this handler's P99
 rate(faststream_outbox_lease_lost_total[5m]) > 0
 
-# Publish throughput per queue
-rate(faststream_published_messages_total{broker="outbox",status="success"}[1m])
+# Publish throughput per queue (publish metrics are tagged by `destination`)
+sum by (destination) (
+  rate(faststream_published_messages_total{broker="outbox",status="success"}[1m]))
 
 # P99 publish (INSERT) latency per queue
 histogram_quantile(0.99,
-  rate(faststream_published_messages_duration_seconds_bucket{broker="outbox"}[5m]))
+  sum by (destination, le) (
+    rate(faststream_published_messages_duration_seconds_bucket{broker="outbox"}[5m])))
 
 # DLQ misconfiguration: terminal-failure rate diverges from DLQ-write rate
 rate(faststream_outbox_terminal_total[5m])
diff --git a/docs/usage/publisher.md b/docs/usage/publisher.md
index 6b86663..6c99115 100644
--- a/docs/usage/publisher.md
+++ b/docs/usage/publisher.md
@@ -99,7 +99,7 @@ async def checkout(order: Order, session: AsyncSession) -> None:
     await session.commit()  # row + domain commit together
 ```
 
-Per-call `headers` are merged with the decorator's static headers
+Per-call `headers` are merged with the publisher's static headers
 (per-call wins).
 
 The publisher exists primarily for AsyncAPI spec coverage and to
@@ -130,9 +130,16 @@ transactional contract applies (you provide the session, the row commits
 with your domain writes):
 
 ```python
+from fastapi import Depends
+from sqlalchemy.ext.asyncio import AsyncSession
+
 from faststream_outbox import OutboxResponse
 from faststream_outbox.annotations import OutboxMessage
 
+# `get_session` is your app's session dependency — the same one your HTTP
+# routes use. `Depends(...)` resolves inside a handler only under the FastAPI
+# integration; see the engine / sessionmaker setup in the FastAPI guide.
+
 
 @broker.subscriber("orders")
 async def handle(
@@ -152,6 +159,15 @@ explicitly — useful for trace stitching.
 Plain returns (`None`, `dict`, etc.) are silently skipped, so handlers that
 don't want to chain just return normally.
 
+!!! warning "Duplicate delivery on crash"
+    The chained `downstream` row commits with the handler's transaction,
+    but the inbound `orders` row's terminal `DELETE` runs **after** the
+    handler returns, on the worker's separate autocommit connection. A
+    crash between those two points leaves the inbound row undeleted, so it
+    is redelivered — producing a **second** chained row. For non-idempotent
+    chains, pass a deterministic `timer_id` derived from the inbound message
+    so the duplicate insert is a no-op (see [Timers](./timers.md)).
+
 ## Annotated handler params
 
 `faststream_outbox.annotations` exports `Annotated[..., Context(...)]`
diff --git a/docs/usage/relay.md b/docs/usage/relay.md
index 3971df3..e51e108 100644
--- a/docs/usage/relay.md
+++ b/docs/usage/relay.md
@@ -47,7 +47,11 @@ the rest of the outbox subscriber's behavior.
 
 ## Two-broker lifecycle
 
-Both brokers must be started for the relay to work. Two idiomatic shapes:
+Both brokers must be started for the relay to work. There's a built-in
+safety net: at `start()` the outbox broker logs a WARNING (one per unstarted
+foreign broker) naming the affected queue(s), and a relay to an unstarted
+foreign broker simply fails-and-retries until that broker is started — the
+row is never lost. Two idiomatic shapes:
 
 ### FastAPI (recommended)
 
@@ -121,6 +125,7 @@ are **not** forwarded to the foreign publish. Two ways to override:
 
 ```python
 from faststream.response import Response
+from faststream_outbox.annotations import OutboxMessage
 
 @publisher_kafka
 @broker_outbox.subscriber("outbox_queue")
@@ -170,13 +175,17 @@ async def relay(body: dict) -> OutboxResponse:
 
 This would both insert a row into the outbox AND publish to Kafka. The
 subscriber raises `RuntimeError` at dispatch time when it detects the
-combination — pick one path.
+combination — pick one path. The worker catches that error and logs it at
+ERROR; it does **not** flush a nack and does **not** route the row through
+the `retry_strategy`. The row's lease simply expires and a later fetch
+reclaims it, so the row keeps being retried (not lost) until you fix the
+configuration.
 
 **Do not** stack an outbox publisher on a foreign subscriber.
 
 ```python
-@broker_outbox.publisher("outbox_queue")
-@broker_kafka.subscriber("kafka_topic")  # NotImplementedError at decoration
+@broker_outbox.publisher("outbox_queue")  # NotImplementedError at decoration
+@broker_kafka.subscriber("kafka_topic")
 async def relay(body: dict) -> dict:
     return body
 ```
diff --git a/docs/usage/schema-validation.md b/docs/usage/schema-validation.md
index e646ac1..d9aece7 100644
--- a/docs/usage/schema-validation.md
+++ b/docs/usage/schema-validation.md
@@ -38,6 +38,18 @@ schema (`add_*` / `modify_*` ops). `remove_*` ops are silently dropped so
 you can attach your own audit columns or additional indexes without the
 validator complaining.
 
+!!! warning "Server defaults are not checked"
+    The diff runs with `compare_server_default=False` — Alembic's
+    server-default comparison is flaky against Postgres' normalized
+    expressions (`now()` vs `CURRENT_TIMESTAMP`), so it is disabled to avoid
+    false positives. A **green** `validate_schema()` therefore does **not**
+    prove your server defaults exist. The load-bearing case: a table missing
+    `server_default=now()` on `next_attempt_at` leaves fresh rows with NULL
+    `next_attempt_at`, which the fetch CTE's `next_attempt_at <= now()`
+    predicate silently filters out — a silent broker outage that validation
+    will not catch. Generate your migration from `make_outbox_table(...)` so
+    the defaults are in place to begin with.
+
 ## Where to call it
 
 Call it from a `/health` endpoint or startup hook — **not** at
diff --git a/docs/usage/subscriber.md b/docs/usage/subscriber.md
index c523079..6d90e02 100644
--- a/docs/usage/subscriber.md
+++ b/docs/usage/subscriber.md
@@ -17,6 +17,25 @@ async def handle(order_id: int) -> None:
     print(f"order {order_id}")
 ```
 
+## Multiple queues per subscriber
+
+The first argument is `queues: str | list[str]`. Pass a list to fan one
+handler across several queues:
+
+```python
+@broker.subscriber(["orders", "refunds"])
+async def handle(body: dict) -> None: ...
+```
+
+The subscriber claims rows from any of its queues in a single fetch. Its
+[connection budget](#connection-budget) is unchanged — `max_workers + 1`
+pool connections regardless of how many queues it serves.
+
+Do **not** register two subscribers on the **same** queue: they compete for
+the same rows, and registration emits a warning to that effect. To run more
+than one handler over a queue, attach them to a single subscriber; to scale
+throughput, raise `max_workers`.
+
 ## Body types
 
 FastStream deserializes the message body into the annotated type. Any
@@ -145,6 +164,15 @@ async def handle(msg: OutboxMessage, body: dict) -> None:
         await msg.reject()  # terminal delete
 ```
 
+!!! warning "MANUAL: returning without acking is a terminal reject"
+    Under `AckPolicy.MANUAL`, a handler that returns **without** calling
+    `ack()` / `nack()` / `reject()` (and without raising) is treated as a
+    terminal **reject** — the row is **deleted** (or written to the DLQ with
+    `failure_reason="rejected"` if a `dlq_table` is configured), not retried.
+    A handler that *raises* is nacked through the retry strategy instead, so
+    only the silent-return path is destructive. Always ack/nack/reject on
+    every branch.
+
 ## Retry strategies
 
 A subscriber with no explicit `retry_strategy` defaults to
diff --git a/docs/usage/timers.md b/docs/usage/timers.md
index 7dc7532..7544bba 100644
--- a/docs/usage/timers.md
+++ b/docs/usage/timers.md
@@ -12,12 +12,13 @@ import datetime as dt
 
 
 # Fire 30 seconds from now, deduplicated by timer_id:
+order_id = 1
 await broker.publish(
-    {"order_id": 1},
+    {"order_id": order_id},
     queue="orders",
     session=session,
     activate_in=dt.timedelta(seconds=30),
-    timer_id=f"order-confirm-{order.id}",
+    timer_id=f"order-confirm-{order_id}",
 )
 
 # Fire at a specific UTC instant:
@@ -82,6 +83,15 @@ notifies.
 `timer_id` is only available on single `publish`, not on `publish_batch`
 (per-row dedup makes no sense for a batch).
 
+### `timer_id` dedups only *live* rows
+
+The unique index is **partial** — `(queue, timer_id) WHERE timer_id IS NOT
+NULL`. It constrains only rows currently in the table. Once a timer fires
+(the row is deleted) or is cancelled, the same `timer_id` can be inserted
+fresh. So `timer_id` is a dedup key for **in-flight / pending** timers, not
+a permanent idempotency key — it won't stop a value from being re-published
+after the original delivery has completed.
+
 ## Cancellation
 
 `broker.cancel_timer(*, queue, timer_id, session)` issues a `DELETE` on
diff --git a/faststream_outbox/publisher/usecase.py b/faststream_outbox/publisher/usecase.py
index 23a3579..33d2215 100644
--- a/faststream_outbox/publisher/usecase.py
+++ b/faststream_outbox/publisher/usecase.py
@@ -35,7 +35,7 @@
 _REJECT_RELAY_MSG = (
     "OutboxPublisher cannot decorate a subscriber handler — relay chaining is not supported. "
     "Call `await broker.publish(value, queue=..., session=session)` inside your handler instead, "
-    "on the same session that owns the inbound row's terminal write."
+    "reusing the session that owns the inbound row's terminal write."
 )
 
 
diff --git a/faststream_outbox/testing.py b/faststream_outbox/testing.py
index f41d0e6..518219f 100644
--- a/faststream_outbox/testing.py
+++ b/faststream_outbox/testing.py
@@ -2,7 +2,7 @@
 Test broker with an in-memory ``OutboxClient`` substitute.
 
 ``TestOutboxBroker`` wraps an ``OutboxBroker`` and swaps in a ``FakeOutboxClient``
-backed by a list of dicts. Defaults to **sync dispatch**: ``await broker.publish(...)``
+backed by a list of ``_FakeRow`` records. Defaults to **sync dispatch**: ``await broker.publish(...)``
 finds the matching subscriber and awaits its consume pipeline before returning, the same
 model as ``TestKafkaBroker`` / ``TestRabbitBroker``. Timers fire immediately in sync mode —
 ``activate_in`` / ``activate_at`` are recorded on the fake row but not
@@ -68,7 +68,7 @@ def __init__(self) -> None:
         self._next_id = 1
         # Populated when ``delete_with_lease`` receives a ``dlq_payload``. Mirrors
         # the real client's CTE side-effect: outbox row gone + DLQ row created in
-        # the same call. Tests assert against ``broker.fake_client.dlq_rows``.
+        # the same call. Tests assert against ``test_broker.fake_client.dlq_rows``.
         self._dlq_rows: list[dict[str, typing.Any]] = []
 
     def feed(
@@ -562,12 +562,14 @@ class TestOutboxBroker(TestBroker[OutboxBroker, OutboxBroker]):  # ty: ignore[in
     Default (``run_loops=False``): ``broker.publish`` synchronously drives the matching
     subscriber's consume pipeline, so handlers run before ``publish`` returns. Matches the
     FastStream test-broker idiom — ``TestKafkaBroker`` / ``TestRabbitBroker`` behave the
-    same way. Future-dated rows (``activate_in`` / ``activate_at``) stay in the fake
-    client and are *not* dispatched, mirroring production where they wait for the gate.
+    same way. Future-dated rows (``activate_in`` / ``activate_at``) **fire immediately** in
+    sync mode — sync dispatch ignores ``next_attempt_at``. The future-dated gate only applies
+    in loop mode (``run_loops=True``), where the fetch loop honors ``next_attempt_at``.
 
     Pass ``run_loops=True`` to spin up the real ``_fetch_loop`` / ``_worker_loop`` against
     the in-memory client. Required for tests that exercise loop-driven behavior:
-    retry rescheduling, lease expiry reclaim, or fetch-loop error recovery.
+    retry rescheduling, lease expiry reclaim, scheduled-delivery waiting, or fetch-loop
+    error recovery.
     """
 
     fake_client: FakeOutboxClient
diff --git a/planning/active/2026-06-12-docs-audit-findings.md b/planning/archived/2026-06-12-docs-audit-findings.md
similarity index 98%
rename from planning/active/2026-06-12-docs-audit-findings.md
rename to planning/archived/2026-06-12-docs-audit-findings.md
index 33e7c29..3edae1f 100644
--- a/planning/active/2026-06-12-docs-audit-findings.md
+++ b/planning/archived/2026-06-12-docs-audit-findings.md
@@ -1,7 +1,14 @@
 ---
+status: shipped
 date: 2026-06-12
+slug: 2026-06-12-docs-audit-findings
 scope: docs/ (22 user-facing mkdocs pages)
-status: triage
+prs: [63, 64, 72]
+outcome: >
+  All findings remediated, nothing deferred. Bugs B1–B14 (#63) +
+  inaccuracies I1–I22 (#64) shipped; improvements P1–P25 + the 4
+  source-side docstring/comment notes resolved in #72. Docs-only — not
+  tied to a package release.
 ---
 
 # Docs audit findings — 2026-06-12