diff --git a/docs/concepts/comparison.md b/docs/concepts/comparison.md
index 801909c..b06951a 100644
--- a/docs/concepts/comparison.md
+++ b/docs/concepts/comparison.md
@@ -101,9 +101,8 @@ no replay, no persistence, no retry.
`faststream-outbox` keeps the **outbox row** as the durability boundary
and uses NOTIFY only as a wake-up short-circuit on top of polling. If
-the NOTIFY is lost — listener reconnecting, channel name too long
-(Postgres' 63-char limit), `LISTEN` setup failed at startup — the
-subscriber still finds the row on its next poll cycle. The worst case
+the NOTIFY is lost — listener reconnecting, or `LISTEN` setup failed at
+startup — the subscriber still finds the row on its next poll cycle. The worst case
is one `max_fetch_interval` of idle latency (default 10 seconds), not
data loss.
diff --git a/docs/concepts/instrumentation-seams.md b/docs/concepts/instrumentation-seams.md
index 04f43f7..5e561e8 100644
--- a/docs/concepts/instrumentation-seams.md
+++ b/docs/concepts/instrumentation-seams.md
@@ -39,7 +39,7 @@ This is the "spans + bus parity" mode the native middleware
## What the middleware seam *can't* observe
-Four events fire **outside** the handler invocation, with no
+Three events fire **outside** the handler invocation, with no
`StreamMessage` in scope:
- **`fetched` ticks (including empty fetches).** Emitted by the fetch
@@ -57,11 +57,6 @@ Four events fire **outside** the handler invocation, with no
the `max_deliveries` ceiling and was dropped *without invoking the
handler*. No handler call = no `consume_scope`. The middleware has
nothing to wrap.
-- **The empty-fetch idle counter.** Same shape as `fetched` ticks —
- fires when the fetch loop went a round without finding anything to
- claim. Useful for tuning `min_fetch_interval` and `max_fetch_interval`.
- The middleware bus has no concept of "the broker checked and found
- nothing."
## What the recorder seam observes naturally
@@ -69,7 +64,7 @@ The recorder is a `Callable[[str, Mapping[str, Any]], None]` invoked at
six subscriber events and one producer event. Plus `dlq_written` when
the DLQ is configured. It fires whether or not a handler is in scope:
-- All four bus-invisible events above.
+- All three bus-invisible events above.
- Plus `acked` / `nacked_retried` / `nacked_terminal` / `dispatched` /
`published` from inside the handler-execution paths, with explicit
`subscriber` and `queue` tags.
@@ -90,7 +85,6 @@ physically cannot observe.
| `fetched` ticks (including empty) | ❌ (no `StreamMessage` at fetch time) | ✅ |
| `lease_lost` after `consume_scope` exits | ❌ | ✅ |
| `nacked_terminal(reason="max_deliveries")` before consume opens | ❌ | ✅ |
-| Empty-fetch idle counter | ❌ | ✅ |
## Operator implication
diff --git a/docs/introduction/how-it-works.md b/docs/introduction/how-it-works.md
index ee1fca1..0213bcb 100644
--- a/docs/introduction/how-it-works.md
+++ b/docs/introduction/how-it-works.md
@@ -46,8 +46,9 @@ round-trip for many rows.
The producer also emits `SELECT pg_notify('outbox_
', queue)` on the
caller's session right after the INSERT, **except** when the row is
-future-dated (`activate_in` / `activate_at` set) or a `timer_id` conflict
-made the insert a no-op. NOTIFY is transactional, so listeners only see it
+genuinely future-dated (a future `activate_in` / `activate_at` — a *past*
+`activate_at`, e.g. a recovered idempotency token, still notifies) or a
+`timer_id` conflict made the insert a no-op. NOTIFY is transactional, so listeners only see it
after the user's transaction commits — atomicity with the row insert is
automatic.
@@ -68,15 +69,23 @@ WITH claimed AS (
acquired_token IS NULL
OR acquired_at < now() - make_interval(secs => :lease_ttl)
)
- ORDER BY id
+ ORDER BY next_attempt_at, id
LIMIT :batch
FOR UPDATE SKIP LOCKED
)
-UPDATE outbox SET acquired_token = :uuid, acquired_at = now()
+UPDATE outbox
+SET acquired_token = :uuid, acquired_at = now(),
+ deliveries_count = deliveries_count + 1
WHERE id IN (SELECT id FROM claimed)
RETURNING *
```
+This is **simplified for illustration**. The real query writes each `OR`
+disjunct with its own partial-index predicate spelled out as a conjunct, so
+Postgres can use the `outbox_pending_idx` / `outbox_lease_idx` partial
+indexes instead of a seq-scan — the naive `OR` above is the exact shape the
+code avoids.
+
The CTE reclaims both unleased rows AND rows whose lease has expired
(`acquired_at < now() - lease_ttl_seconds`), so there is no separate stuck-row
reaper. The idle-sleep is short-circuited by NOTIFY via an `asyncio.Event` —
diff --git a/docs/introduction/installation.md b/docs/introduction/installation.md
index 1efb192..fc936e3 100644
--- a/docs/introduction/installation.md
+++ b/docs/introduction/installation.md
@@ -40,12 +40,15 @@ docker run -d -p 5432:5432 \
## Optional extras
-The base install ships only the SQLAlchemy-driven polling broker. Each
-optional extra unlocks one feature; nothing else changes if you omit them.
+The base install ships only `faststream` + `sqlalchemy[asyncio]` — **no
+async Postgres driver**, so you must install one (the `asyncpg` extra below,
+or another async driver such as `psycopg`); the `postgresql+asyncpg://` DSNs
+in the examples need `asyncpg` specifically. Each optional extra unlocks one
+feature; nothing else changes if you omit them.
| Extra | Install | What it enables |
|---|---|---|
-| `asyncpg` | `pip install 'faststream-outbox[asyncpg]'` | The `asyncpg` SQLAlchemy driver. Required to get `LISTEN/NOTIFY` short-circuit wakeups in the subscriber's fetch loop — without it the loop falls back to plain polling, which adds up to `max_fetch_interval` (default 10s) of idle latency between an INSERT and a dispatch. |
+| `asyncpg` | `pip install 'faststream-outbox[asyncpg]'` | The `asyncpg` SQLAlchemy driver. Required to get `LISTEN/NOTIFY` short-circuit wakeups in the subscriber's fetch loop. With a *different* async driver (e.g. `psycopg`) but no `asyncpg`, the broker still works but the loop falls back to plain polling, which adds up to `max_fetch_interval` (default 10s) of idle latency between an INSERT and a dispatch; with no async driver at all the engine can't connect. |
| `fastapi` | `pip install 'faststream-outbox[fastapi]'` | The `faststream_outbox.fastapi.OutboxRouter` — see [FastAPI integration](../usage/fastapi.md). |
| `validate` | `pip install 'faststream-outbox[validate]'` | Alembic, for `broker.validate_schema()` — see [Schema validation](../usage/schema-validation.md). Calling `validate_schema()` without this extra raises `ImportError`; every other code path works. |
| `prometheus` | `pip install 'faststream-outbox[prometheus]'` | The `PrometheusRecorder` metrics adapter and native `OutboxPrometheusMiddleware` — see [Observability](../usage/observability.md). |
diff --git a/docs/operations/alembic.md b/docs/operations/alembic.md
index c13076a..b3b36f5 100644
--- a/docs/operations/alembic.md
+++ b/docs/operations/alembic.md
@@ -20,7 +20,7 @@ op.create_table('outbox',
sa.Column('id', sa.BigInteger(), autoincrement=True, nullable=False),
sa.Column('queue', sa.String(length=255), nullable=False),
sa.Column('payload', sa.LargeBinary(), nullable=False),
- sa.Column('headers', postgresql.JSONB(astext_type=Text()), nullable=True),
+ sa.Column('headers', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column('attempts_count', sa.BigInteger(), server_default='0', nullable=False),
sa.Column('deliveries_count', sa.BigInteger(), server_default='0', nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
@@ -66,8 +66,9 @@ maintained by the broker; no application code touches them.
The `# please adjust!` comment from Alembic is misleading here —
**don't adjust**. The column types, predicates, and indexes are
exactly what the broker depends on. The
-[`validate_schema()`](../usage/schema-validation.md) check will refuse
-to start a service whose live DB drifts from this declaration.
+[`validate_schema()`](../usage/schema-validation.md) check — when you wire
+it into a `/health` probe or CI gate — fails when the live DB drifts from
+this declaration. (It is opt-in; it never runs at `broker.start()`.)
## Adding the DLQ after the fact
@@ -83,7 +84,7 @@ op.create_table('outbox_dlq',
sa.Column('original_id', sa.BigInteger(), nullable=False),
sa.Column('queue', sa.String(length=255), nullable=False),
sa.Column('payload', sa.LargeBinary(), nullable=False),
- sa.Column('headers', postgresql.JSONB(astext_type=Text()), nullable=True),
+ sa.Column('headers', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column('deliveries_count', sa.BigInteger(), nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
sa.Column('failed_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
diff --git a/docs/operations/checklist.md b/docs/operations/checklist.md
index 4de099d..aafec3b 100644
--- a/docs/operations/checklist.md
+++ b/docs/operations/checklist.md
@@ -11,9 +11,10 @@ story.
writer per worker + one fetch) plus one raw asyncpg connection for
`LISTEN`. Sub-budget formula in [Subscriber § Connection
budget](../usage/subscriber.md#connection-budget).
-- [ ] **Postgres `max_connections` ≥ `replicas × Σ subs × (max_workers + 1)`**
- — the formula is per-process; rolling deploys multiply it.
- Failure mode: pods refuse with `FATAL: too many connections`.
+- [ ] **Postgres `max_connections` ≥ `replicas × Σ subs × (max_workers + 2)`**
+ — `max_workers + 1` pool connections **plus** the raw asyncpg `LISTEN`
+ connection per subscriber; the formula is per-process and rolling deploys
+ multiply it. Failure mode: pods refuse with `FATAL: too many connections`.
## Subscribers
diff --git a/docs/operations/troubleshooting.md b/docs/operations/troubleshooting.md
index 51a7c3d..b152067 100644
--- a/docs/operations/troubleshooting.md
+++ b/docs/operations/troubleshooting.md
@@ -10,7 +10,7 @@ and a link into the reference page that owns the underlying design.
| [Outbox row count grows + `lease_lost` spike](#outbox-row-count-grows-lease_lost-spike) | DLQ CTE failing (DLQ schema drift) |
| [Outbox row count grows, no `lease_lost`](#outbox-row-count-grows-no-lease_lost) | Fetch loop not running, or rows future-dated |
| [Idle dispatch latency > `max_fetch_interval`](#idle-dispatch-latency-max_fetch_interval) | LISTEN setup failed → polling fallback |
-| [Subscriber blocks at `broker.start()`](#subscriber-blocks-at-brokerstart) | Engine pool exhausted on writer-connection checkout |
+| [Subscriber dispatch never starts; rows pile up](#subscriber-blocks-at-brokerstart) | Engine pool exhausted on writer-connection checkout |
| [Duplicate handler invocations](#duplicate-handler-invocations) | Lease expired before handler returned, or handler not idempotent |
| [Rolling deploy leaks rows](#rolling-deploy-leaks-rows) | `graceful_timeout` < handler P99, or k8s grace too short |
| [`activate_in` / `activate_at` fires immediately in tests](#activate_in-activate_at-fires-immediately-in-tests) | `TestOutboxBroker(run_loops=False)` ignores scheduling |
@@ -112,16 +112,20 @@ engine URL (`postgresql+asyncpg://...`). Restart the subscriber.
](../introduction/installation.md#optional-extras), [How it works §
Fetch loop](../introduction/how-it-works.md#subscriber-two-async-loops).
-## Subscriber blocks at `broker.start()` { #subscriber-blocks-at-brokerstart }
+## Subscriber dispatch never starts; rows pile up { #subscriber-blocks-at-brokerstart }
-**Symptom.** Process hangs on `broker.start()` (or the FastAPI
-`include_router` lifespan) and never completes startup.
+**Symptom.** Rows are published but never dispatched (the table grows) and
+the subscriber's loops emit repeating reconnect ERROR logs. `broker.start()`
+(or the FastAPI `include_router` lifespan) itself returns normally — it only
+schedules the loop tasks, so the failure shows up *after* startup, not as a
+hang.
**Likely cause.** SQLAlchemy pool exhausted on the per-worker writer
-connection checkout. Each subscriber needs `max_workers + 1` pool
-connections; the default pool is `pool_size=5, max_overflow=10`. A
-handful of single-worker subscribers fits, but a fleet of high-
-`max_workers` subscribers does not.
+connection checkout — the fetch/worker loops can't acquire their
+connections, so each cycle errors and backs off. Each subscriber needs
+`max_workers + 1` pool connections; the default pool is `pool_size=5,
+max_overflow=10`. A handful of single-worker subscribers fits, but a fleet
+of high-`max_workers` subscribers does not.
**Diagnose.** Inspect the engine pool. Compute `Σ subs × (max_workers
+ 1)` from your subscriber registrations and compare to
@@ -129,7 +133,8 @@ handful of single-worker subscribers fits, but a fleet of high-
**Fix.** Raise `pool_size` / `max_overflow` on the engine, OR lower
`max_workers` per subscriber. Also confirm Postgres
-`max_connections ≥ replicas × Σ subs × (max_workers + 1)` — rolling
+`max_connections ≥ replicas × Σ subs × (max_workers + 2)` (the pool's
+`max_workers + 1` plus the raw `LISTEN` connection) — rolling
deploys multiply the demand.
**Reference.** [Subscriber § Connection
diff --git a/docs/tutorials/add-kafka-relay.md b/docs/tutorials/add-kafka-relay.md
index 57ecead..9ef88d9 100644
--- a/docs/tutorials/add-kafka-relay.md
+++ b/docs/tutorials/add-kafka-relay.md
@@ -16,19 +16,25 @@ relay and seen the row arrive at a `kafka-console-consumer`.
- You finished [Tutorial: Your first outbox app](./first-outbox-app.md).
This tutorial extends that same `app.py`, the same `outbox-postgres`
- container, and the same project directory.
+ container, and the same project directory. **If you ran Tutorial 1's
+ final cleanup**, its `--rm` Postgres container (and its data) is gone —
+ re-run Tutorial 1's Postgres-start and schema-creation steps first; this
+ tutorial assumes `outbox-postgres` is up with the `outbox` table.
- Docker Compose (the `docker compose` CLI) for the Kafka container.
- Another ten minutes.
## Step 1: Add Kafka via docker-compose
-Postgres is already running from Tutorial 1. Add Kafka via a small
-`docker-compose.yml`. Single-broker [KRaft
-mode](https://kafka.apache.org/documentation/#kraft) — no separate
+Postgres should still be running from Tutorial 1 (see the note above if you
+ran its cleanup). Add Kafka via a small `docker-compose.yml`. Single-broker
+[KRaft mode](https://kafka.apache.org/documentation/#kraft) — no separate
ZooKeeper service, and Confluent's `cp-kafka:7.6.0` image is known to
-run well on Apple Silicon. Two listeners: one for clients on the host
-(your `faststream run` process) and one for clients inside the Docker
-network (the `kafka-console-consumer` we'll use in Step 5).
+run well on Apple Silicon. Two listeners: one on the host at `localhost:9092`
+(for your `faststream run` process) and one inside the Docker network at
+`kafka:29092` (inter-broker traffic). The Step 5 console consumer runs
+*inside* the broker container via `docker compose exec`, so it reaches the
+host listener at `kafka:9092` directly — no separate in-network client
+listener is needed for it.
```yaml title="docker-compose.yml"
services:
diff --git a/docs/usage/dlq.md b/docs/usage/dlq.md
index f215b75..741e3d6 100644
--- a/docs/usage/dlq.md
+++ b/docs/usage/dlq.md
@@ -141,7 +141,7 @@ Tags:
| `subscriber` | Subscriber handler name (`call_name`). |
| `deliveries_count` | Attempt count at terminal flush. |
| `failure_reason` | Same value set as the schema column. |
-| `exception_type` | Present only when `last_exception` was set (omitted for `max_deliveries` and manual `reject()` without an exception). |
+| `exception_type` | Always present; the exception class name, or `None` for terminals with no exception (`max_deliveries`, or a manual `reject()` without one). Custom recorders always see the key. |
The bundled adapters surface the event without further wiring:
@@ -187,6 +187,8 @@ side-effect: the source row is removed from `fake_client.rows` and an
audit dict is appended to `fake_client.dlq_rows` in the same call.
```python
+from sqlalchemy import MetaData
+
from faststream_outbox import NoRetry, OutboxBroker, TestOutboxBroker, make_dlq_table, make_outbox_table
diff --git a/docs/usage/fastapi.md b/docs/usage/fastapi.md
index fb6c8d7..4a3d915 100644
--- a/docs/usage/fastapi.md
+++ b/docs/usage/fastapi.md
@@ -122,9 +122,10 @@ intentionally **not exposed** on `OutboxRouter.__init__`:
— the broker's FastStream `Dependant` list is the wrong shape for this
flow.
-If you need broker-level FastStream middlewares or dependencies, set them
-on the broker before mounting the router and use the FastAPI `Depends(...)`
-mechanism in handlers.
+If you need broker-level FastStream middlewares, pass them to
+`OutboxRouter(middlewares=[...])` — the router builds the broker for you, so
+there is no separate broker to configure. Use the FastAPI `Depends(...)`
+mechanism in handlers for dependencies.
## Engine ownership
diff --git a/docs/usage/observability.md b/docs/usage/observability.md
index a5a7115..5784b82 100644
--- a/docs/usage/observability.md
+++ b/docs/usage/observability.md
@@ -17,7 +17,7 @@ MetricsRecorder = Callable[[str, Mapping[str, Any]], None]
The default (`_noop_recorder`) lets instrumentation sites call
unconditionally. The recorder threads through `OutboxBrokerConfig` to:
-- The subscriber's six emission points via `OutboxSubscriber._emit_metric`
+- The subscriber's seven emission points via `OutboxSubscriber._emit_metric`
- The producer's single emission point via `OutboxProducer._emit_metric`
### Bare seam
@@ -28,7 +28,7 @@ from faststream_outbox import MetricsRecorder, OutboxBroker
def recorder(event: str, tags: dict) -> None:
# event ∈ {fetched, dispatched, acked, nacked_retried, nacked_terminal,
- # lease_lost, published}
+ # lease_lost, dlq_written, published}
# tags always include "queue"; subscriber-side events also include "subscriber"
print(event, tags)
@@ -52,13 +52,13 @@ broken recorder never poisons the dispatch loop.
| Event | Tags (always present) | Tags (situational) | Fired by |
|---|---|---|---|
| `fetched` | `queue`, `subscriber`, `count` | | Fetch loop, every cycle (including empty) |
-| `dispatched` | `queue`, `subscriber` | | Worker loop, before handler runs |
-| `acked` | `queue`, `subscriber` | `duration_seconds` | Handler returned successfully |
+| `dispatched` | `queue`, `subscriber`, `deliveries_count`, `size_bytes` | | Worker loop, before handler runs |
+| `acked` | `queue`, `subscriber`, `deliveries_count`, `duration_seconds` | | Handler returned successfully |
| `nacked_retried` | `queue`, `subscriber`, `deliveries_count`, `duration_seconds`, `next_delay_seconds` | `exception_type` | Retry scheduled |
-| `nacked_terminal` | `queue`, `subscriber`, `deliveries_count`, `reason` | `exception_type` | Row terminally failed |
-| `lease_lost` | `queue`, `phase`, `row_id`, `deliveries_count` | | Terminal write found `rowcount == 0` |
+| `nacked_terminal` | `queue`, `subscriber`, `deliveries_count`, `reason` | `duration_seconds`, `exception_type` | Row terminally failed (`duration_seconds` absent for `max_deliveries`, which never ran the handler) |
+| `lease_lost` | `queue`, `subscriber`, `phase`, `row_id`, `deliveries_count` | | Terminal or retry write found `rowcount == 0` (`phase` = `terminal` \| `retry`) |
| `published` | `queue`, `status`, `count`, `size_bytes`, `duration_seconds` | `exception_type` | Producer, after the INSERT executes (pre-commit; also fires on error with `status="error"`) |
-| `dlq_written` | `queue`, `subscriber`, `deliveries_count`, `failure_reason` | `exception_type` | DLQ CTE wrote an audit row |
+| `dlq_written` | `queue`, `subscriber`, `deliveries_count`, `failure_reason`, `exception_type` | | DLQ CTE wrote an audit row (`exception_type` is `None` when the terminal had no exception) |
`reason` on `nacked_terminal` is one of `max_deliveries`,
`retry_terminal`, `rejected`. The same value lands in the DLQ
diff --git a/docs/usage/publisher.md b/docs/usage/publisher.md
index 79cab0d..6b86663 100644
--- a/docs/usage/publisher.md
+++ b/docs/usage/publisher.md
@@ -115,8 +115,10 @@ contract.
For "consume from queue A → enqueue to queue B" relays, either call
`broker.publish(value, queue="B", session=session)` directly inside your
-handler — on the same session that owns the inbound row's terminal write —
-or `return OutboxResponse(...)` (see below).
+handler — on the same session that holds your domain writes — or
+`return OutboxResponse(...)` (see below). (The inbound row's own terminal
+DELETE runs separately, on the worker's autocommit connection, not this
+session.)
## Chained publishing
diff --git a/docs/usage/schema-validation.md b/docs/usage/schema-validation.md
index de04e2d..e646ac1 100644
--- a/docs/usage/schema-validation.md
+++ b/docs/usage/schema-validation.md
@@ -7,7 +7,9 @@ everything the broker needs at runtime.
`broker.validate_schema()` delegates to Alembic's
`autogenerate.compare_metadata` against a throwaway `MetaData` populated
by `make_outbox_table(...)`. The canonical `Table` is the single source
-of truth; the validator never duplicates the schema declaration.
+of truth; the validator never duplicates the schema declaration. When the
+broker was constructed with a `dlq_table`, `validate_schema()` runs a
+second pass over the DLQ table the same way.
## Install
diff --git a/docs/usage/setup-prometheus-opentelemetry.md b/docs/usage/setup-prometheus-opentelemetry.md
index 6ff6979..0948e32 100644
--- a/docs/usage/setup-prometheus-opentelemetry.md
+++ b/docs/usage/setup-prometheus-opentelemetry.md
@@ -123,12 +123,15 @@ Prometheus exposition format on `/metrics`; for OTLP push instead,
swap the reader for `PeriodicExportingMetricReader(OTLPMetricExporter(...))`
and drop the `/metrics` route.
-Instrument names (`messaging.process.duration`,
-`messaging.publish.duration`, `messaging.process.messages` when
-`include_messages_counters=True`), units, and constructor args
-(`meter_provider`, `meter`, `include_messages_counters`) match
-`faststream.opentelemetry.TelemetryMiddleware`. The
-`messaging.system="outbox"` attribute disambiguates outbox traffic
+Instrument names match `faststream.opentelemetry.TelemetryMiddleware` for
+the bus-scope metrics — `messaging.process.duration`,
+`messaging.publish.duration`, and (when `include_messages_counters=True`)
+`messaging.process.messages` / `messaging.publish.messages` — plus three
+outbox-specific counters the middleware can't emit:
+`messaging.outbox.fetch.batches`, `messaging.outbox.lease_lost`, and
+`messaging.outbox.dlq_written`. Units and constructor args
+(`meter_provider`, `meter`, `include_messages_counters`) follow upstream.
+The `messaging.system="outbox"` attribute disambiguates outbox traffic
from Kafka / Rabbit data on the same instruments.
**Tracing (spans) is not modelled by this adapter** — the callable
@@ -139,7 +142,7 @@ middleware](#native-middleware-spans--bus-parity) integration below.
For OTel spans wrapping `consume_scope` / `publish_scope` and the
exact upstream label / instrument schema, register the native
-middleware subclasses via `broker_middlewares=[...]` — same
+middleware subclasses via `middlewares=[...]` — same
registration pattern as `KafkaPrometheusMiddleware` /
`RabbitTelemetryMiddleware`.
diff --git a/docs/usage/subscriber.md b/docs/usage/subscriber.md
index 8ecb2bd..c523079 100644
--- a/docs/usage/subscriber.md
+++ b/docs/usage/subscriber.md
@@ -64,7 +64,7 @@ Per-subscriber knobs, passed to `@broker.subscriber("…", …)`:
|---|---|---|
| `max_workers` | `1` | Concurrent handlers per subscriber |
| `fetch_batch_size` | `10` | Rows claimed per fetch cycle |
-| `min_fetch_interval` | `1.0` s | Base poll interval; the floor used when the queue has work |
+| `min_fetch_interval` | `1.0` s | Base for the adaptive idle backoff (jittered ±50%, so an actual wait can land below it) and the wait when the inflight queue is full; no sleep at all while fetches keep returning rows |
| `max_fetch_interval` | `10.0` s | Ceiling for the adaptive idle backoff (with jitter) |
| `lease_ttl_seconds` | `60.0` s | How long a claim is valid before another fetch may reclaim it. **Must exceed your handler's P99 with margin.** |
| `max_deliveries` | `None` (unbounded) | Total claims (including lease-expiry re-claims) after which the row is dropped without invoking the handler. Defends against handlers that consistently wedge. |
@@ -199,15 +199,21 @@ Returning `None` from `get_next_attempt_delay` signals a terminal failure.
Each subscriber holds `max_workers + 1` long-lived SQLAlchemy pool
connections (one writer per worker + one fetch), plus one raw asyncpg
-connection for `LISTEN` when available. Size your engine for `Σ subscribers
-× (max_workers + 1)` or `broker.start()` will block on pool checkout.
-SQLAlchemy's default `pool_size=5, max_overflow=10` covers a handful of
-single-worker subscribers; raise it for larger fleets.
-
-The formula is **per process**. Each replica opens its own pool, so your
-Postgres `max_connections` needs to cover `replicas × Σ subscribers ×
-(max_workers + 1)` — otherwise additional replicas (or rolling deployments)
-will be refused at startup with `FATAL: too many connections`.
+connection for `LISTEN` when available. Size your **engine pool** for
+`Σ subscribers × (max_workers + 1)`. An undersized pool does **not** block
+`broker.start()` — `start()` only schedules the loop tasks and returns;
+instead the fetch/worker loops stall on pool checkout and surface as
+repeating reconnect ERROR logs with dispatch silently starved. SQLAlchemy's
+default `pool_size=5, max_overflow=10` covers a handful of single-worker
+subscribers; raise it for larger fleets.
+
+Server-side, the footprint is one larger: the raw asyncpg `LISTEN`
+connection lives **outside** the pool, so each subscriber consumes
+`max_workers + 2` Postgres connections. The budget is **per process** —
+each replica opens its own pool and LISTEN connections, so your Postgres
+`max_connections` needs to cover `replicas × Σ subscribers × (max_workers +
+2)`, otherwise additional replicas (or rolling deployments) are refused at
+startup with `FATAL: too many connections`.
*Operator-side: [Production checklist § Sizing](../operations/checklist.md#sizing).*
diff --git a/docs/usage/timers.md b/docs/usage/timers.md
index c559b63..7dc7532 100644
--- a/docs/usage/timers.md
+++ b/docs/usage/timers.md
@@ -73,9 +73,11 @@ second = await broker.publish(
assert second is None
```
-NOTIFY is skipped when `activate_in` / `activate_at` is set OR the conflict
-path returned no row — both cases would either wake listeners that find
-nothing, or wake them prematurely.
+NOTIFY is skipped when the row is genuinely future-dated (a *future*
+`activate_in` / `activate_at`) OR the conflict path returned no row — both
+cases would either wake listeners that find nothing, or wake them
+prematurely. A *past* `activate_at` is already eligible, so it still
+notifies.
`timer_id` is only available on single `publish`, not on `publish_batch`
(per-row dedup makes no sense for a batch).