diff --git a/docs/concepts/comparison.md b/docs/concepts/comparison.md index 801909c..b06951a 100644 --- a/docs/concepts/comparison.md +++ b/docs/concepts/comparison.md @@ -101,9 +101,8 @@ no replay, no persistence, no retry. `faststream-outbox` keeps the **outbox row** as the durability boundary and uses NOTIFY only as a wake-up short-circuit on top of polling. If -the NOTIFY is lost — listener reconnecting, channel name too long -(Postgres' 63-char limit), `LISTEN` setup failed at startup — the -subscriber still finds the row on its next poll cycle. The worst case +the NOTIFY is lost — listener reconnecting, or `LISTEN` setup failed at +startup — the subscriber still finds the row on its next poll cycle. The worst case is one `max_fetch_interval` of idle latency (default 10 seconds), not data loss. diff --git a/docs/concepts/instrumentation-seams.md b/docs/concepts/instrumentation-seams.md index 04f43f7..5e561e8 100644 --- a/docs/concepts/instrumentation-seams.md +++ b/docs/concepts/instrumentation-seams.md @@ -39,7 +39,7 @@ This is the "spans + bus parity" mode the native middleware ## What the middleware seam *can't* observe -Four events fire **outside** the handler invocation, with no +Three events fire **outside** the handler invocation, with no `StreamMessage` in scope: - **`fetched` ticks (including empty fetches).** Emitted by the fetch @@ -57,11 +57,6 @@ Four events fire **outside** the handler invocation, with no the `max_deliveries` ceiling and was dropped *without invoking the handler*. No handler call = no `consume_scope`. The middleware has nothing to wrap. -- **The empty-fetch idle counter.** Same shape as `fetched` ticks — - fires when the fetch loop went a round without finding anything to - claim. Useful for tuning `min_fetch_interval` and `max_fetch_interval`. - The middleware bus has no concept of "the broker checked and found - nothing." ## What the recorder seam observes naturally @@ -69,7 +64,7 @@ The recorder is a `Callable[[str, Mapping[str, Any]], None]` invoked at six subscriber events and one producer event. Plus `dlq_written` when the DLQ is configured. It fires whether or not a handler is in scope: -- All four bus-invisible events above. +- All three bus-invisible events above. - Plus `acked` / `nacked_retried` / `nacked_terminal` / `dispatched` / `published` from inside the handler-execution paths, with explicit `subscriber` and `queue` tags. @@ -90,7 +85,6 @@ physically cannot observe. | `fetched` ticks (including empty) | ❌ (no `StreamMessage` at fetch time) | ✅ | | `lease_lost` after `consume_scope` exits | ❌ | ✅ | | `nacked_terminal(reason="max_deliveries")` before consume opens | ❌ | ✅ | -| Empty-fetch idle counter | ❌ | ✅ | ## Operator implication diff --git a/docs/introduction/how-it-works.md b/docs/introduction/how-it-works.md index ee1fca1..0213bcb 100644 --- a/docs/introduction/how-it-works.md +++ b/docs/introduction/how-it-works.md @@ -46,8 +46,9 @@ round-trip for many rows. The producer also emits `SELECT pg_notify('outbox_', queue)` on the caller's session right after the INSERT, **except** when the row is -future-dated (`activate_in` / `activate_at` set) or a `timer_id` conflict -made the insert a no-op. NOTIFY is transactional, so listeners only see it +genuinely future-dated (a future `activate_in` / `activate_at` — a *past* +`activate_at`, e.g. a recovered idempotency token, still notifies) or a +`timer_id` conflict made the insert a no-op. NOTIFY is transactional, so listeners only see it after the user's transaction commits — atomicity with the row insert is automatic. @@ -68,15 +69,23 @@ WITH claimed AS ( acquired_token IS NULL OR acquired_at < now() - make_interval(secs => :lease_ttl) ) - ORDER BY id + ORDER BY next_attempt_at, id LIMIT :batch FOR UPDATE SKIP LOCKED ) -UPDATE outbox SET acquired_token = :uuid, acquired_at = now() +UPDATE outbox +SET acquired_token = :uuid, acquired_at = now(), + deliveries_count = deliveries_count + 1 WHERE id IN (SELECT id FROM claimed) RETURNING * ``` +This is **simplified for illustration**. The real query writes each `OR` +disjunct with its own partial-index predicate spelled out as a conjunct, so +Postgres can use the `outbox_pending_idx` / `outbox_lease_idx` partial +indexes instead of a seq-scan — the naive `OR` above is the exact shape the +code avoids. + The CTE reclaims both unleased rows AND rows whose lease has expired (`acquired_at < now() - lease_ttl_seconds`), so there is no separate stuck-row reaper. The idle-sleep is short-circuited by NOTIFY via an `asyncio.Event` — diff --git a/docs/introduction/installation.md b/docs/introduction/installation.md index 1efb192..fc936e3 100644 --- a/docs/introduction/installation.md +++ b/docs/introduction/installation.md @@ -40,12 +40,15 @@ docker run -d -p 5432:5432 \ ## Optional extras -The base install ships only the SQLAlchemy-driven polling broker. Each -optional extra unlocks one feature; nothing else changes if you omit them. +The base install ships only `faststream` + `sqlalchemy[asyncio]` — **no +async Postgres driver**, so you must install one (the `asyncpg` extra below, +or another async driver such as `psycopg`); the `postgresql+asyncpg://` DSNs +in the examples need `asyncpg` specifically. Each optional extra unlocks one +feature; nothing else changes if you omit them. | Extra | Install | What it enables | |---|---|---| -| `asyncpg` | `pip install 'faststream-outbox[asyncpg]'` | The `asyncpg` SQLAlchemy driver. Required to get `LISTEN/NOTIFY` short-circuit wakeups in the subscriber's fetch loop — without it the loop falls back to plain polling, which adds up to `max_fetch_interval` (default 10s) of idle latency between an INSERT and a dispatch. | +| `asyncpg` | `pip install 'faststream-outbox[asyncpg]'` | The `asyncpg` SQLAlchemy driver. Required to get `LISTEN/NOTIFY` short-circuit wakeups in the subscriber's fetch loop. With a *different* async driver (e.g. `psycopg`) but no `asyncpg`, the broker still works but the loop falls back to plain polling, which adds up to `max_fetch_interval` (default 10s) of idle latency between an INSERT and a dispatch; with no async driver at all the engine can't connect. | | `fastapi` | `pip install 'faststream-outbox[fastapi]'` | The `faststream_outbox.fastapi.OutboxRouter` — see [FastAPI integration](../usage/fastapi.md). | | `validate` | `pip install 'faststream-outbox[validate]'` | Alembic, for `broker.validate_schema()` — see [Schema validation](../usage/schema-validation.md). Calling `validate_schema()` without this extra raises `ImportError`; every other code path works. | | `prometheus` | `pip install 'faststream-outbox[prometheus]'` | The `PrometheusRecorder` metrics adapter and native `OutboxPrometheusMiddleware` — see [Observability](../usage/observability.md). | diff --git a/docs/operations/alembic.md b/docs/operations/alembic.md index c13076a..b3b36f5 100644 --- a/docs/operations/alembic.md +++ b/docs/operations/alembic.md @@ -20,7 +20,7 @@ op.create_table('outbox', sa.Column('id', sa.BigInteger(), autoincrement=True, nullable=False), sa.Column('queue', sa.String(length=255), nullable=False), sa.Column('payload', sa.LargeBinary(), nullable=False), - sa.Column('headers', postgresql.JSONB(astext_type=Text()), nullable=True), + sa.Column('headers', postgresql.JSONB(astext_type=sa.Text()), nullable=True), sa.Column('attempts_count', sa.BigInteger(), server_default='0', nullable=False), sa.Column('deliveries_count', sa.BigInteger(), server_default='0', nullable=False), sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False), @@ -66,8 +66,9 @@ maintained by the broker; no application code touches them. The `# please adjust!` comment from Alembic is misleading here — **don't adjust**. The column types, predicates, and indexes are exactly what the broker depends on. The -[`validate_schema()`](../usage/schema-validation.md) check will refuse -to start a service whose live DB drifts from this declaration. +[`validate_schema()`](../usage/schema-validation.md) check — when you wire +it into a `/health` probe or CI gate — fails when the live DB drifts from +this declaration. (It is opt-in; it never runs at `broker.start()`.) ## Adding the DLQ after the fact @@ -83,7 +84,7 @@ op.create_table('outbox_dlq', sa.Column('original_id', sa.BigInteger(), nullable=False), sa.Column('queue', sa.String(length=255), nullable=False), sa.Column('payload', sa.LargeBinary(), nullable=False), - sa.Column('headers', postgresql.JSONB(astext_type=Text()), nullable=True), + sa.Column('headers', postgresql.JSONB(astext_type=sa.Text()), nullable=True), sa.Column('deliveries_count', sa.BigInteger(), nullable=False), sa.Column('created_at', sa.DateTime(timezone=True), nullable=False), sa.Column('failed_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False), diff --git a/docs/operations/checklist.md b/docs/operations/checklist.md index 4de099d..aafec3b 100644 --- a/docs/operations/checklist.md +++ b/docs/operations/checklist.md @@ -11,9 +11,10 @@ story. writer per worker + one fetch) plus one raw asyncpg connection for `LISTEN`. Sub-budget formula in [Subscriber § Connection budget](../usage/subscriber.md#connection-budget). -- [ ] **Postgres `max_connections` ≥ `replicas × Σ subs × (max_workers + 1)`** - — the formula is per-process; rolling deploys multiply it. - Failure mode: pods refuse with `FATAL: too many connections`. +- [ ] **Postgres `max_connections` ≥ `replicas × Σ subs × (max_workers + 2)`** + — `max_workers + 1` pool connections **plus** the raw asyncpg `LISTEN` + connection per subscriber; the formula is per-process and rolling deploys + multiply it. Failure mode: pods refuse with `FATAL: too many connections`. ## Subscribers diff --git a/docs/operations/troubleshooting.md b/docs/operations/troubleshooting.md index 51a7c3d..b152067 100644 --- a/docs/operations/troubleshooting.md +++ b/docs/operations/troubleshooting.md @@ -10,7 +10,7 @@ and a link into the reference page that owns the underlying design. | [Outbox row count grows + `lease_lost` spike](#outbox-row-count-grows-lease_lost-spike) | DLQ CTE failing (DLQ schema drift) | | [Outbox row count grows, no `lease_lost`](#outbox-row-count-grows-no-lease_lost) | Fetch loop not running, or rows future-dated | | [Idle dispatch latency > `max_fetch_interval`](#idle-dispatch-latency-max_fetch_interval) | LISTEN setup failed → polling fallback | -| [Subscriber blocks at `broker.start()`](#subscriber-blocks-at-brokerstart) | Engine pool exhausted on writer-connection checkout | +| [Subscriber dispatch never starts; rows pile up](#subscriber-blocks-at-brokerstart) | Engine pool exhausted on writer-connection checkout | | [Duplicate handler invocations](#duplicate-handler-invocations) | Lease expired before handler returned, or handler not idempotent | | [Rolling deploy leaks rows](#rolling-deploy-leaks-rows) | `graceful_timeout` < handler P99, or k8s grace too short | | [`activate_in` / `activate_at` fires immediately in tests](#activate_in-activate_at-fires-immediately-in-tests) | `TestOutboxBroker(run_loops=False)` ignores scheduling | @@ -112,16 +112,20 @@ engine URL (`postgresql+asyncpg://...`). Restart the subscriber. ](../introduction/installation.md#optional-extras), [How it works § Fetch loop](../introduction/how-it-works.md#subscriber-two-async-loops). -## Subscriber blocks at `broker.start()` { #subscriber-blocks-at-brokerstart } +## Subscriber dispatch never starts; rows pile up { #subscriber-blocks-at-brokerstart } -**Symptom.** Process hangs on `broker.start()` (or the FastAPI -`include_router` lifespan) and never completes startup. +**Symptom.** Rows are published but never dispatched (the table grows) and +the subscriber's loops emit repeating reconnect ERROR logs. `broker.start()` +(or the FastAPI `include_router` lifespan) itself returns normally — it only +schedules the loop tasks, so the failure shows up *after* startup, not as a +hang. **Likely cause.** SQLAlchemy pool exhausted on the per-worker writer -connection checkout. Each subscriber needs `max_workers + 1` pool -connections; the default pool is `pool_size=5, max_overflow=10`. A -handful of single-worker subscribers fits, but a fleet of high- -`max_workers` subscribers does not. +connection checkout — the fetch/worker loops can't acquire their +connections, so each cycle errors and backs off. Each subscriber needs +`max_workers + 1` pool connections; the default pool is `pool_size=5, +max_overflow=10`. A handful of single-worker subscribers fits, but a fleet +of high-`max_workers` subscribers does not. **Diagnose.** Inspect the engine pool. Compute `Σ subs × (max_workers + 1)` from your subscriber registrations and compare to @@ -129,7 +133,8 @@ handful of single-worker subscribers fits, but a fleet of high- **Fix.** Raise `pool_size` / `max_overflow` on the engine, OR lower `max_workers` per subscriber. Also confirm Postgres -`max_connections ≥ replicas × Σ subs × (max_workers + 1)` — rolling +`max_connections ≥ replicas × Σ subs × (max_workers + 2)` (the pool's +`max_workers + 1` plus the raw `LISTEN` connection) — rolling deploys multiply the demand. **Reference.** [Subscriber § Connection diff --git a/docs/tutorials/add-kafka-relay.md b/docs/tutorials/add-kafka-relay.md index 57ecead..9ef88d9 100644 --- a/docs/tutorials/add-kafka-relay.md +++ b/docs/tutorials/add-kafka-relay.md @@ -16,19 +16,25 @@ relay and seen the row arrive at a `kafka-console-consumer`. - You finished [Tutorial: Your first outbox app](./first-outbox-app.md). This tutorial extends that same `app.py`, the same `outbox-postgres` - container, and the same project directory. + container, and the same project directory. **If you ran Tutorial 1's + final cleanup**, its `--rm` Postgres container (and its data) is gone — + re-run Tutorial 1's Postgres-start and schema-creation steps first; this + tutorial assumes `outbox-postgres` is up with the `outbox` table. - Docker Compose (the `docker compose` CLI) for the Kafka container. - Another ten minutes. ## Step 1: Add Kafka via docker-compose -Postgres is already running from Tutorial 1. Add Kafka via a small -`docker-compose.yml`. Single-broker [KRaft -mode](https://kafka.apache.org/documentation/#kraft) — no separate +Postgres should still be running from Tutorial 1 (see the note above if you +ran its cleanup). Add Kafka via a small `docker-compose.yml`. Single-broker +[KRaft mode](https://kafka.apache.org/documentation/#kraft) — no separate ZooKeeper service, and Confluent's `cp-kafka:7.6.0` image is known to -run well on Apple Silicon. Two listeners: one for clients on the host -(your `faststream run` process) and one for clients inside the Docker -network (the `kafka-console-consumer` we'll use in Step 5). +run well on Apple Silicon. Two listeners: one on the host at `localhost:9092` +(for your `faststream run` process) and one inside the Docker network at +`kafka:29092` (inter-broker traffic). The Step 5 console consumer runs +*inside* the broker container via `docker compose exec`, so it reaches the +host listener at `kafka:9092` directly — no separate in-network client +listener is needed for it. ```yaml title="docker-compose.yml" services: diff --git a/docs/usage/dlq.md b/docs/usage/dlq.md index f215b75..741e3d6 100644 --- a/docs/usage/dlq.md +++ b/docs/usage/dlq.md @@ -141,7 +141,7 @@ Tags: | `subscriber` | Subscriber handler name (`call_name`). | | `deliveries_count` | Attempt count at terminal flush. | | `failure_reason` | Same value set as the schema column. | -| `exception_type` | Present only when `last_exception` was set (omitted for `max_deliveries` and manual `reject()` without an exception). | +| `exception_type` | Always present; the exception class name, or `None` for terminals with no exception (`max_deliveries`, or a manual `reject()` without one). Custom recorders always see the key. | The bundled adapters surface the event without further wiring: @@ -187,6 +187,8 @@ side-effect: the source row is removed from `fake_client.rows` and an audit dict is appended to `fake_client.dlq_rows` in the same call. ```python +from sqlalchemy import MetaData + from faststream_outbox import NoRetry, OutboxBroker, TestOutboxBroker, make_dlq_table, make_outbox_table diff --git a/docs/usage/fastapi.md b/docs/usage/fastapi.md index fb6c8d7..4a3d915 100644 --- a/docs/usage/fastapi.md +++ b/docs/usage/fastapi.md @@ -122,9 +122,10 @@ intentionally **not exposed** on `OutboxRouter.__init__`: — the broker's FastStream `Dependant` list is the wrong shape for this flow. -If you need broker-level FastStream middlewares or dependencies, set them -on the broker before mounting the router and use the FastAPI `Depends(...)` -mechanism in handlers. +If you need broker-level FastStream middlewares, pass them to +`OutboxRouter(middlewares=[...])` — the router builds the broker for you, so +there is no separate broker to configure. Use the FastAPI `Depends(...)` +mechanism in handlers for dependencies. ## Engine ownership diff --git a/docs/usage/observability.md b/docs/usage/observability.md index a5a7115..5784b82 100644 --- a/docs/usage/observability.md +++ b/docs/usage/observability.md @@ -17,7 +17,7 @@ MetricsRecorder = Callable[[str, Mapping[str, Any]], None] The default (`_noop_recorder`) lets instrumentation sites call unconditionally. The recorder threads through `OutboxBrokerConfig` to: -- The subscriber's six emission points via `OutboxSubscriber._emit_metric` +- The subscriber's seven emission points via `OutboxSubscriber._emit_metric` - The producer's single emission point via `OutboxProducer._emit_metric` ### Bare seam @@ -28,7 +28,7 @@ from faststream_outbox import MetricsRecorder, OutboxBroker def recorder(event: str, tags: dict) -> None: # event ∈ {fetched, dispatched, acked, nacked_retried, nacked_terminal, - # lease_lost, published} + # lease_lost, dlq_written, published} # tags always include "queue"; subscriber-side events also include "subscriber" print(event, tags) @@ -52,13 +52,13 @@ broken recorder never poisons the dispatch loop. | Event | Tags (always present) | Tags (situational) | Fired by | |---|---|---|---| | `fetched` | `queue`, `subscriber`, `count` | | Fetch loop, every cycle (including empty) | -| `dispatched` | `queue`, `subscriber` | | Worker loop, before handler runs | -| `acked` | `queue`, `subscriber` | `duration_seconds` | Handler returned successfully | +| `dispatched` | `queue`, `subscriber`, `deliveries_count`, `size_bytes` | | Worker loop, before handler runs | +| `acked` | `queue`, `subscriber`, `deliveries_count`, `duration_seconds` | | Handler returned successfully | | `nacked_retried` | `queue`, `subscriber`, `deliveries_count`, `duration_seconds`, `next_delay_seconds` | `exception_type` | Retry scheduled | -| `nacked_terminal` | `queue`, `subscriber`, `deliveries_count`, `reason` | `exception_type` | Row terminally failed | -| `lease_lost` | `queue`, `phase`, `row_id`, `deliveries_count` | | Terminal write found `rowcount == 0` | +| `nacked_terminal` | `queue`, `subscriber`, `deliveries_count`, `reason` | `duration_seconds`, `exception_type` | Row terminally failed (`duration_seconds` absent for `max_deliveries`, which never ran the handler) | +| `lease_lost` | `queue`, `subscriber`, `phase`, `row_id`, `deliveries_count` | | Terminal or retry write found `rowcount == 0` (`phase` = `terminal` \| `retry`) | | `published` | `queue`, `status`, `count`, `size_bytes`, `duration_seconds` | `exception_type` | Producer, after the INSERT executes (pre-commit; also fires on error with `status="error"`) | -| `dlq_written` | `queue`, `subscriber`, `deliveries_count`, `failure_reason` | `exception_type` | DLQ CTE wrote an audit row | +| `dlq_written` | `queue`, `subscriber`, `deliveries_count`, `failure_reason`, `exception_type` | | DLQ CTE wrote an audit row (`exception_type` is `None` when the terminal had no exception) | `reason` on `nacked_terminal` is one of `max_deliveries`, `retry_terminal`, `rejected`. The same value lands in the DLQ diff --git a/docs/usage/publisher.md b/docs/usage/publisher.md index 79cab0d..6b86663 100644 --- a/docs/usage/publisher.md +++ b/docs/usage/publisher.md @@ -115,8 +115,10 @@ contract. For "consume from queue A → enqueue to queue B" relays, either call `broker.publish(value, queue="B", session=session)` directly inside your -handler — on the same session that owns the inbound row's terminal write — -or `return OutboxResponse(...)` (see below). +handler — on the same session that holds your domain writes — or +`return OutboxResponse(...)` (see below). (The inbound row's own terminal +DELETE runs separately, on the worker's autocommit connection, not this +session.) ## Chained publishing diff --git a/docs/usage/schema-validation.md b/docs/usage/schema-validation.md index de04e2d..e646ac1 100644 --- a/docs/usage/schema-validation.md +++ b/docs/usage/schema-validation.md @@ -7,7 +7,9 @@ everything the broker needs at runtime. `broker.validate_schema()` delegates to Alembic's `autogenerate.compare_metadata` against a throwaway `MetaData` populated by `make_outbox_table(...)`. The canonical `Table` is the single source -of truth; the validator never duplicates the schema declaration. +of truth; the validator never duplicates the schema declaration. When the +broker was constructed with a `dlq_table`, `validate_schema()` runs a +second pass over the DLQ table the same way. ## Install diff --git a/docs/usage/setup-prometheus-opentelemetry.md b/docs/usage/setup-prometheus-opentelemetry.md index 6ff6979..0948e32 100644 --- a/docs/usage/setup-prometheus-opentelemetry.md +++ b/docs/usage/setup-prometheus-opentelemetry.md @@ -123,12 +123,15 @@ Prometheus exposition format on `/metrics`; for OTLP push instead, swap the reader for `PeriodicExportingMetricReader(OTLPMetricExporter(...))` and drop the `/metrics` route. -Instrument names (`messaging.process.duration`, -`messaging.publish.duration`, `messaging.process.messages` when -`include_messages_counters=True`), units, and constructor args -(`meter_provider`, `meter`, `include_messages_counters`) match -`faststream.opentelemetry.TelemetryMiddleware`. The -`messaging.system="outbox"` attribute disambiguates outbox traffic +Instrument names match `faststream.opentelemetry.TelemetryMiddleware` for +the bus-scope metrics — `messaging.process.duration`, +`messaging.publish.duration`, and (when `include_messages_counters=True`) +`messaging.process.messages` / `messaging.publish.messages` — plus three +outbox-specific counters the middleware can't emit: +`messaging.outbox.fetch.batches`, `messaging.outbox.lease_lost`, and +`messaging.outbox.dlq_written`. Units and constructor args +(`meter_provider`, `meter`, `include_messages_counters`) follow upstream. +The `messaging.system="outbox"` attribute disambiguates outbox traffic from Kafka / Rabbit data on the same instruments. **Tracing (spans) is not modelled by this adapter** — the callable @@ -139,7 +142,7 @@ middleware](#native-middleware-spans--bus-parity) integration below. For OTel spans wrapping `consume_scope` / `publish_scope` and the exact upstream label / instrument schema, register the native -middleware subclasses via `broker_middlewares=[...]` — same +middleware subclasses via `middlewares=[...]` — same registration pattern as `KafkaPrometheusMiddleware` / `RabbitTelemetryMiddleware`. diff --git a/docs/usage/subscriber.md b/docs/usage/subscriber.md index 8ecb2bd..c523079 100644 --- a/docs/usage/subscriber.md +++ b/docs/usage/subscriber.md @@ -64,7 +64,7 @@ Per-subscriber knobs, passed to `@broker.subscriber("…", …)`: |---|---|---| | `max_workers` | `1` | Concurrent handlers per subscriber | | `fetch_batch_size` | `10` | Rows claimed per fetch cycle | -| `min_fetch_interval` | `1.0` s | Base poll interval; the floor used when the queue has work | +| `min_fetch_interval` | `1.0` s | Base for the adaptive idle backoff (jittered ±50%, so an actual wait can land below it) and the wait when the inflight queue is full; no sleep at all while fetches keep returning rows | | `max_fetch_interval` | `10.0` s | Ceiling for the adaptive idle backoff (with jitter) | | `lease_ttl_seconds` | `60.0` s | How long a claim is valid before another fetch may reclaim it. **Must exceed your handler's P99 with margin.** | | `max_deliveries` | `None` (unbounded) | Total claims (including lease-expiry re-claims) after which the row is dropped without invoking the handler. Defends against handlers that consistently wedge. | @@ -199,15 +199,21 @@ Returning `None` from `get_next_attempt_delay` signals a terminal failure. Each subscriber holds `max_workers + 1` long-lived SQLAlchemy pool connections (one writer per worker + one fetch), plus one raw asyncpg -connection for `LISTEN` when available. Size your engine for `Σ subscribers -× (max_workers + 1)` or `broker.start()` will block on pool checkout. -SQLAlchemy's default `pool_size=5, max_overflow=10` covers a handful of -single-worker subscribers; raise it for larger fleets. - -The formula is **per process**. Each replica opens its own pool, so your -Postgres `max_connections` needs to cover `replicas × Σ subscribers × -(max_workers + 1)` — otherwise additional replicas (or rolling deployments) -will be refused at startup with `FATAL: too many connections`. +connection for `LISTEN` when available. Size your **engine pool** for +`Σ subscribers × (max_workers + 1)`. An undersized pool does **not** block +`broker.start()` — `start()` only schedules the loop tasks and returns; +instead the fetch/worker loops stall on pool checkout and surface as +repeating reconnect ERROR logs with dispatch silently starved. SQLAlchemy's +default `pool_size=5, max_overflow=10` covers a handful of single-worker +subscribers; raise it for larger fleets. + +Server-side, the footprint is one larger: the raw asyncpg `LISTEN` +connection lives **outside** the pool, so each subscriber consumes +`max_workers + 2` Postgres connections. The budget is **per process** — +each replica opens its own pool and LISTEN connections, so your Postgres +`max_connections` needs to cover `replicas × Σ subscribers × (max_workers + +2)`, otherwise additional replicas (or rolling deployments) are refused at +startup with `FATAL: too many connections`. *Operator-side: [Production checklist § Sizing](../operations/checklist.md#sizing).* diff --git a/docs/usage/timers.md b/docs/usage/timers.md index c559b63..7dc7532 100644 --- a/docs/usage/timers.md +++ b/docs/usage/timers.md @@ -73,9 +73,11 @@ second = await broker.publish( assert second is None ``` -NOTIFY is skipped when `activate_in` / `activate_at` is set OR the conflict -path returned no row — both cases would either wake listeners that find -nothing, or wake them prematurely. +NOTIFY is skipped when the row is genuinely future-dated (a *future* +`activate_in` / `activate_at`) OR the conflict path returned no row — both +cases would either wake listeners that find nothing, or wake them +prematurely. A *past* `activate_at` is already eligible, so it still +notifies. `timer_id` is only available on single `publish`, not on `publish_batch` (per-row dedup makes no sense for a batch).