20 changes: 11 additions & 9 deletions docs/operations/checklist.md
@@ -63,10 +63,11 @@ story.
`[validate]` extra. Do **not** call at `broker.start()` — that
would crash-loop on a pending migration. See [Schema validation §
Where to call it](../usage/schema-validation.md#where-to-call-it).
- [ ] **Outbox `table_name` ≤ ~56 chars** — NOTIFY channel name is
`outbox_<table_name>`. Postgres' 63-char identifier limit silently
truncates longer names and `LISTEN/NOTIFY` short-circuit degrades
to plain polling.
- [ ] **Outbox `table_name` short enough for the NOTIFY channel** — the
channel name is `outbox_<table_name>`, and `make_outbox_table` raises
`ValueError` at table-build time when that exceeds Postgres' 63-**byte**
identifier limit. There is no silent truncation or polling fallback — the
guard makes an over-long name impossible to ship.
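
The length rule in the item above can be checked ahead of time with a few lines of Python. This is a minimal sketch, not the library's own guard: `notify_channel_fits` is a hypothetical helper, and it assumes the server encoding is UTF-8 so that the byte count matches what Postgres measures against its 63-byte identifier limit.

```python
# Assumption: a default Postgres build (NAMEDATALEN = 64, so 63 usable bytes).
POSTGRES_MAX_IDENTIFIER_BYTES = 63
# The checklist states the channel name is `outbox_<table_name>`.
CHANNEL_PREFIX = "outbox_"


def notify_channel_fits(table_name: str) -> bool:
    """Return True when `outbox_<table_name>` fits in 63 bytes.

    Hypothetical helper for illustration; it measures UTF-8 bytes,
    not characters, because the limit is a byte limit.
    """
    channel = f"{CHANNEL_PREFIX}{table_name}"
    return len(channel.encode("utf-8")) <= POSTGRES_MAX_IDENTIFIER_BYTES
```

With the 7-byte prefix, an ASCII `table_name` has 56 bytes to spend; non-ASCII names run out sooner because each character can occupy several bytes.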

## Observability

@@ -76,8 +77,9 @@ story.
- [ ] **Alert on `lease_lost` rate** — non-zero means
`lease_ttl_seconds < handler P99` for at least one subscriber. See
[Troubleshooting § `event=lease_lost`](./troubleshooting.md#event-lease_lost-recurring-in-logs).
- [ ] **`LISTEN/NOTIFY` fallback warning checked at startup** — if
the asyncpg connection fails (driver missing, permission error),
the subscriber logs once and falls back to polling. Operator
silently lives with up-to-`max_fetch_interval` idle latency
otherwise.
- [ ] **`LISTEN/NOTIFY` fallback understood** — a *connection* or
*permission* failure (`asyncpg.connect` / `add_listener` raising) logs a
WARNING once and falls back to polling. A **missing asyncpg driver or a
non-asyncpg engine URL falls back silently** (no log) — diagnose those
from the engine URL, not the logs. Either way the subscriber lives with
up-to-`max_fetch_interval` idle latency.
8 changes: 6 additions & 2 deletions docs/operations/troubleshooting.md
@@ -98,8 +98,12 @@ the SQLAlchemy fetch connection; common failure modes are: the
asyncpg driver isn't installed (no `[asyncpg]` extra), the engine URL
is not asyncpg, or the Postgres user lacks `LISTEN` permission.

**Diagnose.** Check startup logs for a WARNING noting NOTIFY fallback
to polling. The subscriber logs it once and continues without crashing.
**Diagnose.** A connection or permission failure (`asyncpg.connect` /
`add_listener` raising) logs a WARNING once at startup noting the NOTIFY
fallback to polling. A **missing asyncpg driver or a non-asyncpg engine URL
falls back silently** — there is no log line, so check the engine URL
(`drivername` must be `postgresql+asyncpg`) and that the `[asyncpg]` extra
is installed.

**Fix.** Install the `[asyncpg]` extra and use an asyncpg-driven
engine URL (`postgresql+asyncpg://...`). Restart the subscriber.
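
Because the two silent cases leave no log line, a small pre-flight check can surface them. The sketch below is illustrative only: `uses_asyncpg_url` and `asyncpg_installed` are hypothetical helper names, and the URL check relies on the standard library's `urlsplit` rather than SQLAlchemy (in application code, `sqlalchemy.engine.make_url(url).drivername` would be the more direct comparison).

```python
from importlib.util import find_spec
from urllib.parse import urlsplit


def uses_asyncpg_url(engine_url: str) -> bool:
    """True when the URL's scheme (SQLAlchemy's drivername) is postgresql+asyncpg."""
    return urlsplit(engine_url).scheme == "postgresql+asyncpg"


def asyncpg_installed() -> bool:
    """True when the asyncpg package can be imported in this environment."""
    return find_spec("asyncpg") is not None


def silent_fallback_reasons(engine_url: str) -> list[str]:
    """List the conditions that make NOTIFY fall back to polling without a log line."""
    reasons = []
    if not uses_asyncpg_url(engine_url):
        reasons.append("engine URL drivername is not postgresql+asyncpg")
    if not asyncpg_installed():
        reasons.append("asyncpg is not importable")
    return reasons
```

An empty list from `silent_fallback_reasons` rules out the silent cases, so any remaining fallback should show up as the startup WARNING.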
7 changes: 5 additions & 2 deletions docs/tutorials/add-kafka-relay.md
@@ -230,8 +230,11 @@ the foreign publish would raise, the outbox row would be nacked, and
the configured `retry_strategy` would reschedule it. The next dispatch
re-runs the handler and re-attempts the foreign publish. The net effect
is **at-least-once delivery to the foreign broker** — the outbox row is
the durability boundary, and it stays in the table until Kafka actually
acks the publish.
the durability boundary, and it stays in the table for the duration of the
retry budget: the default `ExponentialRetry` allows 10 attempts, roughly
13–14 minutes on the default schedule. Once the budget is exhausted the row
is deleted, because no DLQ is configured by default, so set a longer
`retry_strategy` or a `dlq_table` to survive longer outages.

In practice, `aiokafka`'s producer has its own client-side reconnect
and retry logic, so a short Kafka outage usually completes from the
5 changes: 3 additions & 2 deletions docs/usage/dlq.md
@@ -179,8 +179,9 @@ interval '90 days'` from a daily cron is enough.

## Test broker

`TestOutboxBroker` accumulates audit rows in
`broker.fake_client.dlq_rows` so tests can assert on archive content
`TestOutboxBroker` accumulates audit rows on the harness's
`fake_client.dlq_rows` (bind the `TestOutboxBroker` to a name, as the
snippet below does) so tests can assert on archive content
without a real Postgres. The fake mirrors the production CTE
side-effect: the source row is removed from `fake_client.rows` and an
audit dict is appended to `fake_client.dlq_rows` in the same call.
11 changes: 8 additions & 3 deletions docs/usage/fastapi.md
@@ -25,7 +25,7 @@ from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from faststream_outbox import make_outbox_table
from faststream_outbox.fastapi import OutboxBroker, OutboxRouter
from faststream_outbox.fastapi import OutboxRouter


metadata = MetaData()
@@ -53,11 +53,10 @@ async def handle(
@router.post("/orders")
async def create_order(
order: OrderIn,
broker: OutboxBroker,
session: AsyncSession = Depends(get_session),
) -> dict:
session.add(Order(...))
await broker.publish({"order_id": ...}, queue="orders", session=session)
await router.broker.publish({"order_id": ...}, queue="orders", session=session)
return {"ok": True}


@@ -105,6 +104,12 @@ They resolve via FastStream's `Context()` paths but go through FastAPI's
dependency resolver, so `Depends(...)` and these shortcuts can be mixed
freely.

These shortcuts resolve through FastStream's subscriber-dispatch
machinery, so they work **only inside `@router.subscriber` handlers** — not
in HTTP routes. In an HTTP route, reach the broker via `router.broker` (as
the quickstart's `create_order` does); a `broker: OutboxBroker` annotation
there resolves as a request field and fails with a 422.

## What's intentionally not exposed

`apply_types` and the broker's FastStream `dependencies` argument are
6 changes: 3 additions & 3 deletions docs/usage/observability.md
@@ -54,10 +54,10 @@ broken recorder never poisons the dispatch loop.
| `fetched` | `queue`, `subscriber`, `count` | | Fetch loop, every cycle (including empty) |
| `dispatched` | `queue`, `subscriber` | | Worker loop, before handler runs |
| `acked` | `queue`, `subscriber` | `duration_seconds` | Handler returned successfully |
| `nacked_retried` | `queue`, `subscriber`, `attempts_count`, `deliveries_count` | `exception_type` | Retry scheduled |
| `nacked_retried` | `queue`, `subscriber`, `deliveries_count`, `duration_seconds`, `next_delay_seconds` | `exception_type` | Retry scheduled |
| `nacked_terminal` | `queue`, `subscriber`, `deliveries_count`, `reason` | `exception_type` | Row terminally failed |
| `lease_lost` | `queue`, `phase`, `row_id`, `deliveries_count` | | Terminal write found `rowcount == 0` |
| `published` | `queue`, `destination` | `duration_seconds`, `payload_size_bytes` | Producer INSERT committed |
| `published` | `queue`, `status`, `count`, `size_bytes`, `duration_seconds` | `exception_type` | Producer, after the INSERT executes (pre-commit; also fires on error with `status="error"`) |
| `dlq_written` | `queue`, `subscriber`, `deliveries_count`, `failure_reason` | `exception_type` | DLQ CTE wrote an audit row |

`reason` on `nacked_terminal` is one of `max_deliveries`,
@@ -97,7 +97,7 @@ histogram_quantile(0.99,
rate(faststream_published_messages_duration_seconds_bucket{broker="outbox"}[5m]))

# DLQ misconfiguration: terminal-failure rate diverges from DLQ-write rate
rate(faststream_outbox_nacked_terminal_total[5m])
rate(faststream_outbox_terminal_total[5m])
-
rate(faststream_outbox_dlq_written_total[5m])
> 0
3 changes: 2 additions & 1 deletion docs/usage/publisher.md
@@ -128,7 +128,8 @@ transactional contract applies (you provide the session, the row commits
with your domain writes):

```python
from faststream_outbox import OutboxMessage, OutboxResponse
from faststream_outbox import OutboxResponse
from faststream_outbox.annotations import OutboxMessage


@broker.subscriber("orders")
6 changes: 3 additions & 3 deletions docs/usage/relay.md
@@ -28,7 +28,7 @@ boundary, and the relay carries an at-least-once guarantee end to end.
from faststream.kafka import KafkaBroker
from faststream_outbox import OutboxBroker

broker_outbox = OutboxBroker(engine=engine)
broker_outbox = OutboxBroker(engine, outbox_table=outbox_table)
broker_kafka = KafkaBroker("127.0.0.1:9092")
publisher_kafka = broker_kafka.publisher("kafka_topic")

@@ -59,7 +59,7 @@ from fastapi import FastAPI
from faststream.kafka.fastapi import KafkaRouter
from faststream_outbox.fastapi import OutboxRouter

outbox_router = OutboxRouter(engine=engine)
outbox_router = OutboxRouter(engine, outbox_table=outbox_table)
kafka_router = KafkaRouter("127.0.0.1:9092")

publisher_kafka = kafka_router.publisher("kafka_topic")
@@ -86,7 +86,7 @@ from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream_outbox import OutboxBroker

broker_outbox = OutboxBroker(engine=engine)
broker_outbox = OutboxBroker(engine, outbox_table=outbox_table)
broker_kafka = KafkaBroker("127.0.0.1:9092")
publisher_kafka = broker_kafka.publisher("kafka_topic")

3 changes: 2 additions & 1 deletion docs/usage/router.md
@@ -56,7 +56,8 @@ app = FastStream(broker)
decorators, which is useful for code-gen or plugin patterns:

```python
from faststream_outbox import OutboxRoute, OutboxRouter
from faststream_outbox import OutboxRouter
from faststream_outbox.router import OutboxRoute


async def handle_order(order_id: int) -> None:
40 changes: 31 additions & 9 deletions docs/usage/setup-prometheus-opentelemetry.md
@@ -165,7 +165,7 @@ from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from prometheus_client import REGISTRY, make_asgi_app
from prometheus_client import CollectorRegistry, make_asgi_app
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import create_async_engine

@@ -184,6 +184,12 @@ trace.set_tracer_provider(tracer_provider)
meter_provider = MeterProvider(resource=resource, metric_readers=[PrometheusMetricReader()])
metrics.set_meter_provider(meter_provider)

# Two registries: the middleware and the recorder both define the same
# faststream_* consume/publish collectors, so sharing one registry raises
# "Duplicated timeseries in CollectorRegistry" at broker construction.
MIDDLEWARE_REGISTRY = CollectorRegistry()
RECORDER_REGISTRY = CollectorRegistry()

# ----- Outbox broker -----
metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")
@@ -195,24 +201,40 @@ broker = OutboxBroker(
middlewares=[
# Bus-scope spans + meters around consume_scope / publish_scope.
OutboxTelemetryMiddleware(tracer_provider=tracer_provider, meter_provider=meter_provider),
OutboxPrometheusMiddleware(registry=REGISTRY, app_name="my-outbox-service"),
OutboxPrometheusMiddleware(registry=MIDDLEWARE_REGISTRY, app_name="my-outbox-service"),
],
# Outbox-internal events (fetched, lease_lost, terminal reasons) that have
# no message context and can't reach the middleware bus.
metrics_recorder=PrometheusRecorder(registry=REGISTRY, app_name="my-outbox-service"),
# Outbox-internal events (fetched, lease_lost, terminal reasons, dlq_written)
# that have no message context and can't reach the middleware bus.
metrics_recorder=PrometheusRecorder(registry=RECORDER_REGISTRY, app_name="my-outbox-service"),
)


@broker.subscriber("orders", max_workers=4)
async def handle_order(body: dict) -> None: ...


app = AsgiFastStream(broker, asgi_routes=[("/metrics", make_asgi_app(registry=REGISTRY))])
app = AsgiFastStream(
broker,
asgi_routes=[
("/metrics", make_asgi_app(registry=MIDDLEWARE_REGISTRY)),
("/metrics/outbox", make_asgi_app(registry=RECORDER_REGISTRY)),
],
)
```

Traces flow to OTLP (Jaeger / Tempo / Honeycomb / collector); meters
and the recorder's outbox-internal counters land on `/metrics` for
Prometheus to scrape. One process, one ASGI app, one scrape endpoint.
Traces flow to OTLP (Jaeger / Tempo / Honeycomb / collector); the
middleware's meters land on `/metrics` and the recorder's outbox-internal
counters on `/metrics/outbox` for Prometheus to scrape — two scrape targets,
one process.

**The two seams overlap on consume/publish series.** Both the middleware
and the recorder emit the same `faststream_received_*` / `faststream_published_*`
collectors, which is why they must live on **separate registries** (above) —
sharing one raises `Duplicated timeseries in CollectorRegistry` at broker
construction, and summing across both double-counts every consume and
publish. Treat the middleware as the source of truth for consume/publish;
the recorder's unique value is the outbox-internal events the middleware
can't see (`fetched`, `lease_lost`, terminal reasons, `dlq_written`).

The providers set `messaging.system = "outbox"`, matching the
recorder-seam adapters. The OTel provider maps `row.id →
9 changes: 5 additions & 4 deletions docs/usage/subscriber.md
@@ -131,10 +131,11 @@ guarantee. The factory rejects it at registration.

```python
from faststream import AckPolicy
from faststream_outbox.annotations import OutboxMessage


@broker.subscriber("audit", ack_policy=AckPolicy.MANUAL)
async def handle(msg, body: dict) -> None:
async def handle(msg: OutboxMessage, body: dict) -> None:
try:
await write_audit(body)
await msg.ack()
@@ -184,13 +185,13 @@ Strategies receive the raised `exception` so users may subclass for

```python
class TransientOnly(ExponentialRetry):
def get_next_attempt_at(self, *, exception=None, **kw):
def get_next_attempt_delay(self, *, exception=None, **kw):
if exception and not isinstance(exception, TransientError):
return None # terminal — DELETE
return super().get_next_attempt_at(exception=exception, **kw)
return super().get_next_attempt_delay(exception=exception, **kw)
```

Returning `None` from `get_next_attempt_at` signals a terminal failure.
Returning `None` from `get_next_attempt_delay` signals a terminal failure.
`_RetryStrategyTemplate` also enforces `max_attempts` and
`max_total_delay_seconds` for you.

19 changes: 14 additions & 5 deletions docs/usage/testing.md
@@ -36,9 +36,18 @@ async def test_handler() -> None:
```

In sync mode, `session=` is optional — the test broker patches
`broker.publish` to ignore it. The fake client maintains an in-memory list
of `_FakeRow` dicts which you can inspect via
`broker.fake_client.rows`.
`broker.publish` to ignore it. The fake client keeps an in-memory list of
rows you can inspect via `fake_client.rows` — but `fake_client` is an
attribute of the `TestOutboxBroker` harness, not the broker, so bind the
harness to a name:

```python
tb = TestOutboxBroker(broker)
async with tb:
await broker.publish(1, queue="orders")

assert len(tb.fake_client.rows) == 1
```

## Testing publishers

@@ -85,8 +94,8 @@ return`).
## Notes

- **`activate_in` / `activate_at` are ignored in sync mode.** Timers fire
immediately. The intended firing time is preserved on
`broker.fake_client.rows[i].next_attempt_at` for assertions. Use
immediately. The intended firing time is preserved on the harness's
`fake_client.rows[i].next_attempt_at` for assertions. Use
`run_loops=True` if you need scheduled delivery to actually wait.
- **`cancel_timer` and `fetch_unprocessed` are patched** to operate on the
fake client. The `session` argument is ignored in tests.
7 changes: 4 additions & 3 deletions docs/usage/timers.md
@@ -120,7 +120,8 @@ In tests using `TestOutboxBroker` (default `run_loops=False` mode),
— sync dispatch ignores `next_attempt_at`. This trades production parity
for test ergonomics: tests can assert handler effects without time travel.

The schedule is still recorded on the fake row
(`broker.fake_client.rows[0].next_attempt_at`) if a test needs to assert
on it. Pass `run_loops=True` if you need scheduled delivery to actually
The schedule is still recorded on the fake row. If a test needs to assert
on it, bind the `TestOutboxBroker` to a name (`tb` here) and read
`tb.fake_client.rows[0].next_attempt_at`.
Pass `run_loops=True` if you need scheduled delivery to actually
wait. See [Testing](./testing.md).