Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ process, no Kafka.
`opentelemetry`), Postgres setup.
- [Basic usage](usage/basic.md) — declare the table, create the
broker, publish a row, register a subscriber.
- [Tutorial: Your first outbox app](tutorials/first-outbox-app.md) —
build a working publisher / subscriber from scratch.
- [Tutorial: Add a Kafka relay](tutorials/add-kafka-relay.md) — extend
the first app to forward outbox rows to Kafka.

### Concepts

Expand All @@ -61,6 +65,8 @@ process, no Kafka.
- [Comparison](concepts/comparison.md) — vs writing your own, vs CDC,
vs Kafka transactions, vs `LISTEN/NOTIFY`, vs Celery, vs FastStream
foreign-broker direct.
- [Instrumentation seams](concepts/instrumentation-seams.md) — the
recorder seam vs native middleware, and why both exist.

### Guides

Expand All @@ -74,6 +80,8 @@ process, no Kafka.
loop-driven modes.
- [Schema validation](usage/schema-validation.md) — opt-in
Alembic-driven check for `/health` and CI.
- [Setup Prometheus and OpenTelemetry](usage/setup-prometheus-opentelemetry.md)
— wire the native middleware and recorder adapters end-to-end.

### Reference

Expand All @@ -87,3 +95,12 @@ process, no Kafka.
via a single CTE, `dlq_written` metric, retention patterns.
- [Observability](usage/observability.md) — recorder seam plus
native Prometheus / OpenTelemetry middleware.

### Operations

- [Production checklist](operations/checklist.md) — connection budget,
lease TTL sizing, and deploy-safety items before going live.
- [Troubleshooting](operations/troubleshooting.md) — common symptoms
(idle latency, `lease_lost` spikes, connection exhaustion) and fixes.
- [Alembic migrations](operations/alembic.md) — autogenerate the outbox
table and its partial indexes.
5 changes: 4 additions & 1 deletion docs/introduction/how-it-works.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,14 @@ and terminal-by-failure rows are copied into a sibling audit table in the
same Postgres statement as the `DELETE`:

```python
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import create_async_engine
from faststream_outbox import OutboxBroker, make_dlq_table, make_outbox_table

metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")
dlq_table = make_dlq_table(metadata, table_name="outbox_dlq")
engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox")
broker = OutboxBroker(engine, outbox_table=outbox_table, dlq_table=dlq_table)
```

Expand Down Expand Up @@ -180,7 +183,7 @@ you add).

## Relay to Kafka / RabbitMQ / NATS / Redis

> **Relay outbox rows to Kafka / RabbitMQ / NATS / Redis with a single decorator → [Relay tutorial](../usage/relay.md).**
> **Relay outbox rows to Kafka / RabbitMQ / NATS / Redis with a single decorator → [Relay tutorial](../tutorials/add-kafka-relay.md).**

## Acknowledgements

Expand Down
7 changes: 7 additions & 0 deletions docs/introduction/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,10 @@ Combine extras with commas:
```bash
pip install 'faststream-outbox[asyncpg,fastapi,prometheus]'
```

Or use the `all` extra to pull in every optional extra at once
(`asyncpg`, `validate`, `fastapi`, `prometheus`, `opentelemetry`):

```bash
pip install 'faststream-outbox[all]'
```
8 changes: 8 additions & 0 deletions docs/operations/alembic.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ If you also pass `dlq_table=make_dlq_table(metadata)` when
constructing the broker, `validate_schema()` checks both tables in
one call and surfaces drift on either one.

Both autogenerate and `validate_schema()` run with
`compare_server_default=False`, so **server-default drift is not
detected** — neither the autogenerated migration nor the drift gate will
flag a column whose `server_default` is missing or wrong. The
consequence that bites is a missing `server_default=now()` on
`next_attempt_at`; see the server-defaults caveat in
[Schema validation](../usage/schema-validation.md).

## DLQ retention via partition drop { #dlq-retention-via-partition-drop }

Plain `DELETE FROM outbox_dlq WHERE failed_at < now() - interval '90
Expand Down
6 changes: 3 additions & 3 deletions docs/operations/checklist.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ story.
unbounded; pair with a non-`NoRetry()` retry strategy or
wedge-prone handlers can replay forever.
- [ ] **Retry strategy chosen.** Default
`ExponentialRetry(initial=1, multiplier=2, max=300, attempts=10,
jitter=0.2)` is fine for most. Opt into `NoRetry()` explicitly for
an audit feed.
`ExponentialRetry(initial_delay_seconds=1.0, multiplier=2.0,
max_delay_seconds=300.0, max_attempts=10, jitter_factor=0.2)` is fine
for most. Opt into `NoRetry()` explicitly for an audit feed.

## DLQ

Expand Down
4 changes: 2 additions & 2 deletions docs/tutorials/add-kafka-relay.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ You should see:
outbox-kafka | [2026-06-12 05:22:33,782] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)
```

## Step 2: Install `faststream[kafka]`
## Step 2: Install `faststream[cli,kafka]`

```bash
uv add 'faststream[kafka]'
uv add 'faststream[cli,kafka]'
```

You should see:
Expand Down
5 changes: 5 additions & 0 deletions docs/tutorials/first-outbox-app.md
Original file line number Diff line number Diff line change
Expand Up @@ -317,5 +317,10 @@ docker stop outbox-postgres
`OutboxPublisher` decorator, chained publishing.
- [FastAPI integration](../usage/fastapi.md) — wire the outbox into
a real HTTP service with `Depends(get_session)`.
- [Schema validation](../usage/schema-validation.md) — this tutorial
installed the `validate` extra; call `validate_schema()` from a
startup hook or `/health` check to catch a table that drifted from
what the broker expects (e.g. a missing partial index after a
migration).
- [Tutorial: Add a Kafka relay](./add-kafka-relay.md) — extend this
app to forward each row into Kafka with one stacked decorator.
2 changes: 1 addition & 1 deletion docs/usage/basic.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def publish_one() -> None:
await broker.publish(1, queue="orders", session=session)
```

Run with `faststream run app:app`.
Save this module as `app.py`, then run with `faststream run app:app`.

## Connection ownership

Expand Down
57 changes: 43 additions & 14 deletions docs/usage/fastapi.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,30 @@ pip install 'faststream-outbox[fastapi]'
from collections.abc import AsyncIterator

from fastapi import Depends, FastAPI
from pydantic import BaseModel
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

from faststream_outbox import make_outbox_table
from faststream_outbox.fastapi import OutboxRouter


class OrderIn(BaseModel):
item: str


class Base(DeclarativeBase):
pass


class Order(Base):
__tablename__ = "orders"

id: Mapped[int] = mapped_column(primary_key=True)
item: Mapped[str]


metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")
engine = create_async_engine("postgresql+asyncpg://localhost/app")
Expand Down Expand Up @@ -55,8 +72,10 @@ async def create_order(
order: OrderIn,
session: AsyncSession = Depends(get_session),
) -> dict:
session.add(Order(...))
await router.broker.publish({"order_id": ...}, queue="orders", session=session)
db_order = Order(item=order.item)
session.add(db_order)
await session.flush() # populate db_order.id
await router.broker.publish({"order_id": db_order.id}, queue="orders", session=session)
return {"ok": True}


Expand All @@ -77,9 +96,10 @@ the same way it would in an HTTP endpoint: a fresh `AsyncSession` per
delivery, opened in a `session.begin()` block, committed on handler return,
rolled back on exception.

A handler can therefore receive the **same** `AsyncSession` it would in an
HTTP route — and `OutboxResponse(session=...)` commits the follow-on row
with the handler's domain writes. See [Chained
A handler's `AsyncSession` is therefore resolved exactly as in an HTTP
route — a fresh session per delivery, not a shared instance — and
`OutboxResponse(session=...)` commits the follow-on row with the handler's
domain writes. See [Chained
publishing](./publisher.md#chained-publishing).

## Annotated context shortcuts
Expand Down Expand Up @@ -112,15 +132,24 @@ there resolves as a request field and fails with a 422.

## What's intentionally not exposed

`apply_types` and the broker's FastStream `dependencies` argument are
intentionally **not exposed** on `OutboxRouter.__init__`:

- `StreamRouter` forces `apply_types=False` because FastAPI's FastDepends
takes over the parameter resolution. Letting the user flip it would
produce weird half-resolved handlers.
- `dependencies` on the router signature means FastAPI `Depends(...)` only
— the broker's FastStream `Dependant` list is the wrong shape for this
flow.
Several `OutboxBroker.__init__` arguments are intentionally **not exposed**
on `OutboxRouter.__init__`:

- `apply_types` — `StreamRouter` forces `apply_types=False` because
FastAPI's FastDepends takes over the parameter resolution. Letting the
user flip it would produce weird half-resolved handlers.
- `dependencies` — on the router signature this means FastAPI
`Depends(...)` only; the broker's FastStream `Dependant` list is the
wrong shape for this flow.
- `dlq_table`, `metrics_recorder`, and `routers` — simply not forwarded
through the router today.

The router builds the broker for you and gives you no handle to a
pre-constructed `OutboxBroker`, and all three of those arguments are
constructor-only on the broker (no setters). So a FastAPI user **cannot**
enable the [DLQ](./dlq.md) or the [metrics-recorder
seam](./observability.md) through `OutboxRouter` today — there is no
router-based workaround. Use the standalone `OutboxBroker` for those.

If you need broker-level FastStream middlewares, pass them to
`OutboxRouter(middlewares=[...])` — the router builds the broker for you, so
Expand Down
10 changes: 6 additions & 4 deletions docs/usage/observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ broken recorder never poisons the dispatch loop.

| Event | Tags (always present) | Tags (situational) | Fired by |
|---|---|---|---|
| `fetched` | `queue`, `subscriber`, `count` | | Fetch loop, every cycle (including empty) |
| `fetched` | `queue`, `subscriber`, `count` | | Fetch loop, once per fetch attempt (`count=0` on an empty fetch) — **skipped** when the in-flight queue is full (no fetch is issued). `queue` is tagged with the subscriber's **first** queue only; multi-queue subscribers should break down by queue using the row-level events instead |
| `dispatched` | `queue`, `subscriber`, `deliveries_count`, `size_bytes` | | Worker loop, before handler runs |
| `acked` | `queue`, `subscriber`, `deliveries_count`, `duration_seconds` | | Handler returned successfully |
| `nacked_retried` | `queue`, `subscriber`, `deliveries_count`, `duration_seconds`, `next_delay_seconds` | `exception_type` | Retry scheduled |
Expand Down Expand Up @@ -89,12 +89,14 @@ faststream_received_messages_in_process{broker="outbox"}
# Operator playbook: lease_ttl_seconds is too low for this handler's P99
rate(faststream_outbox_lease_lost_total[5m]) > 0

# Publish throughput per queue
rate(faststream_published_messages_total{broker="outbox",status="success"}[1m])
# Publish throughput per queue (publish metrics are tagged by `destination`)
sum by (destination) (
rate(faststream_published_messages_total{broker="outbox",status="success"}[1m]))

# P99 publish (INSERT) latency per queue
histogram_quantile(0.99,
rate(faststream_published_messages_duration_seconds_bucket{broker="outbox"}[5m]))
sum by (destination, le) (
rate(faststream_published_messages_duration_seconds_bucket{broker="outbox"}[5m])))

# DLQ misconfiguration: terminal-failure rate diverges from DLQ-write rate
rate(faststream_outbox_terminal_total[5m])
Expand Down
18 changes: 17 additions & 1 deletion docs/usage/publisher.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def checkout(order: Order, session: AsyncSession) -> None:
await session.commit() # row + domain commit together
```

Per-call `headers` are merged with the decorator's static headers
Per-call `headers` are merged with the publisher's static headers
(per-call wins).

The publisher exists primarily for AsyncAPI spec coverage and to
Expand Down Expand Up @@ -130,9 +130,16 @@ transactional contract applies (you provide the session, the row commits
with your domain writes):

```python
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from faststream_outbox import OutboxResponse
from faststream_outbox.annotations import OutboxMessage

# `get_session` is your app's session dependency — the same one your HTTP
# routes use. `Depends(...)` resolves inside a handler only under the FastAPI
# integration; see the engine / sessionmaker setup in the FastAPI guide.


@broker.subscriber("orders")
async def handle(
Expand All @@ -152,6 +159,15 @@ explicitly — useful for trace stitching. Plain returns (`None`, `dict`,
etc.) are silently skipped, so handlers that don't want to chain just
return normally.

!!! warning "Duplicate delivery on crash"
The chained `downstream` row commits with the handler's transaction,
but the inbound `orders` row's terminal `DELETE` runs **after** the
handler returns, on the worker's separate autocommit connection. A
crash between those two points leaves the inbound row undeleted, so it
is redelivered — producing a **second** chained row. For non-idempotent
chains, pass a deterministic `timer_id` derived from the inbound message
so the duplicate insert is a no-op (see [Timers](./timers.md)).

## Annotated handler params

`faststream_outbox.annotations` exports `Annotated[..., Context(...)]`
Expand Down
17 changes: 13 additions & 4 deletions docs/usage/relay.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ the rest of the outbox subscriber's behavior.

## Two-broker lifecycle

Both brokers must be started for the relay to work. Two idiomatic shapes:
Both brokers must be started for the relay to work. There's a built-in
safety net: at `start()` the outbox broker logs a WARNING (one per unstarted
foreign broker) naming the affected queue(s), and a relay to an unstarted
foreign broker simply fails-and-retries until that broker is started — the
row is never lost. Two idiomatic shapes:

### FastAPI (recommended)

Expand Down Expand Up @@ -121,6 +125,7 @@ are **not** forwarded to the foreign publish. Two ways to override:

```python
from faststream.response import Response
from faststream_outbox.annotations import OutboxMessage

@publisher_kafka
@broker_outbox.subscriber("outbox_queue")
Expand Down Expand Up @@ -170,13 +175,17 @@ async def relay(body: dict) -> OutboxResponse:

This would both insert a row into the outbox AND publish to Kafka. The
subscriber raises `RuntimeError` at dispatch time when it detects the
combination — pick one path.
combination — pick one path. The worker catches that error and logs it at
ERROR; it does **not** flush a nack and does **not** route the row through
the `retry_strategy`. The row's lease simply expires and a later fetch
reclaims it, so the row keeps being retried (not lost) until you fix the
configuration.

**Do not** stack an outbox publisher on a foreign subscriber.

```python
@broker_outbox.publisher("outbox_queue")
@broker_kafka.subscriber("kafka_topic") # NotImplementedError at decoration
@broker_outbox.publisher("outbox_queue") # NotImplementedError at decoration
@broker_kafka.subscriber("kafka_topic")
async def relay(body: dict) -> dict:
return body
```
Expand Down
12 changes: 12 additions & 0 deletions docs/usage/schema-validation.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ schema (`add_*` / `modify_*` ops). `remove_*` ops are silently dropped so
you can attach your own audit columns or additional indexes without the
validator complaining.

!!! warning "Server defaults are not checked"
The diff runs with `compare_server_default=False` — Alembic's
server-default comparison is flaky against Postgres' normalized
expressions (`now()` vs `CURRENT_TIMESTAMP`), so it is disabled to avoid
false positives. A **green** `validate_schema()` therefore does **not**
prove your server defaults exist. The load-bearing case: a table missing
`server_default=now()` on `next_attempt_at` leaves fresh rows with NULL
`next_attempt_at`, which the fetch CTE's `next_attempt_at <= now()`
predicate silently filters out — a silent broker outage that validation
will not catch. Generate your migration from `make_outbox_table(...)` so
the defaults are in place to begin with.

## Where to call it

Call it from a `/health` endpoint or startup hook — **not** at
Expand Down
Loading