Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions docs/concepts/comparison.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,8 @@ no replay, no persistence, no retry.

`faststream-outbox` keeps the **outbox row** as the durability boundary
and uses NOTIFY only as a wake-up short-circuit on top of polling. If
the NOTIFY is lost — listener reconnecting, channel name too long
(Postgres' 63-char limit), `LISTEN` setup failed at startup — the
subscriber still finds the row on its next poll cycle. The worst case
the NOTIFY is lost — listener reconnecting, or `LISTEN` setup failed at
startup — the subscriber still finds the row on its next poll cycle. The worst case
is one `max_fetch_interval` of idle latency (default 10 seconds), not
data loss.

Expand Down
10 changes: 2 additions & 8 deletions docs/concepts/instrumentation-seams.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ This is the "spans + bus parity" mode the native middleware

## What the middleware seam *can't* observe

Four events fire **outside** the handler invocation, with no
Three events fire **outside** the handler invocation, with no
`StreamMessage` in scope:

- **`fetched` ticks (including empty fetches).** Emitted by the fetch
Expand All @@ -57,19 +57,14 @@ Four events fire **outside** the handler invocation, with no
the `max_deliveries` ceiling and was dropped *without invoking the
handler*. No handler call = no `consume_scope`. The middleware has
nothing to wrap.
- **The empty-fetch idle counter.** Same shape as `fetched` ticks —
fires when the fetch loop went a round without finding anything to
claim. Useful for tuning `min_fetch_interval` and `max_fetch_interval`.
The middleware bus has no concept of "the broker checked and found
nothing."

## What the recorder seam observes naturally

The recorder is a `Callable[[str, Mapping[str, Any]], None]` invoked at
six subscriber events and one producer event. Plus `dlq_written` when
the DLQ is configured. It fires whether or not a handler is in scope:

- All four bus-invisible events above.
- All three bus-invisible events above.
- Plus `acked` / `nacked_retried` / `nacked_terminal` / `dispatched` /
`published` from inside the handler-execution paths, with explicit
`subscriber` and `queue` tags.
Expand All @@ -90,7 +85,6 @@ physically cannot observe.
| `fetched` ticks (including empty) | ❌ (no `StreamMessage` at fetch time) | ✅ |
| `lease_lost` after `consume_scope` exits | ❌ | ✅ |
| `nacked_terminal(reason="max_deliveries")` before consume opens | ❌ | ✅ |
| Empty-fetch idle counter | ❌ | ✅ |

## Operator implication

Expand Down
17 changes: 13 additions & 4 deletions docs/introduction/how-it-works.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ round-trip for many rows.

The producer also emits `SELECT pg_notify('outbox_<table>', queue)` on the
caller's session right after the INSERT, **except** when the row is
future-dated (`activate_in` / `activate_at` set) or a `timer_id` conflict
made the insert a no-op. NOTIFY is transactional, so listeners only see it
genuinely future-dated (a future `activate_in` / `activate_at` — a *past*
`activate_at`, e.g. a recovered idempotency token, still notifies) or a
`timer_id` conflict made the insert a no-op. NOTIFY is transactional, so listeners only see it
after the user's transaction commits — atomicity with the row insert is
automatic.

Expand All @@ -68,15 +69,23 @@ WITH claimed AS (
acquired_token IS NULL
OR acquired_at < now() - make_interval(secs => :lease_ttl)
)
ORDER BY id
ORDER BY next_attempt_at, id
LIMIT :batch
FOR UPDATE SKIP LOCKED
)
UPDATE outbox SET acquired_token = :uuid, acquired_at = now()
UPDATE outbox
SET acquired_token = :uuid, acquired_at = now(),
deliveries_count = deliveries_count + 1
WHERE id IN (SELECT id FROM claimed)
RETURNING *
```

This is **simplified for illustration**. The real query writes each `OR`
disjunct with its own partial-index predicate spelled out as a conjunct, so
Postgres can use the `outbox_pending_idx` / `outbox_lease_idx` partial
indexes instead of a seq-scan — the naive `OR` above is the exact shape the
code avoids.

The CTE reclaims both unleased rows AND rows whose lease has expired
(`acquired_at < now() - lease_ttl_seconds`), so there is no separate stuck-row
reaper. The idle-sleep is short-circuited by NOTIFY via an `asyncio.Event` —
Expand Down
9 changes: 6 additions & 3 deletions docs/introduction/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ docker run -d -p 5432:5432 \

## Optional extras

The base install ships only the SQLAlchemy-driven polling broker. Each
optional extra unlocks one feature; nothing else changes if you omit them.
The base install ships only `faststream` + `sqlalchemy[asyncio]` — **no
async Postgres driver**, so you must install one (the `asyncpg` extra below,
or another async driver such as `psycopg`); the `postgresql+asyncpg://` DSNs
in the examples need `asyncpg` specifically. Each optional extra unlocks one
feature; nothing else changes if you omit them.

| Extra | Install | What it enables |
|---|---|---|
| `asyncpg` | `pip install 'faststream-outbox[asyncpg]'` | The `asyncpg` SQLAlchemy driver. Required to get `LISTEN/NOTIFY` short-circuit wakeups in the subscriber's fetch loop — without it the loop falls back to plain polling, which adds up to `max_fetch_interval` (default 10s) of idle latency between an INSERT and a dispatch. |
| `asyncpg` | `pip install 'faststream-outbox[asyncpg]'` | The `asyncpg` SQLAlchemy driver. Required to get `LISTEN/NOTIFY` short-circuit wakeups in the subscriber's fetch loop. With a *different* async driver (e.g. `psycopg`) but no `asyncpg`, the broker still works but the loop falls back to plain polling, which adds up to `max_fetch_interval` (default 10s) of idle latency between an INSERT and a dispatch; with no async driver at all the engine can't connect. |
| `fastapi` | `pip install 'faststream-outbox[fastapi]'` | The `faststream_outbox.fastapi.OutboxRouter` — see [FastAPI integration](../usage/fastapi.md). |
| `validate` | `pip install 'faststream-outbox[validate]'` | Alembic, for `broker.validate_schema()` — see [Schema validation](../usage/schema-validation.md). Calling `validate_schema()` without this extra raises `ImportError`; every other code path works. |
| `prometheus` | `pip install 'faststream-outbox[prometheus]'` | The `PrometheusRecorder` metrics adapter and native `OutboxPrometheusMiddleware` — see [Observability](../usage/observability.md). |
Expand Down
9 changes: 5 additions & 4 deletions docs/operations/alembic.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ op.create_table('outbox',
sa.Column('id', sa.BigInteger(), autoincrement=True, nullable=False),
sa.Column('queue', sa.String(length=255), nullable=False),
sa.Column('payload', sa.LargeBinary(), nullable=False),
sa.Column('headers', postgresql.JSONB(astext_type=Text()), nullable=True),
sa.Column('headers', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column('attempts_count', sa.BigInteger(), server_default='0', nullable=False),
sa.Column('deliveries_count', sa.BigInteger(), server_default='0', nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
Expand Down Expand Up @@ -66,8 +66,9 @@ maintained by the broker; no application code touches them.
The `# please adjust!` comment from Alembic is misleading here —
**don't adjust**. The column types, predicates, and indexes are
exactly what the broker depends on. The
[`validate_schema()`](../usage/schema-validation.md) check will refuse
to start a service whose live DB drifts from this declaration.
[`validate_schema()`](../usage/schema-validation.md) check — when you wire
it into a `/health` probe or CI gate — fails when the live DB drifts from
this declaration. (It is opt-in; it never runs at `broker.start()`.)

## Adding the DLQ after the fact

Expand All @@ -83,7 +84,7 @@ op.create_table('outbox_dlq',
sa.Column('original_id', sa.BigInteger(), nullable=False),
sa.Column('queue', sa.String(length=255), nullable=False),
sa.Column('payload', sa.LargeBinary(), nullable=False),
sa.Column('headers', postgresql.JSONB(astext_type=Text()), nullable=True),
sa.Column('headers', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column('deliveries_count', sa.BigInteger(), nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
sa.Column('failed_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
Expand Down
7 changes: 4 additions & 3 deletions docs/operations/checklist.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ story.
writer per worker + one fetch) plus one raw asyncpg connection for
`LISTEN`. Sub-budget formula in [Subscriber § Connection
budget](../usage/subscriber.md#connection-budget).
- [ ] **Postgres `max_connections` ≥ `replicas × Σ subs × (max_workers + 1)`**
— the formula is per-process; rolling deploys multiply it.
Failure mode: pods refuse with `FATAL: too many connections`.
- [ ] **Postgres `max_connections` ≥ `replicas × Σ subs × (max_workers + 2)`**
— `max_workers + 1` pool connections **plus** the raw asyncpg `LISTEN`
connection per subscriber; the formula is per-process and rolling deploys
multiply it. Failure mode: pods refuse with `FATAL: too many connections`.

## Subscribers

Expand Down
23 changes: 14 additions & 9 deletions docs/operations/troubleshooting.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ and a link into the reference page that owns the underlying design.
| [Outbox row count grows + `lease_lost` spike](#outbox-row-count-grows-lease_lost-spike) | DLQ CTE failing (DLQ schema drift) |
| [Outbox row count grows, no `lease_lost`](#outbox-row-count-grows-no-lease_lost) | Fetch loop not running, or rows future-dated |
| [Idle dispatch latency > `max_fetch_interval`](#idle-dispatch-latency-max_fetch_interval) | LISTEN setup failed → polling fallback |
| [Subscriber blocks at `broker.start()`](#subscriber-blocks-at-brokerstart) | Engine pool exhausted on writer-connection checkout |
| [Subscriber dispatch never starts; rows pile up](#subscriber-blocks-at-brokerstart) | Engine pool exhausted on writer-connection checkout |
| [Duplicate handler invocations](#duplicate-handler-invocations) | Lease expired before handler returned, or handler not idempotent |
| [Rolling deploy leaks rows](#rolling-deploy-leaks-rows) | `graceful_timeout` < handler P99, or k8s grace too short |
| [`activate_in` / `activate_at` fires immediately in tests](#activate_in-activate_at-fires-immediately-in-tests) | `TestOutboxBroker(run_loops=False)` ignores scheduling |
Expand Down Expand Up @@ -112,24 +112,29 @@ engine URL (`postgresql+asyncpg://...`). Restart the subscriber.
](../introduction/installation.md#optional-extras), [How it works §
Fetch loop](../introduction/how-it-works.md#subscriber-two-async-loops).

## Subscriber blocks at `broker.start()` { #subscriber-blocks-at-brokerstart }
## Subscriber dispatch never starts; rows pile up { #subscriber-blocks-at-brokerstart }

**Symptom.** Process hangs on `broker.start()` (or the FastAPI
`include_router` lifespan) and never completes startup.
**Symptom.** Rows are published but never dispatched (the table grows) and
the subscriber's loops emit repeating reconnect ERROR logs. `broker.start()`
(or the FastAPI `include_router` lifespan) itself returns normally — it only
schedules the loop tasks, so the failure shows up *after* startup, not as a
hang.

**Likely cause.** SQLAlchemy pool exhausted on the per-worker writer
connection checkout. Each subscriber needs `max_workers + 1` pool
connections; the default pool is `pool_size=5, max_overflow=10`. A
handful of single-worker subscribers fits, but a fleet of high-
`max_workers` subscribers does not.
connection checkout — the fetch/worker loops can't acquire their
connections, so each cycle errors and backs off. Each subscriber needs
`max_workers + 1` pool connections; the default pool is `pool_size=5,
max_overflow=10`. A handful of single-worker subscribers fits, but a fleet
of high-`max_workers` subscribers does not.

**Diagnose.** Inspect the engine pool. Compute `Σ subs × (max_workers
+ 1)` from your subscriber registrations and compare to
`pool_size + max_overflow`.

**Fix.** Raise `pool_size` / `max_overflow` on the engine, OR lower
`max_workers` per subscriber. Also confirm Postgres
`max_connections ≥ replicas × Σ subs × (max_workers + 1)` — rolling
`max_connections ≥ replicas × Σ subs × (max_workers + 2)` (the pool's
`max_workers + 1` plus the raw `LISTEN` connection) — rolling
deploys multiply the demand.

**Reference.** [Subscriber § Connection
Expand Down
20 changes: 13 additions & 7 deletions docs/tutorials/add-kafka-relay.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,25 @@ relay and seen the row arrive at a `kafka-console-consumer`.

- You finished [Tutorial: Your first outbox app](./first-outbox-app.md).
This tutorial extends that same `app.py`, the same `outbox-postgres`
container, and the same project directory.
container, and the same project directory. **If you ran Tutorial 1's
final cleanup**, its `--rm` Postgres container (and its data) is gone —
re-run Tutorial 1's Postgres-start and schema-creation steps first; this
tutorial assumes `outbox-postgres` is up with the `outbox` table.
- Docker Compose (the `docker compose` CLI) for the Kafka container.
- Another ten minutes.

## Step 1: Add Kafka via docker-compose

Postgres is already running from Tutorial 1. Add Kafka via a small
`docker-compose.yml`. Single-broker [KRaft
mode](https://kafka.apache.org/documentation/#kraft) — no separate
Postgres should still be running from Tutorial 1 (see the note above if you
ran its cleanup). Add Kafka via a small `docker-compose.yml`. Single-broker
[KRaft mode](https://kafka.apache.org/documentation/#kraft) — no separate
ZooKeeper service, and Confluent's `cp-kafka:7.6.0` image is known to
run well on Apple Silicon. Two listeners: one for clients on the host
(your `faststream run` process) and one for clients inside the Docker
network (the `kafka-console-consumer` we'll use in Step 5).
run well on Apple Silicon. Two listeners: one on the host at `localhost:9092`
(for your `faststream run` process) and one inside the Docker network at
`kafka:29092` (inter-broker traffic). The Step 5 console consumer runs
*inside* the broker container via `docker compose exec`, so it reaches the
host listener at `kafka:9092` directly — no separate in-network client
listener is needed for it.

```yaml title="docker-compose.yml"
services:
Expand Down
4 changes: 3 additions & 1 deletion docs/usage/dlq.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ Tags:
| `subscriber` | Subscriber handler name (`call_name`). |
| `deliveries_count` | Attempt count at terminal flush. |
| `failure_reason` | Same value set as the schema column. |
| `exception_type` | Present only when `last_exception` was set (omitted for `max_deliveries` and manual `reject()` without an exception). |
| `exception_type` | Always present; the exception class name, or `None` for terminals with no exception (`max_deliveries`, or a manual `reject()` without one). Custom recorders always see the key. |

The bundled adapters surface the event without further wiring:

Expand Down Expand Up @@ -187,6 +187,8 @@ side-effect: the source row is removed from `fake_client.rows` and an
audit dict is appended to `fake_client.dlq_rows` in the same call.

```python
from sqlalchemy import MetaData

from faststream_outbox import NoRetry, OutboxBroker, TestOutboxBroker, make_dlq_table, make_outbox_table


Expand Down
7 changes: 4 additions & 3 deletions docs/usage/fastapi.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,10 @@ intentionally **not exposed** on `OutboxRouter.__init__`:
— the broker's FastStream `Dependant` list is the wrong shape for this
flow.

If you need broker-level FastStream middlewares or dependencies, set them
on the broker before mounting the router and use the FastAPI `Depends(...)`
mechanism in handlers.
If you need broker-level FastStream middlewares, pass them to
`OutboxRouter(middlewares=[...])` — the router builds the broker for you, so
there is no separate broker to configure. Use the FastAPI `Depends(...)`
mechanism in handlers for dependencies.

## Engine ownership

Expand Down
14 changes: 7 additions & 7 deletions docs/usage/observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ MetricsRecorder = Callable[[str, Mapping[str, Any]], None]
The default (`_noop_recorder`) lets instrumentation sites call
unconditionally. The recorder threads through `OutboxBrokerConfig` to:

- The subscriber's six emission points via `OutboxSubscriber._emit_metric`
- The subscriber's seven emission points via `OutboxSubscriber._emit_metric`
- The producer's single emission point via `OutboxProducer._emit_metric`

### Bare seam
Expand All @@ -28,7 +28,7 @@ from faststream_outbox import MetricsRecorder, OutboxBroker

def recorder(event: str, tags: dict) -> None:
# event ∈ {fetched, dispatched, acked, nacked_retried, nacked_terminal,
# lease_lost, published}
# lease_lost, dlq_written, published}
# tags always include "queue"; subscriber-side events also include "subscriber"
print(event, tags)

Expand All @@ -52,13 +52,13 @@ broken recorder never poisons the dispatch loop.
| Event | Tags (always present) | Tags (situational) | Fired by |
|---|---|---|---|
| `fetched` | `queue`, `subscriber`, `count` | | Fetch loop, every cycle (including empty) |
| `dispatched` | `queue`, `subscriber` | | Worker loop, before handler runs |
| `acked` | `queue`, `subscriber` | `duration_seconds` | Handler returned successfully |
| `dispatched` | `queue`, `subscriber`, `deliveries_count`, `size_bytes` | | Worker loop, before handler runs |
| `acked` | `queue`, `subscriber`, `deliveries_count`, `duration_seconds` | | Handler returned successfully |
| `nacked_retried` | `queue`, `subscriber`, `deliveries_count`, `duration_seconds`, `next_delay_seconds` | `exception_type` | Retry scheduled |
| `nacked_terminal` | `queue`, `subscriber`, `deliveries_count`, `reason` | `exception_type` | Row terminally failed |
| `lease_lost` | `queue`, `phase`, `row_id`, `deliveries_count` | | Terminal write found `rowcount == 0` |
| `nacked_terminal` | `queue`, `subscriber`, `deliveries_count`, `reason` | `duration_seconds`, `exception_type` | Row terminally failed (`duration_seconds` absent for `max_deliveries`, which never ran the handler) |
| `lease_lost` | `queue`, `subscriber`, `phase`, `row_id`, `deliveries_count` | | Terminal or retry write found `rowcount == 0` (`phase` = `terminal` \| `retry`) |
| `published` | `queue`, `status`, `count`, `size_bytes`, `duration_seconds` | `exception_type` | Producer, after the INSERT executes (pre-commit; also fires on error with `status="error"`) |
| `dlq_written` | `queue`, `subscriber`, `deliveries_count`, `failure_reason` | `exception_type` | DLQ CTE wrote an audit row |
| `dlq_written` | `queue`, `subscriber`, `deliveries_count`, `failure_reason`, `exception_type` | | DLQ CTE wrote an audit row (`exception_type` is `None` when the terminal had no exception) |

`reason` on `nacked_terminal` is one of `max_deliveries`,
`retry_terminal`, `rejected`. The same value lands in the DLQ
Expand Down
6 changes: 4 additions & 2 deletions docs/usage/publisher.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@ contract.

For "consume from queue A → enqueue to queue B" relays, either call
`broker.publish(value, queue="B", session=session)` directly inside your
handler — on the same session that owns the inbound row's terminal write —
or `return OutboxResponse(...)` (see below).
handler — on the same session that holds your domain writes — or
`return OutboxResponse(...)` (see below). (The inbound row's own terminal
DELETE runs separately, on the worker's autocommit connection, not this
session.)

## Chained publishing

Expand Down
4 changes: 3 additions & 1 deletion docs/usage/schema-validation.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ everything the broker needs at runtime.
`broker.validate_schema()` delegates to Alembic's
`autogenerate.compare_metadata` against a throwaway `MetaData` populated
by `make_outbox_table(...)`. The canonical `Table` is the single source
of truth; the validator never duplicates the schema declaration.
of truth; the validator never duplicates the schema declaration. When the
broker was constructed with a `dlq_table`, `validate_schema()` runs a
second pass over the DLQ table the same way.

## Install

Expand Down
Loading