diff --git a/docs/operations/checklist.md b/docs/operations/checklist.md index 8064dbf..4de099d 100644 --- a/docs/operations/checklist.md +++ b/docs/operations/checklist.md @@ -63,10 +63,11 @@ story. `[validate]` extra. Do **not** call at `broker.start()` — that would crash-loop on a pending migration. See [Schema validation § Where to call it](../usage/schema-validation.md#where-to-call-it). -- [ ] **Outbox `table_name` ≤ ~56 chars** — NOTIFY channel name is - `outbox_`. Postgres' 63-char identifier limit silently - truncates longer names and `LISTEN/NOTIFY` short-circuit degrades - to plain polling. +- [ ] **Outbox `table_name` short enough for the NOTIFY channel** — the + channel name is `outbox_`, and `make_outbox_table` raises + `ValueError` at table-build time when that exceeds Postgres' 63-**byte** + identifier limit. There is no silent truncation or polling fallback — the + guard makes an over-long name impossible to ship. ## Observability @@ -76,8 +77,9 @@ story. - [ ] **Alert on `lease_lost` rate** — non-zero means `lease_ttl_seconds < handler P99` for at least one subscriber. See [Troubleshooting § `event=lease_lost`](./troubleshooting.md#event-lease_lost-recurring-in-logs). -- [ ] **`LISTEN/NOTIFY` fallback warning checked at startup** — if - the asyncpg connection fails (driver missing, permission error), - the subscriber logs once and falls back to polling. Operator - silently lives with up-to-`max_fetch_interval` idle latency - otherwise. +- [ ] **`LISTEN/NOTIFY` fallback understood** — a *connection* or + *permission* failure (`asyncpg.connect` / `add_listener` raising) logs a + WARNING once and falls back to polling. A **missing asyncpg driver or a + non-asyncpg engine URL falls back silently** (no log) — diagnose those + from the engine URL, not the logs. Either way the subscriber lives with + up-to-`max_fetch_interval` idle latency. 
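Note on the `table_name` item in the checklist.md hunk above: the guard reduces to a byte-length check on the derived channel name. A minimal sketch of that arithmetic, assuming the channel is the `outbox_` prefix followed by the table name (an assumption, since the hunk shows only the prefix). `check_channel_name` is a hypothetical helper for illustration, not the library's `make_outbox_table`.

```python
# Postgres truncates identifiers beyond NAMEDATALEN - 1, which is 63 bytes by default.
POSTGRES_MAX_IDENTIFIER_BYTES = 63
CHANNEL_PREFIX = "outbox_"  # assumed: prefix + table name forms the channel name


def check_channel_name(table_name: str) -> str:
    """Return the derived channel name, or raise ValueError if it is too long.

    Hypothetical helper. The limit counts UTF-8 bytes, not characters.
    """
    channel = f"{CHANNEL_PREFIX}{table_name}"
    size = len(channel.encode("utf-8"))
    if size > POSTGRES_MAX_IDENTIFIER_BYTES:
        raise ValueError(
            f"channel {channel!r} is {size} bytes; "
            f"limit is {POSTGRES_MAX_IDENTIFIER_BYTES}"
        )
    return channel
```

Under that assumption the 7-byte prefix leaves room for 56 ASCII characters, which lines up with the "≤ ~56 chars" wording on the removed line; multi-byte characters in the table name lower that count.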
diff --git a/docs/operations/troubleshooting.md b/docs/operations/troubleshooting.md index f7aa908..51a7c3d 100644 --- a/docs/operations/troubleshooting.md +++ b/docs/operations/troubleshooting.md @@ -98,8 +98,12 @@ the SQLAlchemy fetch connection; common failure modes are: the asyncpg driver isn't installed (no `[asyncpg]` extra), the engine URL is not asyncpg, or Postgres user lacks `LISTEN` permission. -**Diagnose.** Check startup logs for a WARNING noting NOTIFY fallback -to polling. The subscriber logs it once and continues without crashing. +**Diagnose.** A connection or permission failure (`asyncpg.connect` / +`add_listener` raising) logs a WARNING once at startup noting the NOTIFY +fallback to polling. A **missing asyncpg driver or a non-asyncpg engine URL +falls back silently** — there is no log line, so check the engine URL +(`drivername` must be `postgresql+asyncpg`) and that the `[asyncpg]` extra +is installed. **Fix.** Install the `[asyncpg]` extra and use an asyncpg-driven engine URL (`postgresql+asyncpg://...`). Restart the subscriber. diff --git a/docs/tutorials/add-kafka-relay.md b/docs/tutorials/add-kafka-relay.md index 9188b97..57ecead 100644 --- a/docs/tutorials/add-kafka-relay.md +++ b/docs/tutorials/add-kafka-relay.md @@ -230,8 +230,11 @@ the foreign publish would raise, the outbox row would be nacked, and the configured `retry_strategy` would reschedule it. The next dispatch re-runs the handler and re-attempts the foreign publish. The net effect is **at-least-once delivery to the foreign broker** — the outbox row is -the durability boundary, and it stays in the table until Kafka actually -acks the publish. +the durability boundary, and it stays in the table for the duration of the +retry budget (the default `ExponentialRetry` allows 10 attempts). 
Once the +budget is exhausted the row is deleted — the default configures no DLQ — so +configure a longer `retry_strategy` or a `dlq_table` to survive outages +beyond that (with the default schedule, ~13–14 minutes). In practice, `aiokafka`'s producer has its own client-side reconnect and retry logic, so a short Kafka outage usually completes from the diff --git a/docs/usage/dlq.md b/docs/usage/dlq.md index 665e92c..f215b75 100644 --- a/docs/usage/dlq.md +++ b/docs/usage/dlq.md @@ -179,8 +179,9 @@ interval '90 days'` from a daily cron is enough. ## Test broker -`TestOutboxBroker` accumulates audit rows in -`broker.fake_client.dlq_rows` so tests can assert on archive content +`TestOutboxBroker` accumulates audit rows on the harness's +`fake_client.dlq_rows` (bind the `TestOutboxBroker` to a name, as the +snippet below does) so tests can assert on archive content without a real Postgres. The fake mirrors the production CTE side-effect: the source row is removed from `fake_client.rows` and an audit dict is appended to `fake_client.dlq_rows` in the same call. 
diff --git a/docs/usage/fastapi.md b/docs/usage/fastapi.md index 4c465ff..fb6c8d7 100644 --- a/docs/usage/fastapi.md +++ b/docs/usage/fastapi.md @@ -25,7 +25,7 @@ from sqlalchemy import MetaData from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from faststream_outbox import make_outbox_table -from faststream_outbox.fastapi import OutboxBroker, OutboxRouter +from faststream_outbox.fastapi import OutboxRouter metadata = MetaData() @@ -53,11 +53,10 @@ async def handle( @router.post("/orders") async def create_order( order: OrderIn, - broker: OutboxBroker, session: AsyncSession = Depends(get_session), ) -> dict: session.add(Order(...)) - await broker.publish({"order_id": ...}, queue="orders", session=session) + await router.broker.publish({"order_id": ...}, queue="orders", session=session) return {"ok": True} @@ -105,6 +104,12 @@ They resolve via FastStream's `Context()` paths but go through FastAPI's dependency resolver, so `Depends(...)` and these shortcuts can be mixed freely. +These shortcuts resolve through FastStream's subscriber-dispatch +machinery, so they work **only inside `@router.subscriber` handlers** — not +in HTTP routes. In an HTTP route, reach the broker via `router.broker` (as +the quickstart's `create_order` does); a `broker: OutboxBroker` annotation +there resolves as a request field and fails with a 422. + ## What's intentionally not exposed `apply_types` and the broker's FastStream `dependencies` argument are diff --git a/docs/usage/observability.md b/docs/usage/observability.md index 6663eae..a5a7115 100644 --- a/docs/usage/observability.md +++ b/docs/usage/observability.md @@ -54,10 +54,10 @@ broken recorder never poisons the dispatch loop. 
| `fetched` | `queue`, `subscriber`, `count` | | Fetch loop, every cycle (including empty) | | `dispatched` | `queue`, `subscriber` | | Worker loop, before handler runs | | `acked` | `queue`, `subscriber` | `duration_seconds` | Handler returned successfully | -| `nacked_retried` | `queue`, `subscriber`, `attempts_count`, `deliveries_count` | `exception_type` | Retry scheduled | +| `nacked_retried` | `queue`, `subscriber`, `deliveries_count`, `duration_seconds`, `next_delay_seconds` | `exception_type` | Retry scheduled | | `nacked_terminal` | `queue`, `subscriber`, `deliveries_count`, `reason` | `exception_type` | Row terminally failed | | `lease_lost` | `queue`, `phase`, `row_id`, `deliveries_count` | | Terminal write found `rowcount == 0` | -| `published` | `queue`, `destination` | `duration_seconds`, `payload_size_bytes` | Producer INSERT committed | +| `published` | `queue`, `status`, `count`, `size_bytes`, `duration_seconds` | `exception_type` | Producer, after the INSERT executes (pre-commit; also fires on error with `status="error"`) | | `dlq_written` | `queue`, `subscriber`, `deliveries_count`, `failure_reason` | `exception_type` | DLQ CTE wrote an audit row | `reason` on `nacked_terminal` is one of `max_deliveries`, @@ -97,7 +97,7 @@ histogram_quantile(0.99, rate(faststream_published_messages_duration_seconds_bucket{broker="outbox"}[5m])) # DLQ misconfiguration: terminal-failure rate diverges from DLQ-write rate -rate(faststream_outbox_nacked_terminal_total[5m]) +rate(faststream_outbox_terminal_total[5m]) - rate(faststream_outbox_dlq_written_total[5m]) > 0 diff --git a/docs/usage/publisher.md b/docs/usage/publisher.md index edc5030..79cab0d 100644 --- a/docs/usage/publisher.md +++ b/docs/usage/publisher.md @@ -128,7 +128,8 @@ transactional contract applies (you provide the session, the row commits with your domain writes): ```python -from faststream_outbox import OutboxMessage, OutboxResponse +from faststream_outbox import OutboxResponse +from 
faststream_outbox.annotations import OutboxMessage @broker.subscriber("orders") diff --git a/docs/usage/relay.md b/docs/usage/relay.md index 60749a2..3971df3 100644 --- a/docs/usage/relay.md +++ b/docs/usage/relay.md @@ -28,7 +28,7 @@ boundary, and the relay carries an at-least-once guarantee end to end. from faststream.kafka import KafkaBroker from faststream_outbox import OutboxBroker -broker_outbox = OutboxBroker(engine=engine) +broker_outbox = OutboxBroker(engine, outbox_table=outbox_table) broker_kafka = KafkaBroker("127.0.0.1:9092") publisher_kafka = broker_kafka.publisher("kafka_topic") @@ -59,7 +59,7 @@ from fastapi import FastAPI from faststream.kafka.fastapi import KafkaRouter from faststream_outbox.fastapi import OutboxRouter -outbox_router = OutboxRouter(engine=engine) +outbox_router = OutboxRouter(engine, outbox_table=outbox_table) kafka_router = KafkaRouter("127.0.0.1:9092") publisher_kafka = kafka_router.publisher("kafka_topic") @@ -86,7 +86,7 @@ from faststream import FastStream from faststream.kafka import KafkaBroker from faststream_outbox import OutboxBroker -broker_outbox = OutboxBroker(engine=engine) +broker_outbox = OutboxBroker(engine, outbox_table=outbox_table) broker_kafka = KafkaBroker("127.0.0.1:9092") publisher_kafka = broker_kafka.publisher("kafka_topic") diff --git a/docs/usage/router.md b/docs/usage/router.md index f88c7e9..48c0957 100644 --- a/docs/usage/router.md +++ b/docs/usage/router.md @@ -56,7 +56,8 @@ app = FastStream(broker) decorators, which is useful for code-gen or plugin patterns: ```python -from faststream_outbox import OutboxRoute, OutboxRouter +from faststream_outbox import OutboxRouter +from faststream_outbox.router import OutboxRoute async def handle_order(order_id: int) -> None: diff --git a/docs/usage/setup-prometheus-opentelemetry.md b/docs/usage/setup-prometheus-opentelemetry.md index 71dbe29..6ff6979 100644 --- a/docs/usage/setup-prometheus-opentelemetry.md +++ b/docs/usage/setup-prometheus-opentelemetry.md @@ 
-165,7 +165,7 @@ from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor -from prometheus_client import REGISTRY, make_asgi_app +from prometheus_client import CollectorRegistry, make_asgi_app from sqlalchemy import MetaData from sqlalchemy.ext.asyncio import create_async_engine @@ -184,6 +184,12 @@ trace.set_tracer_provider(tracer_provider) meter_provider = MeterProvider(resource=resource, metric_readers=[PrometheusMetricReader()]) metrics.set_meter_provider(meter_provider) +# Two registries: the middleware and the recorder both define the same +# faststream_* consume/publish collectors, so sharing one registry raises +# "Duplicated timeseries in CollectorRegistry" at broker construction. +MIDDLEWARE_REGISTRY = CollectorRegistry() +RECORDER_REGISTRY = CollectorRegistry() + # ----- Outbox broker ----- metadata = MetaData() outbox_table = make_outbox_table(metadata, table_name="outbox") @@ -195,11 +201,11 @@ broker = OutboxBroker( middlewares=[ # Bus-scope spans + meters around consume_scope / publish_scope. OutboxTelemetryMiddleware(tracer_provider=tracer_provider, meter_provider=meter_provider), - OutboxPrometheusMiddleware(registry=REGISTRY, app_name="my-outbox-service"), + OutboxPrometheusMiddleware(registry=MIDDLEWARE_REGISTRY, app_name="my-outbox-service"), ], - # Outbox-internal events (fetched, lease_lost, terminal reasons) that have - # no message context and can't reach the middleware bus. - metrics_recorder=PrometheusRecorder(registry=REGISTRY, app_name="my-outbox-service"), + # Outbox-internal events (fetched, lease_lost, terminal reasons, dlq_written) + # that have no message context and can't reach the middleware bus. 
+ metrics_recorder=PrometheusRecorder(registry=RECORDER_REGISTRY, app_name="my-outbox-service"), ) @@ -207,12 +213,28 @@ broker = OutboxBroker( async def handle_order(body: dict) -> None: ... -app = AsgiFastStream(broker, asgi_routes=[("/metrics", make_asgi_app(registry=REGISTRY))]) +app = AsgiFastStream( + broker, + asgi_routes=[ + ("/metrics", make_asgi_app(registry=MIDDLEWARE_REGISTRY)), + ("/metrics/outbox", make_asgi_app(registry=RECORDER_REGISTRY)), + ], +) ``` -Traces flow to OTLP (Jaeger / Tempo / Honeycomb / collector); meters -and the recorder's outbox-internal counters land on `/metrics` for -Prometheus to scrape. One process, one ASGI app, one scrape endpoint. +Traces flow to OTLP (Jaeger / Tempo / Honeycomb / collector); the +middleware's meters land on `/metrics` and the recorder's outbox-internal +counters on `/metrics/outbox` for Prometheus to scrape — two scrape targets, +one process. + +**The two seams overlap on consume/publish series.** Both the middleware +and the recorder emit the same `faststream_received_*` / `faststream_published_*` +collectors, which is why they must live on **separate registries** (above) — +sharing one raises `Duplicated timeseries in CollectorRegistry` at broker +construction, and summing across both double-counts every consume and +publish. Treat the middleware as the source of truth for consume/publish; +the recorder's unique value is the outbox-internal events the middleware +can't see (`fetched`, `lease_lost`, terminal reasons, `dlq_written`). The providers set `messaging.system = "outbox"`, matching the recorder-seam adapters. The OTel provider maps `row.id → diff --git a/docs/usage/subscriber.md b/docs/usage/subscriber.md index c564e7a..8ecb2bd 100644 --- a/docs/usage/subscriber.md +++ b/docs/usage/subscriber.md @@ -131,10 +131,11 @@ guarantee. The factory rejects it at registration. 
```python from faststream import AckPolicy +from faststream_outbox.annotations import OutboxMessage @broker.subscriber("audit", ack_policy=AckPolicy.MANUAL) -async def handle(msg, body: dict) -> None: +async def handle(msg: OutboxMessage, body: dict) -> None: try: await write_audit(body) await msg.ack() @@ -184,13 +185,13 @@ Strategies receive the raised `exception` so users may subclass for ```python class TransientOnly(ExponentialRetry): - def get_next_attempt_at(self, *, exception=None, **kw): + def get_next_attempt_delay(self, *, exception=None, **kw): if exception and not isinstance(exception, TransientError): return None # terminal — DELETE - return super().get_next_attempt_at(exception=exception, **kw) + return super().get_next_attempt_delay(exception=exception, **kw) ``` -Returning `None` from `get_next_attempt_at` signals a terminal failure. +Returning `None` from `get_next_attempt_delay` signals a terminal failure. `_RetryStrategyTemplate` also enforces `max_attempts` and `max_total_delay_seconds` for you. diff --git a/docs/usage/testing.md b/docs/usage/testing.md index bd0a818..3bef0f4 100644 --- a/docs/usage/testing.md +++ b/docs/usage/testing.md @@ -36,9 +36,18 @@ async def test_handler() -> None: ``` In sync mode, `session=` is optional — the test broker patches -`broker.publish` to ignore it. The fake client maintains an in-memory list -of `_FakeRow` dicts which you can inspect via -`broker.fake_client.rows`. +`broker.publish` to ignore it. The fake client keeps an in-memory list of +rows you can inspect via `fake_client.rows` — but `fake_client` is an +attribute of the `TestOutboxBroker` harness, not the broker, so bind the +harness to a name: + +```python +tb = TestOutboxBroker(broker) +async with tb: + await broker.publish(1, queue="orders") + +assert len(tb.fake_client.rows) == 1 +``` ## Testing publishers @@ -85,8 +94,8 @@ return`). ## Notes - **`activate_in` / `activate_at` are ignored in sync mode.** Timers fire - immediately. 
The intended firing time is preserved on - `broker.fake_client.rows[i].next_attempt_at` for assertions. Use + immediately. The intended firing time is preserved on the harness's + `fake_client.rows[i].next_attempt_at` for assertions. Use `run_loops=True` if you need scheduled delivery to actually wait. - **`cancel_timer` and `fetch_unprocessed` are patched** to operate on the fake client. The `session` argument is ignored in tests. diff --git a/docs/usage/timers.md b/docs/usage/timers.md index c68f529..c559b63 100644 --- a/docs/usage/timers.md +++ b/docs/usage/timers.md @@ -120,7 +120,8 @@ In tests using `TestOutboxBroker` (default `run_loops=False` mode), — sync dispatch ignores `next_attempt_at`. This trades production parity for test ergonomics: tests can assert handler effects without time travel. -The schedule is still recorded on the fake row -(`broker.fake_client.rows[0].next_attempt_at`) if a test needs to assert -on it. Pass `run_loops=True` if you need scheduled delivery to actually +The schedule is still recorded on the fake row — bind the +`TestOutboxBroker` to a name and read +`tb.fake_client.rows[0].next_attempt_at` — if a test needs to assert on it. +Pass `run_loops=True` if you need scheduled delivery to actually wait. See [Testing](./testing.md).
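Note on the retry-budget wording in the add-kafka-relay.md hunk: the survivable outage window can be sanity-checked by summing a schedule's delays. A rough sketch with made-up parameters: `initial`, `multiplier`, and `cap` are illustrative names, not the defaults of `ExponentialRetry`, whose actual schedule is not shown in this diff. Jitter is ignored.

```python
def total_retry_window(attempts: int, initial: float, multiplier: float, cap: float) -> float:
    """Sum the waits between `attempts` deliveries of a capped exponential schedule.

    Assumes N attempts imply N - 1 waits, with the row given up on after the last attempt.
    """
    delay = initial
    total = 0.0
    for _ in range(attempts - 1):
        total += min(delay, cap)  # each wait is clamped to the cap
        delay *= multiplier
    return total
```

Comparing the result against the longest outage the relay must ride out suggests whether to lengthen `retry_strategy` or configure a `dlq_table`.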