diff --git a/CLAUDE.md b/CLAUDE.md index 02ffa738..9d3a7886 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -304,3 +304,4 @@ Errors are classified by origin (user vs infra) and retryability. The framework 3. **Extensions return plain errors** — extension interfaces (`MergeChecker`, `Storage`, `Publisher`) return standard `error` values with their own domain sentinels (e.g. `storage.ErrNotFound`). They do NOT classify errors as user or infra. 4. **Controllers classify errors** — the service controller that calls an extension decides whether the failure is user-caused or infrastructure-caused. The same extension error may be classified differently depending on context. 5. **Error chain works end-to-end** — extensions wrap custom errors, controllers wrap with `errs.New*Error`, and `errors.Is`/`errors.As` walks the full chain. +6. **Default classifiers** — primary pipeline consumers compose one or more classifiers (each owning a focused concern such as transport-level signals or a specific driver/library's errors) into `errs.NewClassifierProcessor(...)`. Pick classifiers that match the failure surfaces the consumer actually touches; add a new classifier package when a backend introduces error shapes that no existing one understands, rather than teaching an unrelated classifier about them. DLQ reconciliation consumers use `errs.AlwaysRetryableProcessor` instead so any failure is redelivered rather than dropped. diff --git a/core/errs/BUILD.bazel b/core/errs/BUILD.bazel index 05c655d0..82f152d7 100644 --- a/core/errs/BUILD.bazel +++ b/core/errs/BUILD.bazel @@ -2,14 +2,20 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "errs", - srcs = ["errs.go"], + srcs = [ + "errs.go", + "processor.go", + ], importpath = "github.com/uber/submitqueue/core/errs", visibility = ["//visibility:public"], ) go_test( name = "errs_test", - srcs = ["errs_test.go"], + srcs = [ + "errs_test.go", + "processor_test.go", + ], embed = [":errs"], deps = [ "@com_github_stretchr_testify//assert", diff --git a/core/errs/README.md b/core/errs/README.md index 1f51f4a1..8a1c5407 100644 --- a/core/errs/README.md +++ b/core/errs/README.md @@ -23,11 +23,11 @@ Errors are classified along two axes: A returned `error` reaches `IsUserError` / `IsRetryable` / `IsDependencyError` carrying one of the framework types (`*userError` / `*infraError`). It gets there one of two ways: 1. **Explicit wrap by the controller** — the controller knows the meaning of the failure and wraps the cause with `NewUserError`, `NewRetryableError`, `NewDependencyError`, or `NewRetryableDependencyError` before returning. -2. **Automatic wrap by `Classify`** — the controller returns a raw driver/library/sentinel error, and a per-backend `Classifier` recognises it later in the pipeline (typically inside the consumer) and adds the appropriate framework wrap. +2. **Automatic wrap by the classifier-based `ErrorProcessor`** — the controller returns a raw driver/library/sentinel error, and a per-backend `Classifier` recognises it later in the pipeline (typically inside the consumer, after `ErrorProcessor.Process` runs) and adds the appropriate framework wrap. Both routes feed the same downstream helpers; the chain that reaches `IsRetryable` looks identical regardless of who wrapped it. -## `Classify` and the `Classifier` Interface +## `ErrorProcessor`, `Classifier`, and the Processing Pass `Classifier` inspects a **single error node** and returns a `Verdict`: @@ -39,14 +39,22 @@ type Classifier interface { Verdicts: `Unknown` (this node carries no signal), `User`, `Infra`, `InfraRetryable`, `InfraDependency`, `InfraDependencyRetryable`. -`Classify(err, classifiers...)` is the single, explicit pass that turns a raw chain into a wrapped one. It is called **exactly once per chain** — typically by the consumer immediately after the controller returns. After that point, callers use only the `IsXxx` helpers, which are pure type checks. +An `ErrorProcessor` runs the per-chain pass that turns a raw chain into a wrapped one. It is called **exactly once per chain** — typically by the consumer immediately after the controller returns. After that point, callers use only the `IsXxx` helpers, which are pure type checks. -`Classify` walks the chain twice: +Two implementations ship in this package: -1. **Pass 1 — framework-wrap check.** A cheap type switch looks for an existing `*userError` / `*infraError` anywhere in the chain. If found, the chain is already interpretable and `Classify` returns `err` unchanged. **No classifier is invoked.** -2. **Pass 2 — classifier walk.** From outermost to innermost node, each registered classifier is asked for a verdict. The first non-`Unknown` verdict wins and `err` is wrapped with the matching framework constructor. +- **`NewClassifierProcessor(classifiers...)`** — the standard pass for primary pipeline consumers. Walks the chain twice: + 1. **Pass 1 — framework-wrap check.** A cheap type switch looks for an existing `*userError` / `*infraError` anywhere in the chain. If found, the chain is already interpretable and the processor returns `err` unchanged. **No classifier is invoked.** + 2. **Pass 2 — classifier walk.** From outermost to innermost node, each registered classifier is asked for a verdict. The first non-`Unknown` verdict wins and `err` is wrapped with the matching framework constructor. -If no classifier recognises anything, `err` is returned unchanged — and behaves as non-retryable infra at the helper layer. + If no classifier recognises anything, `err` is returned unchanged — and behaves as non-retryable infra at the helper layer. + +- **`AlwaysRetryableProcessor`** — unconditionally wraps every non-nil error with `NewRetryableError`, overriding any inner framework wrap. Use it for narrowly-scoped consumers — typically DLQ reconciliation — that must redeliver on any failure because there is no further dead-letter destination. Side-effect: an inner `*infraError(dependency=true)` is masked by the outer `retryable=true` wrap, since `errors.As` matches the outermost `*infraError` first. This is acceptable for the intended DLQ use case where only `IsRetryable` drives transport behaviour; do not pair this processor with a primary pipeline consumer or genuine user errors will retry forever instead of reaching their DLQ. + +### Choosing a processor + +- **Primary pipeline consumer** → `NewClassifierProcessor(...)`. Controllers' explicit `NewUserError` / `NewDependencyError` wraps must survive so user errors don't get retried, and unclassified backend errors must be inspected by the registered classifiers. +- **DLQ reconciliation consumer** → `AlwaysRetryableProcessor`. The DLQ is the last stop; any unprocessable message must come back for another attempt rather than silently drop. The DLQ subscription itself runs with a very high `Retry.MaxAttempts` and with its own DLQ disabled, so "always retryable + bounded-but-effectively-infinite attempts" is the convergence guarantee. ## Adding a Backend-Specific Classifier @@ -77,21 +85,24 @@ func (classifier) Classify(err error) errs.Verdict { } ``` -Servers wire each classifier into the consumer as a vararg. Order matters only when two classifiers might both match a node — earlier classifiers win: +Servers wire each classifier into the consumer's `ErrorProcessor`. Order matters only when two classifiers might both match a node — earlier classifiers win: ```go import ( + "github.com/uber/submitqueue/core/errs" genericerrs "github.com/uber/submitqueue/core/errs/generic" mysqlerrs "github.com/uber/submitqueue/core/errs/mysql" ) c := consumer.New(logger, scope, registry, - genericerrs.Classifier, - mysqlerrs.Classifier, + errs.NewClassifierProcessor( + genericerrs.Classifier, + mysqlerrs.Classifier, + ), ) ``` -Tests follow the same shape: assert per-node behaviour against `Classifier.Classify(node)` directly, and assert end-to-end behaviour by running `errs.Classify(err, Classifier)` and checking the helpers (`IsRetryable`, `IsUserError`, …) on the result. See `core/errs/mysql/mysql_test.go` and `core/errs/generic/generic_test.go`. +Tests follow the same shape: assert per-node behaviour against `Classifier.Classify(node)` directly, and assert end-to-end behaviour by running `errs.NewClassifierProcessor(Classifier).Process(err)` and checking the helpers (`IsRetryable`, `IsUserError`, …) on the result. See `core/errs/mysql/mysql_test.go` and `core/errs/generic/generic_test.go`. ## Overriding Classification from a Controller @@ -106,8 +117,9 @@ if errors.Is(err, storage.ErrNotFound) { return errs.NewUserError(fmt.Errorf("request %s: %w", id, err)) } if err != nil { - // Hand the raw error to Classify — the mysql classifier will recognise - // deadlocks, lock-wait timeouts, etc. and wrap them as retryable infra. + // Hand the raw error to the consumer's ErrorProcessor — the mysql + // classifier will recognise deadlocks, lock-wait timeouts, etc. and wrap + // them as retryable infra. return fmt.Errorf("get %s: %w", id, err) } ``` @@ -119,7 +131,7 @@ Two practical rules fall out of the short-circuit semantics: ## Extensions Return Plain Go Errors -Extension interfaces (`MergeChecker`, `Storage`, `Publisher`) return standard `error` values. They may define their own domain-specific sentinel errors (e.g. `storage.ErrNotFound`, `storage.ErrVersionMismatch`) but they do **not** classify errors as user or infra — that is the controller's (and `Classify`'s) job. +Extension interfaces (`MergeChecker`, `Storage`, `Publisher`) return standard `error` values. They may define their own domain-specific sentinel errors (e.g. `storage.ErrNotFound`, `storage.ErrVersionMismatch`) but they do **not** classify errors as user or infra — that is the controller's (and the consumer's `ErrorProcessor`'s) job. This separation keeps extensions reusable across contexts. The same `storage.ErrNotFound` might be a user error in one controller (user requested a non-existent resource) and an infra error in another (expected record is missing). @@ -151,4 +163,4 @@ errors.Is(err, ErrNotFound) // true — cause is in the chain | `IsRetryable(err)` | `err` is or wraps an infra error with the retryable flag set | | `IsDependencyError(err)` | `err` is or wraps an infra error marked as dependency | -All three are type-only checks. They do not invoke classifiers — pair them with a preceding `Classify` call when the controller's error may not carry an explicit wrap. +All three are type-only checks. They do not invoke classifiers — pair them with a preceding `ErrorProcessor.Process` call when the controller's error may not carry an explicit wrap. diff --git a/core/errs/errs.go b/core/errs/errs.go index 667b6f95..b4c40f98 100644 --- a/core/errs/errs.go +++ b/core/errs/errs.go @@ -143,8 +143,8 @@ const ( // // Classifiers must not call errors.As / errors.Is themselves, which would walk // the chain and could shadow a classification carried by an outer node (such -// as a controller's explicit NewUserError wrap). The package-level Classify -// function owns the walk. +// as a controller's explicit NewUserError wrap). The classifier-based +// ErrorProcessor (see NewClassifierProcessor) owns the walk. // // Classifiers are typically stateless; the canonical convention is to expose a // package-level singleton value (e.g. mysqlerrs.Classifier) rather than a @@ -153,80 +153,6 @@ type Classifier interface { Classify(err error) Verdict } -// Classify is the single, explicit classification pass. It is intended to be -// called exactly once per error chain — typically by the consumer immediately -// after a controller returns — and produces a chain that subsequent IsUserError -// / IsRetryable / IsDependencyError calls can interpret with simple type -// checks (no further classifier walks). -// -// Semantics: -// -// - nil in, nil out. -// - If err's chain already carries a framework classification (*userError or -// *infraError anywhere in the chain), returns err unchanged — the chain is -// already interpretable by IsUserError / IsRetryable / IsDependencyError. -// - Otherwise, walks the chain from outermost to innermost, asking each -// classifier per node. The FIRST non-Unknown verdict wins; the outermost -// such node determines the wrap. err is wrapped with the framework -// constructor matching that verdict (User -> NewUserError, InfraRetryable -// -> NewRetryableError, etc.) and the wrapped error is returned. -// - Verdict Infra means "non-retryable infra" — which is already the default -// behavior for an unwrapped chain, so no wrap is added. -// - If no classifier recognises anything, err is returned unchanged. -// -// Implementation: two passes over the chain. Pass 1 is a cheap type check -// looking for an existing framework wrap and short-circuits if one is found — -// no classifier is invoked. Pass 2 runs the configured classifiers per node. -// Walking the chain is cheap relative to a classifier call, so this avoids -// running classifiers whenever the chain is already classified deeper down. -// -// NOTE: this central classifier model cannot disambiguate errors of the same -// underlying type produced by different extensions (e.g. a net.OpError from a -// mysql connection vs the same type from an HTTP caller would both match the -// mysql classifier here). Resolving that requires per-extension provenance -// tagging; intentionally deferred. -func Classify(err error, classifiers ...Classifier) error { - if err == nil { - return nil - } - - // Pass 1 — cheap framework-wrap check. If any node already carries a - // framework type, the chain is interpretable as-is and classifiers are - // never invoked. - for cur := err; cur != nil; cur = errors.Unwrap(cur) { - switch cur.(type) { - case *userError, *infraError: - return err - } - } - - // Pass 2 — run classifiers per node from outermost to innermost. Stop at - // the first non-Unknown verdict. - var verdict Verdict - for cur := err; cur != nil && verdict == Unknown; cur = errors.Unwrap(cur) { - for _, c := range classifiers { - if v := c.Classify(cur); v != Unknown { - verdict = v - break - } - } - } - - switch verdict { - case User: - return NewUserError(err) - case InfraRetryable: - return NewRetryableError(err) - case InfraDependency: - return NewDependencyError(err) - case InfraDependencyRetryable: - return NewRetryableDependencyError(err) - } - // Unknown or Infra — no wrap needed; the existing chain already behaves as - // non-retryable infra at the IsRetryable / IsUserError layer. - return err -} - // IsUserError reports whether err is or wraps a user error, i.e. an error // produced by NewUserError. Inspects only the framework types in the chain. func IsUserError(err error) bool { diff --git a/core/errs/generic/generic.go b/core/errs/generic/generic.go index 47e794c0..bfc27aaf 100644 --- a/core/errs/generic/generic.go +++ b/core/errs/generic/generic.go @@ -25,16 +25,19 @@ import ( // Classifier recognises generic, non-backend-specific errors and returns // errs.Unknown for anything it does not recognise so the surrounding -// errs.Classify walker can keep looking down the unwrap chain. +// classifier-processor walk can keep looking down the unwrap chain. // // The classifier is stateless; this package-level singleton is the canonical -// handle. Pass it into consumer.New as a vararg. +// handle. Pass it as one of the variadic classifiers to +// errs.NewClassifierProcessor; the resulting processor is what gets handed to +// consumer.New. var Classifier errs.Classifier = classifier{} type classifier struct{} // Classify inspects a single node. Per the errs.Classifier contract, this -// must not call errors.Is / errors.As — errs.Classify owns the chain walk. +// must not call errors.Is / errors.As — the classifier-processor owns the +// chain walk. func (classifier) Classify(err error) errs.Verdict { // Cancellation signals that the caller aborted the work in flight // (process shutdown, deadline on the inbound RPC, parent operation gone) — @@ -44,8 +47,8 @@ func (classifier) Classify(err error) errs.Verdict { // Cases where cancellation truly means "do not run this again" are // caller-specific and should be expressed by wrapping with an explicit // NewUserError / NewDependencyError before returning; the pass-1 - // framework-wrap check in errs.Classify will then short-circuit before - // this classifier is consulted. + // framework-wrap check in the classifier-processor will then short-circuit + // before this classifier is consulted. if err == context.Canceled { return errs.InfraRetryable } diff --git a/core/errs/generic/generic_test.go b/core/errs/generic/generic_test.go index 048f2efd..0aa0150c 100644 --- a/core/errs/generic/generic_test.go +++ b/core/errs/generic/generic_test.go @@ -34,8 +34,8 @@ func TestClassifier_Unknown(t *testing.T) { err error }{ // Per-node contract — Classifier should NOT match a wrapped - // context.Canceled; the surrounding errs.Classify walk will reach the - // inner node and ask Classifier again there. + // context.Canceled; the surrounding classifier-processor walk will + // reach the inner node and ask Classifier again there. {"wrapped context.Canceled", fmt.Errorf("op: %w", context.Canceled)}, {"deadline exceeded", context.DeadlineExceeded}, {"plain error", errors.New("anything")}, @@ -49,9 +49,11 @@ func TestClassifier_Unknown(t *testing.T) { } } -func TestClassifier_AppliedViaClassify(t *testing.T) { +func TestClassifier_AppliedViaProcessor(t *testing.T) { + p := errs.NewClassifierProcessor(Classifier) + t.Run("bare context.Canceled becomes retryable infra", func(t *testing.T) { - out := errs.Classify(context.Canceled, Classifier) + out := p.Process(context.Canceled) assert.True(t, errs.IsRetryable(out)) }) @@ -59,7 +61,7 @@ func TestClassifier_AppliedViaClassify(t *testing.T) { // The chain walker reaches the inner context.Canceled node and the // classifier matches there. wrapped := fmt.Errorf("process: %w", context.Canceled) - out := errs.Classify(wrapped, Classifier) + out := p.Process(wrapped) assert.True(t, errs.IsRetryable(out)) }) @@ -68,7 +70,7 @@ func TestClassifier_AppliedViaClassify(t *testing.T) { // The pass-1 framework-wrap check short-circuits before Classifier // runs. err := errs.NewUserError(context.Canceled) - out := errs.Classify(err, Classifier) + out := p.Process(err) assert.Same(t, err, out) assert.False(t, errs.IsRetryable(out)) assert.True(t, errs.IsUserError(out)) diff --git a/core/errs/mysql/mysql.go b/core/errs/mysql/mysql.go index 7753366b..e5caf200 100644 --- a/core/errs/mysql/mysql.go +++ b/core/errs/mysql/mysql.go @@ -18,8 +18,8 @@ // // The classifier inspects a single error node at a time, as required by the // errs.Classifier contract. It returns errs.Unknown for nodes it does not -// recognise so the surrounding errs.Classify chain walk can continue to -// deeper nodes. +// recognise so the surrounding classifier-processor chain walk can continue +// to deeper nodes. package mysql import ( @@ -43,19 +43,21 @@ import ( // - net.Error values (including *net.OpError, *net.DNSError) — transient // network failures while talking to the server, retryable. // -// Anything else returns errs.Unknown so the surrounding errs.Classify walker -// can keep looking down the unwrap chain. +// Anything else returns errs.Unknown so the surrounding classifier-processor +// walker can keep looking down the unwrap chain. // // The classifier never returns errs.User. Constraint violations and similar // codes that a caller might want to surface as user errors must be wrapped // explicitly with errs.NewUserError at the controller — only the controller // knows whether a duplicate key, FK violation, etc. reflects bad input from // the user or an internal invariant being broken. The framework-wrap check -// in errs.Classify short-circuits before this classifier runs, so an -// explicit controller wrap always wins. +// in the classifier-processor short-circuits before this classifier runs, so +// an explicit controller wrap always wins. // // The classifier is stateless; this package-level singleton is the canonical -// handle. Pass it into errs.Classify (typically as a vararg to consumer.New). +// handle. Pass it as one of the variadic classifiers to +// errs.NewClassifierProcessor; the resulting processor is what gets handed to +// consumer.New. var Classifier errs.Classifier = classifier{} type classifier struct{} @@ -65,8 +67,9 @@ type classifier struct{} func (classifier) Classify(err error) errs.Verdict { // MySQL server-reported errors. We do the type assertion directly on the // current node (not errors.As) so an outer framework wrap (e.g. an explicit - // NewUserError from the controller) keeps winning — errs.Classify owns the - // chain walk and stops at any *userError / *infraError it sees first. + // NewUserError from the controller) keeps winning — the classifier-processor + // owns the chain walk and stops at any *userError / *infraError it sees + // first. if me, ok := err.(*gomysql.MySQLError); ok { return classifyMySQLNumber(me.Number) } diff --git a/core/errs/mysql/mysql_test.go b/core/errs/mysql/mysql_test.go index 788c5f57..675f93b9 100644 --- a/core/errs/mysql/mysql_test.go +++ b/core/errs/mysql/mysql_test.go @@ -117,19 +117,20 @@ func TestClassifier_NetErrors(t *testing.T) { } func TestClassifier_Unknown(t *testing.T) { - // A plain, unrecognised error must yield Unknown so the package-level - // errs.Classify walker can move on to the next node in the chain rather - // than locking in a verdict. + // A plain, unrecognised error must yield Unknown so the surrounding + // classifier-processor walker can move on to the next node in the chain + // rather than locking in a verdict. assert.Equal(t, errs.Unknown, Classifier.Classify(errors.New("anything"))) assert.Equal(t, errs.Unknown, Classifier.Classify(nil)) } -func TestClassifier_AppliedViaClassify(t *testing.T) { - // End-to-end behavior: the consumer's call site is - // `err = errs.Classify(err, mysqlerrs.Classifier)`, followed by - // errs.IsUserError / IsRetryable / IsDependencyError type checks. These - // tests pin that contract — given an err from a controller, the returned - // chain answers the right question. +func TestClassifier_AppliedViaProcessor(t *testing.T) { + // End-to-end behavior: the consumer's call site runs the configured + // errs.ErrorProcessor (typically errs.NewClassifierProcessor( + // mysqlerrs.Classifier)) and then inspects errs.IsUserError / + // IsRetryable / IsDependencyError. These tests pin that contract — given + // an err from a controller, the returned chain answers the right question. + p := errs.NewClassifierProcessor(Classifier) t.Run("mysql connection error surfaces as retryable infra", func(t *testing.T) { // Simulates the queue or storage layer returning a wrapped net.OpError @@ -137,7 +138,7 @@ func TestClassifier_AppliedViaClassify(t *testing.T) { netErr := &net.OpError{Op: "read", Net: "tcp", Err: errors.New("reset")} wrapped := fmt.Errorf("publish: %w", netErr) - out := errs.Classify(wrapped, Classifier) + out := p.Process(wrapped) assert.True(t, errs.IsRetryable(out)) assert.False(t, errs.IsUserError(out)) }) @@ -146,7 +147,7 @@ func TestClassifier_AppliedViaClassify(t *testing.T) { dl := &gomysql.MySQLError{Number: 1213, Message: "deadlock"} wrapped := fmt.Errorf("update: %w", dl) - out := errs.Classify(wrapped, Classifier) + out := p.Process(wrapped) assert.True(t, errs.IsRetryable(out)) }) @@ -155,8 +156,8 @@ func TestClassifier_AppliedViaClassify(t *testing.T) { wrapped := fmt.Errorf("select: %w", se) // Verdict Infra means "non-retryable infra" — the default for an - // unwrapped chain — so Classify leaves the chain alone. - out := errs.Classify(wrapped, Classifier) + // unwrapped chain — so the processor leaves the chain alone. + out := p.Process(wrapped) assert.False(t, errs.IsRetryable(out)) assert.False(t, errs.IsUserError(out)) assert.Same(t, wrapped, out, "Infra verdict should not add a wrap") @@ -167,7 +168,7 @@ func TestClassifier_AppliedViaClassify(t *testing.T) { // a duplicate-key as a user error must wrap with errs.NewUserError // explicitly (see the controller-override test below). dup := &gomysql.MySQLError{Number: 1062, Message: "duplicate"} - out := errs.Classify(dup, Classifier) + out := p.Process(dup) assert.False(t, errs.IsRetryable(out)) assert.False(t, errs.IsUserError(out)) assert.Same(t, dup, out, "Infra verdict should not add a wrap") @@ -177,7 +178,7 @@ func TestClassifier_AppliedViaClassify(t *testing.T) { dup := &gomysql.MySQLError{Number: 1062, Message: "duplicate"} err := errs.NewUserError(fmt.Errorf("create: %w", dup)) - out := errs.Classify(err, Classifier) + out := p.Process(err) assert.Same(t, err, out) assert.True(t, errs.IsUserError(out)) assert.False(t, errs.IsRetryable(out)) @@ -185,12 +186,12 @@ func TestClassifier_AppliedViaClassify(t *testing.T) { t.Run("controller-level User wrap beats deeper mysql classification", func(t *testing.T) { // Even though the underlying cause is a transient mysql deadlock, the - // outermost frame is an explicit NewUserError. Classify sees a framework - // wrap already in the chain and returns the chain untouched. + // outermost frame is an explicit NewUserError. The processor sees a + // framework wrap already in the chain and returns the chain untouched. deadlock := &gomysql.MySQLError{Number: 1213, Message: "deadlock"} err := errs.NewUserError(fmt.Errorf("conflict: %w", deadlock)) - out := errs.Classify(err, Classifier) + out := p.Process(err) assert.Same(t, err, out) assert.True(t, errs.IsUserError(out)) assert.False(t, errs.IsRetryable(out)) @@ -199,18 +200,18 @@ func TestClassifier_AppliedViaClassify(t *testing.T) { t.Run("controller-level Retryable wrap beats deeper mysql schema verdict", func(t *testing.T) { // Inverse: a schema error would normally be non-retryable infra, but the // controller chose to mark it retryable anyway. The outer framework wrap - // is already in the chain, so Classify is a no-op. + // is already in the chain, so the processor is a no-op. schemaErr := &gomysql.MySQLError{Number: 1054, Message: "unknown column"} err := errs.NewRetryableError(fmt.Errorf("query: %w", schemaErr)) - out := errs.Classify(err, Classifier) + out := p.Process(err) assert.Same(t, err, out) assert.True(t, errs.IsRetryable(out)) }) t.Run("unwrapped non-mysql error stays unclassified", func(t *testing.T) { raw := errors.New("something else") - out := errs.Classify(raw, Classifier) + out := p.Process(raw) assert.Same(t, raw, out) assert.False(t, errs.IsRetryable(out)) assert.False(t, errs.IsUserError(out)) diff --git a/core/errs/processor.go b/core/errs/processor.go new file mode 100644 index 00000000..6fd806f0 --- /dev/null +++ b/core/errs/processor.go @@ -0,0 +1,156 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package errs + +import "errors" + +// ErrorProcessor transforms an error returned by a controller into the error +// the surrounding transport will react to. It runs exactly once per failing +// delivery — typically called by the consumer immediately after a controller +// returns — and the result is what IsRetryable / IsUserError / IsDependencyError +// will subsequently inspect. +// +// Two implementations ship in this package: +// +// - NewClassifierProcessor runs a per-node classifier walk. This preserves +// controller-attached framework wraps (NewUserError, NewDependencyError, +// ...) verbatim and only invokes the supplied classifiers when the chain +// carries no existing framework type. Use it for primary pipeline +// consumers where controller-driven classification is the source of truth. +// +// - AlwaysRetryableProcessor unconditionally wraps every non-nil error with +// NewRetryableError, overriding any inner framework wrap. Use it for +// narrowly-scoped consumers — typically DLQ reconciliation — that must +// redeliver on any failure because there is no further dead-letter +// destination. +// +// Separating "decide how an error is interpreted" from "decide what to do +// with the interpreted error" lets the same consumer implementation host +// transports with very different retry policies without leaking the policy +// into each Controller. +type ErrorProcessor interface { + Process(err error) error +} + +// NewClassifierProcessor returns an ErrorProcessor that runs the supplied +// classifiers over the chain of any non-nil error. +// +// Semantics of Process on the returned processor: +// +// - nil in, nil out. +// - If err's chain already carries a framework classification (*userError +// or *infraError anywhere in the chain), returns err unchanged — the chain +// is already interpretable by IsUserError / IsRetryable / +// IsDependencyError. +// - Otherwise, walks the chain from outermost to innermost, asking each +// classifier per node. The FIRST non-Unknown verdict wins; the outermost +// such node determines the wrap. err is wrapped with the framework +// constructor matching that verdict (User -> NewUserError, InfraRetryable +// -> NewRetryableError, etc.) and the wrapped error is returned. +// - Verdict Infra means "non-retryable infra" — which is already the default +// behavior for an unwrapped chain, so no wrap is added. +// - If no classifier recognises anything, err is returned unchanged. +// +// Implementation: two passes over the chain. Pass 1 is a cheap type check +// looking for an existing framework wrap and short-circuits if one is found — +// no classifier is invoked. Pass 2 runs the configured classifiers per node. +// Walking the chain is cheap relative to a classifier call, so this avoids +// running classifiers whenever the chain is already classified deeper down. +// +// Passing no classifiers is valid — the processor will still honour any +// framework wrap already in the chain and otherwise return err unchanged. +// +// NOTE: this central classifier model cannot disambiguate errors of the same +// underlying type produced by different extensions (e.g. a net.OpError from a +// mysql connection vs the same type from an HTTP caller would both match the +// mysql classifier here). Resolving that requires per-extension provenance +// tagging; intentionally deferred. +func NewClassifierProcessor(classifiers ...Classifier) ErrorProcessor { + return classifierProcessor{classifiers: classifiers} +} + +type classifierProcessor struct { + classifiers []Classifier +} + +func (p classifierProcessor) Process(err error) error { + if err == nil { + return nil + } + + // Pass 1 — cheap framework-wrap check. If any node already carries a + // framework type, the chain is interpretable as-is and classifiers are + // never invoked. + for cur := err; cur != nil; cur = errors.Unwrap(cur) { + switch cur.(type) { + case *userError, *infraError: + return err + } + } + + // Pass 2 — run classifiers per node from outermost to innermost. Stop at + // the first non-Unknown verdict. + var verdict Verdict + for cur := err; cur != nil && verdict == Unknown; cur = errors.Unwrap(cur) { + for _, c := range p.classifiers { + if v := c.Classify(cur); v != Unknown { + verdict = v + break + } + } + } + + switch verdict { + case User: + return NewUserError(err) + case InfraRetryable: + return NewRetryableError(err) + case InfraDependency: + return NewDependencyError(err) + case InfraDependencyRetryable: + return NewRetryableDependencyError(err) + } + // Unknown or Infra — no wrap needed; the existing chain already behaves as + // non-retryable infra at the IsRetryable / IsUserError layer. + return err +} + +// AlwaysRetryableProcessor classifies every non-nil error as InfraRetryable by +// wrapping it with NewRetryableError. The wrap is unconditional: an inner +// *userError or non-retryable *infraError is overridden because errors.As +// (used by IsRetryable) matches the outermost *infraError first, and that +// outer wrap is always retryable=true. +// +// Side-effect: an inner *infraError carrying dependency=true is masked. The +// outer wrap is constructed with dependency=false, so IsDependencyError on +// the result returns false even though the original chain originated in a +// dependency. This is acceptable for the intended DLQ-reconciliation use +// case where only IsRetryable drives transport behavior; if dependency +// provenance ever needs to survive this processor it must be added here +// explicitly. +// +// Pair this only with consumers whose controllers should retry on any returned +// error. On a primary pipeline consumer this would loop forever on genuine +// user errors and prevent them from reaching the DLQ. +var AlwaysRetryableProcessor ErrorProcessor = alwaysRetryableProcessor{} + +type alwaysRetryableProcessor struct{} + +func (alwaysRetryableProcessor) Process(err error) error { + if err == nil { + return nil + } + return NewRetryableError(err) +} diff --git a/core/errs/processor_test.go b/core/errs/processor_test.go new file mode 100644 index 00000000..d81d5ed7 --- /dev/null +++ b/core/errs/processor_test.go @@ -0,0 +1,150 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package errs + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// stubClassifier returns a fixed verdict regardless of the error inspected. +// Used to verify that NewClassifierProcessor wires Classify correctly. +type stubClassifier struct{ verdict Verdict } + +func (s stubClassifier) Classify(error) Verdict { return s.verdict } + +func TestNewClassifierProcessor_NilIn(t *testing.T) { + p := NewClassifierProcessor() + assert.NoError(t, p.Process(nil)) +} + +func TestNewClassifierProcessor_PreservesFrameworkWrap(t *testing.T) { + // An error already carrying a framework wrap must pass through unchanged, + // even if a classifier would otherwise contradict it. This mirrors the + // pass-1 short-circuit in Classify. + p := NewClassifierProcessor(stubClassifier{verdict: InfraRetryable}) + + userErr := NewUserError(errors.New("bad input")) + out := p.Process(userErr) + assert.Same(t, userErr, out) + assert.True(t, IsUserError(out)) + assert.False(t, IsRetryable(out)) +} + +func TestNewClassifierProcessor_AppliesClassifierWhenNoWrap(t *testing.T) { + p := NewClassifierProcessor(stubClassifier{verdict: InfraRetryable}) + + raw := errors.New("transient") + out := p.Process(raw) + require.Error(t, out) + assert.True(t, IsRetryable(out)) +} + +func TestNewClassifierProcessor_NoClassifiersReturnsUnchanged(t *testing.T) { + // Empty classifier list still walks pass 1 (framework wraps preserved) but + // produces no wrap of its own — the chain stays as the caller passed it. + p := NewClassifierProcessor() + + raw := errors.New("transient") + out := p.Process(raw) + assert.Same(t, raw, out) + assert.False(t, IsRetryable(out)) + assert.False(t, IsUserError(out)) +} + +func TestAlwaysRetryableProcessor_NilIn(t *testing.T) { + assert.NoError(t, AlwaysRetryableProcessor.Process(nil)) +} + +func TestAlwaysRetryableProcessor_WrapsPlainError(t *testing.T) { + raw := errors.New("anything") + out := AlwaysRetryableProcessor.Process(raw) + require.Error(t, out) + assert.True(t, IsRetryable(out)) + // The wrap preserves the original cause for diagnostics. + assert.True(t, errors.Is(out, raw)) +} + +// TestAlwaysRetryableProcessor_OverridesUserError pins the headline behavior: +// even an explicit NewUserError from a controller must come out retryable so +// the surrounding consumer redelivers it. This is the whole reason this +// processor exists — Classify would short-circuit on the inner *userError and +// IsRetryable would return false. +func TestAlwaysRetryableProcessor_OverridesUserError(t *testing.T) { + inner := errors.New("bad input") + userErr := NewUserError(inner) + + out := AlwaysRetryableProcessor.Process(userErr) + require.Error(t, out) + assert.True(t, IsRetryable(out), "outer infraError(retryable=true) must win IsRetryable") + // The inner *userError is preserved in the chain for observability — a + // caller that explicitly classified its failure as user-caused did so for + // a reason, even if the transport overrides the retry decision. + assert.True(t, IsUserError(out)) + assert.True(t, errors.Is(out, inner)) +} + +func TestAlwaysRetryableProcessor_OverridesNonRetryableDependencyError(t *testing.T) { + depErr := NewDependencyError(errors.New("upstream 503")) + + out := AlwaysRetryableProcessor.Process(depErr) + require.Error(t, out) + assert.True(t, IsRetryable(out), "retryable=true must take precedence over inner non-retryable") + // The dependency bit is intentionally masked by the outer wrap — see the + // AlwaysRetryableProcessor doc comment. IsRetryable is the only contract + // this processor promises to satisfy. + assert.False(t, IsDependencyError(out)) +} + +func TestAlwaysRetryableProcessor_PreservesContextCancellation(t *testing.T) { + // context.Canceled is a special case for the consumer loop (treated as + // shutdown, not a controller failure) — but classification-wise it should + // still come back retryable so a non-shutdown caller redelivers. + out := AlwaysRetryableProcessor.Process(context.Canceled) + require.Error(t, out) + assert.True(t, IsRetryable(out)) + assert.True(t, errors.Is(out, context.Canceled)) +} + +func TestAlwaysRetryableProcessor_DoubleWrapIsBenign(t *testing.T) { + // Wrapping an already-retryable error is a no-op from the IsRetryable + // perspective. We do not collapse the wrap; the second layer is cheap. + already := NewRetryableError(errors.New("already retryable")) + out := AlwaysRetryableProcessor.Process(already) + require.Error(t, out) + assert.True(t, IsRetryable(out)) +} + +// TestErrorProcessor_InterfaceConformance is a compile-time assertion that +// both shipped implementations satisfy the ErrorProcessor interface. +func TestErrorProcessor_InterfaceConformance(t *testing.T) { + var _ ErrorProcessor = NewClassifierProcessor() + var _ ErrorProcessor = AlwaysRetryableProcessor + var _ ErrorProcessor = classifierProcessor{} + var _ ErrorProcessor = alwaysRetryableProcessor{} +} + +// Smoke-test that the processor result is interpretable by fmt-wrap callers +// that may further annotate the error before it reaches IsRetryable. +func TestAlwaysRetryableProcessor_SurvivesFmtWrap(t *testing.T) { + out := AlwaysRetryableProcessor.Process(errors.New("boom")) + wrapped := fmt.Errorf("downstream: %w", out) + assert.True(t, IsRetryable(wrapped)) +} diff --git a/doc/rfc/submitqueue/workflow.md b/doc/rfc/submitqueue/workflow.md index a3e10df0..2b7b33ee 100644 --- a/doc/rfc/submitqueue/workflow.md +++ b/doc/rfc/submitqueue/workflow.md @@ -80,3 +80,13 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate` | **merge** | BatchID | conclude, speculate | Merge the batch and advance the queue | | **conclude** | BatchID | — | Map terminal batch state to request state | | **log** | RequestLog | — | Append-only sink for request log events | + +## DLQ reconciliation + +Every primary pipeline topic above is paired with a `{topic}_dlq` subscription consumed by a dedicated DLQ controller. The consumer framework moves a message to its DLQ once the primary controller returns a non-retryable error or exhausts retries on a retryable one; without the DLQ side the affected request would stay in a non-terminal state forever and the gateway would still report it as "in progress". + +The DLQ controllers do not re-attempt the failed work. They decode the payload to recover the affected `RequestID` (start, validate, batch, cancel, log) or `BatchID` (score, speculate, build, buildsignal, merge, conclude) and drive the entity to a terminal failed state — `RequestStateError` for requests, `BatchStateFailed` for batches with fan-out to the member requests. State writes use the same optimistic-locking CAS as the primary pipeline, so a late primary-pipeline update wins cleanly and a version mismatch is asked back for redelivery. + +DLQ consumers are wired with `errs.AlwaysRetryableProcessor` and a very high `Retry.MaxAttempts`, with their own DLQ disabled. That combination makes reconciliation effectively non-droppable: any failure is forced retryable rather than escalating to a second-level dead-letter that nobody consumes. The trade-off is that a genuinely unprocessable DLQ message — typically a malformed payload — must be removed by an operator. + +See `submitqueue/orchestrator/controller/dlq/README.md` for the design constraints (simplest possible implementation, reconcile-only, no recovery) and the per-topic controller mapping. diff --git a/example/submitqueue/orchestrator/server/BUILD.bazel b/example/submitqueue/orchestrator/server/BUILD.bazel index 5190e02e..e7694905 100644 --- a/example/submitqueue/orchestrator/server/BUILD.bazel +++ b/example/submitqueue/orchestrator/server/BUILD.bazel @@ -11,6 +11,7 @@ go_library( importpath = "github.com/uber/submitqueue/example/submitqueue/orchestrator/server", visibility = ["//visibility:private"], deps = [ + "//core/errs", "//core/errs/generic", "//core/errs/mysql", "//core/httpclient", @@ -40,6 +41,7 @@ go_library( "//submitqueue/orchestrator/controller/buildsignal", "//submitqueue/orchestrator/controller/cancel", "//submitqueue/orchestrator/controller/conclude", + "//submitqueue/orchestrator/controller/dlq", "//submitqueue/orchestrator/controller/log", "//submitqueue/orchestrator/controller/merge", "//submitqueue/orchestrator/controller/score", diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index 5595cc89..8a6719ba 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -30,6 +30,7 @@ import ( "golang.org/x/oauth2" "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/errs" genericerrs "github.com/uber/submitqueue/core/errs/generic" mysqlerrs "github.com/uber/submitqueue/core/errs/mysql" "github.com/uber/submitqueue/core/httpclient" @@ -59,6 +60,7 @@ import ( "github.com/uber/submitqueue/submitqueue/orchestrator/controller/buildsignal" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/cancel" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/conclude" + "github.com/uber/submitqueue/submitqueue/orchestrator/controller/dlq" logctrl "github.com/uber/submitqueue/submitqueue/orchestrator/controller/log" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/merge" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/score" @@ -198,13 +200,23 @@ func run() error { return fmt.Errorf("failed to create topic registry: %w", err) } - // Create consumer. - c := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry, - genericerrs.Classifier, - // Storage (extension/storage/mysql) and queue (extension/messagequeue/mysql) - // both run on the same MySQL driver, so a single classifier covers - // errors surfaced from either backend. - mysqlerrs.Classifier, + // Two consumers share the topic registry but apply different error + // classification policies. The primary consumer runs the standard + // per-node classifier walk. The DLQ consumer uses the AlwaysRetryableProcessor + // so every non-nil error from a DLQ controller is forced retryable — + // reconciliation must redeliver on any failure because the DLQ + // subscriptions are final destinations (there is no further DLQ). + primaryConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry, + errs.NewClassifierProcessor( + genericerrs.Classifier, + // Storage (extension/storage/mysql) and queue (extension/messagequeue/mysql) + // both run on the same MySQL driver, so a single classifier covers + // errors surfaced from either backend. + mysqlerrs.Classifier, + ), + ) + dlqConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer-dlq"), registry, + errs.AlwaysRetryableProcessor, ) // Create merge checker @@ -230,19 +242,38 @@ func run() error { br := buildnoop.New() // Register controllers - if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, br, cnt, store); err != nil { + primaryCount, err := registerPrimaryControllers(primaryConsumer, logger.Sugar(), scope, registry, mc, cp, psh, br, cnt, store) + if err != nil { + return err + } + dlqCount, err := registerDLQControllers(dlqConsumer, logger.Sugar(), scope, store) + if err != nil { return err } - logger.Info("controllers registered", zap.Int("count", 11)) + logger.Info("controllers registered", zap.Int("primary", primaryCount), zap.Int("dlq", dlqCount)) - // Start consumers - if err := c.Start(ctx); err != nil { + // Start consumers. DLQ first because Start begins processing + // messages immediately; if the second (primary) consumer fails to + // start, the half we already started is the DLQ side, whose work + // is idempotent reconciliation and is safe to interrupt mid-flight + // for rollback. + if err := dlqConsumer.Start(ctx); err != nil { // The error can also be a result of a context cancellation due to SIGINT or SIGTERM. // This is expected, just propagate it. - return fmt.Errorf("failed to start consumers: %w", err) + return fmt.Errorf("failed to start dlq consumer: %w", err) + } + if err := primaryConsumer.Start(ctx); err != nil { + // Best-effort: stop the dlq consumer we just started so the + // caller does not need to know which half failed. Aggregate both + // errors with errors.Join so the operator sees the original cause. + stopErr := dlqConsumer.Stop(30000) + return errors.Join( + fmt.Errorf("failed to start primary consumer: %w", err), + stopErr, + ) } - logger.Info("consumer started") + logger.Info("consumers started") // Create gRPC server grpcServer := grpc.NewServer() @@ -305,8 +336,14 @@ func run() error { serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr) } - // Stop consumers with 30s timeout, by this time the context should be cancelled and the processing threads may already be exiting; recollect them - errStop := c.Stop(30000) + // Stop consumers with 30s timeout in reverse start order: primary + // first, then DLQ. The primary pipeline writes the state that DLQ + // reconciliation reads, so draining primary first means in-flight + // DLQ reconciliation finishes against a settled primary rather than + // racing its shutdown. + primaryStopErr := primaryConsumer.Stop(30000) + dlqStopErr := dlqConsumer.Stop(30000) + errStop := errors.Join(primaryStopErr, dlqStopErr) if errStop != nil { errStop = fmt.Errorf("failed to stop consumers: %w", errStop) } @@ -322,106 +359,74 @@ func run() error { // newTopicRegistry builds the TopicRegistry with all topic and subscription configs. func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) { - return consumer.NewTopicRegistry([]consumer.TopicConfig{ - { - Key: consumer.TopicKeyStart, - Name: "start", - Queue: q, - Subscription: extqueue.DefaultSubscriptionConfig( - subscriberName, "orchestrator-start", - ), - }, - { - Key: consumer.TopicKeyCancel, - Name: "cancel", - Queue: q, - Subscription: extqueue.DefaultSubscriptionConfig( - subscriberName, "orchestrator-cancel", - ), - }, - { - Key: consumer.TopicKeyValidate, - Name: "validate", - Queue: q, - Subscription: extqueue.DefaultSubscriptionConfig( - subscriberName, "orchestrator-validate", - ), - }, - { - Key: consumer.TopicKeyBatch, - Name: "batch", - Queue: q, - Subscription: extqueue.DefaultSubscriptionConfig( - subscriberName, "orchestrator-batch", - ), - }, - { - Key: consumer.TopicKeyScore, - Name: "score", - Queue: q, - Subscription: extqueue.DefaultSubscriptionConfig( - subscriberName, "orchestrator-score", - ), - }, - { - Key: consumer.TopicKeySpeculate, - Name: "speculate", + // primaryTopics enumerates the {key, name, group-suffix} for every primary + // pipeline topic. The DLQ topic for each is derived by appending "_dlq" to + // both the topic name and the consumer group; the topic-key suffix is + // owned by the dlq package (dlq.TopicKey). + type topicSpec struct { + key consumer.TopicKey + name string + groupSuffix string + } + primaryTopics := []topicSpec{ + {consumer.TopicKeyStart, "start", "orchestrator-start"}, + {consumer.TopicKeyCancel, "cancel", "orchestrator-cancel"}, + {consumer.TopicKeyValidate, "validate", "orchestrator-validate"}, + {consumer.TopicKeyBatch, "batch", "orchestrator-batch"}, + {consumer.TopicKeyScore, "score", "orchestrator-score"}, + {consumer.TopicKeySpeculate, "speculate", "orchestrator-speculate"}, + {consumer.TopicKeyBuild, "build", "orchestrator-build"}, + {consumer.TopicKeyBuildSignal, "buildsignal", "orchestrator-buildsignal"}, + {consumer.TopicKeyMerge, "merge", "orchestrator-merge"}, + {consumer.TopicKeyConclude, "conclude", "orchestrator-conclude"}, + {consumer.TopicKeyLog, "log", "orchestrator-log"}, + } + + configs := make([]consumer.TopicConfig, 0, 2*len(primaryTopics)) + for _, t := range primaryTopics { + configs = append(configs, consumer.TopicConfig{ + Key: t.key, + Name: t.name, Queue: q, Subscription: extqueue.DefaultSubscriptionConfig( - subscriberName, "orchestrator-speculate", + subscriberName, t.groupSuffix, ), - }, - { - Key: consumer.TopicKeyBuild, - Name: "build", - Queue: q, - Subscription: extqueue.DefaultSubscriptionConfig( - subscriberName, "orchestrator-build", - ), - }, - { - Key: consumer.TopicKeyBuildSignal, - Name: "buildsignal", - Queue: q, - Subscription: extqueue.DefaultSubscriptionConfig( - subscriberName, "orchestrator-buildsignal", - ), - }, - { - Key: consumer.TopicKeyMerge, - Name: "merge", - Queue: q, - Subscription: extqueue.DefaultSubscriptionConfig( - subscriberName, "orchestrator-merge", - ), - }, - { - Key: consumer.TopicKeyConclude, - Name: "conclude", - Queue: q, - Subscription: extqueue.DefaultSubscriptionConfig( - subscriberName, "orchestrator-conclude", - ), - }, - { - Key: consumer.TopicKeyLog, - Name: "log", - Queue: q, - Subscription: extqueue.DefaultSubscriptionConfig( - subscriberName, "orchestrator-log", - ), - }, - }) + }) + // DLQ subscription for the same primary stage. DLQ is disabled here + // to avoid a "_dlq_dlq" cascade: if DLQ reconciliation itself fails, + // the consumer retries forever and the failure is surfaced via logs + // and metrics rather than being moved to a second-level dead-letter + // topic that nobody consumes. + // + // MaxAttempts is bumped to a very high value so the per-message + // retry budget effectively never runs out — this pairs with the + // AlwaysRetryableProcessor wired into the DLQ consumer to guarantee + // reconciliation eventually converges instead of being silently + // dropped after the default retry count. + dlqSub := extqueue.DefaultSubscriptionConfig( + subscriberName, t.groupSuffix+"-dlq", + ) + dlqSub.DLQ.Enabled = false + dlqSub.Retry.MaxAttempts = 1000 + configs = append(configs, consumer.TopicConfig{ + Key: dlq.TopicKey(t.key), + Name: t.name + "_dlq", + Queue: q, + Subscription: dlqSub, + }) + } + + return consumer.NewTopicRegistry(configs) } -// registerControllers creates all pipeline controllers and registers them with the consumer. -// Pipeline: +// registerPrimaryControllers creates all pipeline controllers and registers +// them with the primary consumer. Pipeline: // -// request → validate → batch → score → speculate → build → buildsignal ─┐ -// ↑ ↘ ↻ poll │ -// │ merge → conclude │ -// │ │ │ -// └────────┴───────────────────────┘ +// request → validate → batch → score → speculate → build → buildsignal ─┐ +// ↑ ↘ ↻ poll │ +// │ merge → conclude │ +// │ │ │ +// └────────┴───────────────────────┘ // Static per-extension factories for the example server: every queue resolves // to the single configured implementation. A real deployment would vary the @@ -456,7 +461,8 @@ type conflictFactory struct{ impl conflict.Analyzer } func (f conflictFactory) For(conflict.Config) (conflict.Analyzer, error) { return f.impl, nil } -func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, br buildrunner.BuildRunner, cnt counter.Counter, store storage.Storage) error { +func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, br buildrunner.BuildRunner, cnt counter.Counter, store storage.Storage) (int, error) { + var count int requestController := start.NewController( logger, scope, @@ -466,8 +472,9 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t "orchestrator-start", ) if err := c.Register(requestController); err != nil { - return fmt.Errorf("failed to register request controller: %w", err) + return count, fmt.Errorf("failed to register request controller: %w", err) } + count++ cancelController := cancel.NewController( logger, @@ -478,8 +485,9 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t "orchestrator-cancel", ) if err := c.Register(cancelController); err != nil { - return fmt.Errorf("failed to register cancel controller: %w", err) + return count, fmt.Errorf("failed to register cancel controller: %w", err) } + count++ validateController := validate.NewController( logger, @@ -492,8 +500,9 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t "orchestrator-validate", ) if err := c.Register(validateController); err != nil { - return fmt.Errorf("failed to register validate controller: %w", err) + return count, fmt.Errorf("failed to register validate controller: %w", err) } + count++ batchController := batch.NewController( logger, @@ -508,8 +517,9 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t "orchestrator-batch", ) if err := c.Register(batchController); err != nil { - return fmt.Errorf("failed to register batch controller: %w", err) + return count, fmt.Errorf("failed to register batch controller: %w", err) } + count++ scoreController := score.NewController( logger, @@ -534,8 +544,9 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t "orchestrator-score", ) if err := c.Register(scoreController); err != nil { - return fmt.Errorf("failed to register score controller: %w", err) + return count, fmt.Errorf("failed to register score controller: %w", err) } + count++ speculateController := speculate.NewController( logger, @@ -546,8 +557,9 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t "orchestrator-speculate", ) if err := c.Register(speculateController); err != nil { - return fmt.Errorf("failed to register speculate controller: %w", err) + return count, fmt.Errorf("failed to register speculate controller: %w", err) } + count++ buildController := build.NewController( logger, @@ -559,8 +571,9 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t "orchestrator-build", ) if err := c.Register(buildController); err != nil { - return fmt.Errorf("failed to register build controller: %w", err) + return count, fmt.Errorf("failed to register build controller: %w", err) } + count++ buildsignalController := buildsignal.NewController( logger, @@ -572,8 +585,9 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t "orchestrator-buildsignal", ) if err := c.Register(buildsignalController); err != nil { - return fmt.Errorf("failed to register buildsignal controller: %w", err) + return count, fmt.Errorf("failed to register buildsignal controller: %w", err) } + count++ mergeController := merge.NewController( logger, @@ -585,8 +599,9 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t "orchestrator-merge", ) if err := c.Register(mergeController); err != nil { - return fmt.Errorf("failed to register merge controller: %w", err) + return count, fmt.Errorf("failed to register merge controller: %w", err) } + count++ concludeController := conclude.NewController( logger, @@ -597,8 +612,9 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t "orchestrator-conclude", ) if err := c.Register(concludeController); err != nil { - return fmt.Errorf("failed to register conclude controller: %w", err) + return count, fmt.Errorf("failed to register conclude controller: %w", err) } + count++ logController := logctrl.NewController( logger, @@ -608,10 +624,45 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t "orchestrator-log", ) if err := c.Register(logController); err != nil { - return fmt.Errorf("failed to register log controller: %w", err) + return count, fmt.Errorf("failed to register log controller: %w", err) + } + count++ + + return count, nil +} + +// registerDLQControllers creates one DLQ reconciler per primary stage and +// registers them with the DLQ consumer. Each reconciler drives the affected +// request or batch into a terminal Error/Failed state so the gateway stops +// reporting it as stuck-in-progress. The log DLQ is a metric-only no-op (log +// entries are observability, not pipeline state). +func registerDLQControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage) (int, error) { + dlqScope := scope.SubScope("dlq") + dlqRegs := []struct { + name string + ctl consumer.Controller + }{ + {"start_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeLandRequestID, dlq.TopicKey(consumer.TopicKeyStart), "orchestrator-start-dlq")}, + {"cancel_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeCancelRequestID, dlq.TopicKey(consumer.TopicKeyCancel), "orchestrator-cancel-dlq")}, + {"validate_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeRequestID, dlq.TopicKey(consumer.TopicKeyValidate), "orchestrator-validate-dlq")}, + {"batch_dlq", dlq.NewDLQRequestController(logger, dlqScope, store, dlq.DecodeRequestID, dlq.TopicKey(consumer.TopicKeyBatch), "orchestrator-batch-dlq")}, + {"score_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyScore), "orchestrator-score-dlq")}, + {"speculate_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeySpeculate), "orchestrator-speculate-dlq")}, + {"build_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyBuild), "orchestrator-build-dlq")}, + {"buildsignal_dlq", dlq.NewDLQBuildSignalController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq")}, + {"merge_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyMerge), "orchestrator-merge-dlq")}, + {"conclude_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(consumer.TopicKeyConclude), "orchestrator-conclude-dlq")}, + {"log_dlq", dlq.NewDLQLogController(logger, dlqScope, dlq.TopicKey(consumer.TopicKeyLog), "orchestrator-log-dlq")}, + } + var count int + for _, reg := range dlqRegs { + if err := c.Register(reg.ctl); err != nil { + return count, fmt.Errorf("failed to register %s controller: %w", reg.name, err) + } + count++ } - return nil + return count, nil } // getEnv returns environment variable value or default if not set. diff --git a/submitqueue/core/consumer/README.md b/submitqueue/core/consumer/README.md index ff0ebac2..a2e911f9 100644 --- a/submitqueue/core/consumer/README.md +++ b/submitqueue/core/consumer/README.md @@ -29,7 +29,12 @@ registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{ {Key: consumer.TopicKeyStart, Name: "request", Queue: q, Subscription: subConfig}, }) -c := consumer.New(logger, scope, registry) +c := consumer.New(logger, scope, registry, + errs.NewClassifierProcessor( + genericerrs.Classifier, + mysqlerrs.Classifier, + ), +) c.Register(myController) c.Start(ctx) @@ -40,6 +45,8 @@ if err := c.Stop(30000); err != nil { } ``` +The fourth argument is the `errs.ErrorProcessor` the consumer runs over every non-nil controller error before deciding ack/nack/reject. See `core/errs/README.md` for the contract; in short, a primary pipeline consumer takes `errs.NewClassifierProcessor(...)` with the project's standard classifiers, and a DLQ-reconciliation consumer takes `errs.AlwaysRetryableProcessor`. + ### Controller Business logic for processing queue messages. Implement this interface to handle deliveries for a specific topic. @@ -82,11 +89,16 @@ registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{ ## Error Handling -Controllers signal processing outcome via the return value of `Process()`: +The consumer passes every non-nil controller error through the configured `errs.ErrorProcessor` once and then uses `errs.IsRetryable` to decide the transport action: - **`return nil`** — success, message is acked. -- **`return errs.NewRetryableError(err)`** — retryable failure, message is nacked for retry. -- **`return err`** — non-retryable error (e.g. poison pill), message is rejected and removed from the queue to prevent infinite retry loops. +- **non-nil, retryable after processing** — message is nacked for redelivery (visibility timeout drives the retry delay). +- **non-nil, non-retryable after processing** — message is rejected, which moves it to the DLQ if one is configured for the subscription, or simply acks-and-drops if not. + +Controllers therefore have two equally valid ways to surface a transient failure: + +1. Return an unclassified error and let a classifier wired into the processor recognise it (e.g. a `*gomysql.MySQLError` with a deadlock code → `mysqlerrs.Classifier` → retryable). +2. Return `errs.NewRetryableError(...)` (or `NewUserError`, `NewDependencyError`, ...) explicitly when the controller already knows the right verdict — these framework wraps short-circuit any classifier walk. ```go func (c *MyController) Process(ctx context.Context, delivery consumer.Delivery) error { @@ -94,16 +106,18 @@ func (c *MyController) Process(ctx context.Context, delivery consumer.Delivery) result, err := c.service.Process(ctx, msg.Payload) if err != nil { - if isTransient(err) { - return errs.NewRetryableError(err) // nack → retry + if isUserCaused(err) { + return errs.NewUserError(err) // reject → DLQ, never retried } - return err // reject → DLQ + return err // let the processor classify; nack if retryable } return nil // ack → done } ``` +When the consumer is wired with `errs.AlwaysRetryableProcessor` (DLQ reconciliation), the framework overrides this: every non-nil error is forced retryable so the DLQ message comes back for another attempt. See `submitqueue/orchestrator/controller/dlq/README.md`. + ## Lifecycle 1. **Register** controllers before starting. diff --git a/submitqueue/core/consumer/consumer.go b/submitqueue/core/consumer/consumer.go index 4740b214..b4b62ecb 100644 --- a/submitqueue/core/consumer/consumer.go +++ b/submitqueue/core/consumer/consumer.go @@ -61,7 +61,7 @@ type consumer struct { logger *zap.SugaredLogger metricsScope tally.Scope registry TopicRegistry - classifiers []errs.Classifier + processor errs.ErrorProcessor mu sync.Mutex stopped bool @@ -78,21 +78,20 @@ type activeSubscription struct { // New creates a new consumer. // -// registry provides queue and subscription config for topics. classifiers are -// the per-backend error classifiers used to decide whether an error returned -// by a controller is retryable (nack for redelivery) or non-retryable (reject -// to DLQ). The consumer runs errs.Classify(err, classifiers...) exactly once -// per failing delivery and then drives ack/nack/reject from the resulting -// chain via plain errs.IsRetryable type checks. Pass any backend-specific -// classifiers the controllers rely on (e.g. core/errs/mysql.Classifier); -// passing none is fine for tests where controllers always return explicit -// framework-wrapped errors. -func New(logger *zap.SugaredLogger, scope tally.Scope, registry TopicRegistry, classifiers ...errs.Classifier) Consumer { +// registry provides queue and subscription config for topics. processor is the +// error-classification policy applied exactly once per failing controller +// return — typically errs.NewClassifierProcessor(...) for primary pipeline +// consumers (per-node classifier walk that preserves controller-attached +// framework wraps), or errs.AlwaysRetryableProcessor for narrowly-scoped +// consumers such as DLQ reconciliation that must redeliver on any failure. +// processor must not be nil; callers that genuinely want no transformation +// can pass errs.NewClassifierProcessor() with no classifiers. +func New(logger *zap.SugaredLogger, scope tally.Scope, registry TopicRegistry, processor errs.ErrorProcessor) Consumer { return &consumer{ logger: logger, metricsScope: scope.SubScope("consumer"), registry: registry, - classifiers: classifiers, + processor: processor, subscriptions: make(map[TopicKey]*activeSubscription), } } @@ -381,11 +380,11 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d metrics.NamedTimer(controllerScope, opName, "controller_latency", elapsed, metrics.NewTag("success", successTag)) if err != nil { - // Single explicit classification pass: if err's chain does not already - // carry a framework wrap, ask the configured classifiers and prepend - // the matching framework type. Downstream errs.IsRetryable / IsUserError - // calls then only do a plain type check on the result. - err = errs.Classify(err, m.classifiers...) + // Single explicit classification pass through the configured + // ErrorProcessor. Primary consumers use a classifier-based processor + // (preserves controller framework wraps); DLQ consumers use the + // always-retryable processor (forces redelivery on any error). + err = m.processor.Process(err) // Check if the error is non-retryable (poison pill message) if !errs.IsRetryable(err) { diff --git a/submitqueue/core/consumer/consumer_test.go b/submitqueue/core/consumer/consumer_test.go index 5d8111ed..183cd4ed 100644 --- a/submitqueue/core/consumer/consumer_test.go +++ b/submitqueue/core/consumer/consumer_test.go @@ -91,7 +91,7 @@ func TestNew(t *testing.T) { reg, err := consumer.NewTopicRegistry(nil) require.NoError(t, err) - c := consumer.New(logger, tally.NoopScope, reg) + c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) require.NotNil(t, c) } @@ -100,7 +100,7 @@ func TestConsumer_Register(t *testing.T) { logger := zaptest.NewLogger(t).Sugar() reg, _ := consumer.NewTopicRegistry(nil) - c := consumer.New(logger, tally.NoopScope, reg) + c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handler1 := consumermock.NewMockController(ctrl) setupController(handler1, "handler1", consumer.TopicKeyStart, "group1", nil) @@ -120,7 +120,7 @@ func TestConsumer_Register_DuplicateTopic(t *testing.T) { logger := zaptest.NewLogger(t).Sugar() reg, _ := consumer.NewTopicRegistry(nil) - c := consumer.New(logger, tally.NoopScope, reg) + c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handler1 := consumermock.NewMockController(ctrl) setupController(handler1, "handler1", consumer.TopicKeyStart, "group1", nil) @@ -140,7 +140,7 @@ func TestConsumer_Register_AfterStop(t *testing.T) { logger := zaptest.NewLogger(t).Sugar() reg, _ := consumer.NewTopicRegistry(nil) - c := consumer.New(logger, tally.NoopScope, reg) + c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) err := c.Stop(1000) require.NoError(t, err) @@ -156,7 +156,7 @@ func TestConsumer_Start_NoHandlers(t *testing.T) { logger := zaptest.NewLogger(t).Sugar() reg, _ := consumer.NewTopicRegistry(nil) - c := consumer.New(logger, tally.NoopScope, reg) + c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) err := c.Start(context.Background()) assert.Error(t, err) @@ -167,7 +167,7 @@ func TestConsumer_Start_AfterStop(t *testing.T) { logger := zaptest.NewLogger(t).Sugar() reg, _ := consumer.NewTopicRegistry(nil) - c := consumer.New(logger, tally.NoopScope, reg) + c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) setupController(handler, "handler1", consumer.TopicKeyStart, "group1", nil) @@ -193,7 +193,7 @@ func TestConsumer_Start_MissingSubscriptionConfig(t *testing.T) { ) require.NoError(t, err) - c := consumer.New(logger, tally.NoopScope, reg) + c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) setupController(handler, "handler", consumer.TopicKeyStart, "group", nil) @@ -219,7 +219,7 @@ func TestConsumer_Start_SubscribeFailure(t *testing.T) { reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "group") - c := consumer.New(logger, tally.NoopScope, reg) + c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) setupController(handler, "handler", consumer.TopicKeyStart, "group", nil) @@ -245,7 +245,7 @@ func TestConsumer_ProcessDelivery_Success(t *testing.T) { reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") - c := consumer.New(logger, tally.NoopScope, reg) + c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handledMsg := "" handler := consumermock.NewMockController(ctrl) @@ -291,7 +291,7 @@ func TestConsumer_ProcessDelivery_Error(t *testing.T) { reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") - c := consumer.New(logger, tally.NoopScope, reg) + c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", @@ -333,7 +333,7 @@ func TestConsumer_ProcessDelivery_NonRetryableError(t *testing.T) { reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") - c := consumer.New(logger, tally.NoopScope, reg) + c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", @@ -384,7 +384,7 @@ func TestConsumer_Stop(t *testing.T) { reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") - c := consumer.New(logger, tally.NoopScope, reg) + c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", nil) @@ -442,7 +442,7 @@ func TestConsumer_ObservabilityTags(t *testing.T) { reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") - testC := consumer.New(logger, testScope, reg) + testC := consumer.New(logger, testScope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", @@ -517,7 +517,7 @@ func TestConsumer_AckNackLatencyTracking(t *testing.T) { reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") - c := consumer.New(logger, scope, reg) + c := consumer.New(logger, scope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", @@ -562,7 +562,7 @@ func TestConsumer_ErrorMetrics(t *testing.T) { reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") - c := consumer.New(logger, scope, reg) + c := consumer.New(logger, scope, reg, errs.NewClassifierProcessor()) handler := consumermock.NewMockController(ctrl) setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", @@ -618,7 +618,7 @@ func TestConsumer_PerPartitionProcessing(t *testing.T) { reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") - c := consumer.New(logger, tally.NoopScope, reg) + c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) // Track processing by partition partBDone := make(chan struct{}) @@ -703,7 +703,7 @@ func TestConsumer_PartitionOrdering(t *testing.T) { reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") - c := consumer.New(logger, tally.NoopScope, reg) + c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) // Mutex + shared slice captures processing order for assertion; // a channel would only signal completion, not record the sequence. @@ -772,7 +772,7 @@ func TestConsumer_PartitionWorkerCleanup(t *testing.T) { reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") - c := consumer.New(logger, tally.NoopScope, reg) + c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor()) processedCount := int64(0) diff --git a/submitqueue/orchestrator/controller/dlq/BUILD.bazel b/submitqueue/orchestrator/controller/dlq/BUILD.bazel new file mode 100644 index 00000000..094d56a5 --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/BUILD.bazel @@ -0,0 +1,48 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "dlq", + srcs = [ + "batch.go", + "buildsignal.go", + "dlq.go", + "log.go", + "request.go", + ], + importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/dlq", + visibility = ["//visibility:public"], + deps = [ + "//core/metrics", + "//submitqueue/core/consumer", + "//submitqueue/entity", + "//submitqueue/extension/storage", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "dlq_test", + srcs = [ + "batch_test.go", + "buildsignal_test.go", + "dlq_test.go", + "log_test.go", + "request_test.go", + ], + embed = [":dlq"], + deps = [ + "//core/errs", + "//entity/messagequeue", + "//extension/messagequeue/mock", + "//submitqueue/core/consumer", + "//submitqueue/entity", + "//submitqueue/extension/storage", + "//submitqueue/extension/storage/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/submitqueue/orchestrator/controller/dlq/README.md b/submitqueue/orchestrator/controller/dlq/README.md new file mode 100644 index 00000000..83512134 --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/README.md @@ -0,0 +1,51 @@ +# DLQ Reconciliation Controllers + +This package contains the controllers that drain each primary pipeline topic's `{topic}_dlq` companion and reconcile the affected request or batch into a terminal failed state. They are wired alongside the primary controllers in `example/submitqueue/orchestrator/server/main.go`. + +## Design principles + +**The DLQ is the final destination.** A message in `{topic}_dlq` has already failed the primary controller's retry budget (`Retry.MaxAttempts` for a retryable error) or surfaced as non-retryable on the first attempt. There is no further dead-letter beyond this point — every DLQ subscription is configured with `DLQ.Enabled = false`, so a DLQ controller's outcome is either "reconciled" or "the message keeps coming back". A genuinely unprocessable DLQ message (e.g. a malformed payload that no controller can ever decode) must be removed by an operator from the queue manually. + +**The implementation is deliberately the simplest and most reliable thing that can work.** Each controller does the same three steps: decode the payload to recover the affected `RequestID` or `BatchID`, fetch the entity, transition it to its terminal failed state with the same optimistic-locking CAS the primary pipeline uses. No business logic, no branching on the original error, no per-stage cleanup. Adding behaviour here trades off the one property the DLQ has to provide — convergence. + +**Reconcile only; do not attempt to recover.** A DLQ controller never tries to re-run the failed stage, retry the original action against an external dependency, or repair a partially-completed transition. The original controller already failed; the DLQ controller's only job is to make sure the request and batch state stop saying "in progress" and start saying "failed", so the gateway reports an honest answer and downstream tooling can move on. Recovery, when it is appropriate, is a separate concern handled by an operator or a future reconciliation job — not by this code path. + +## Convergence guarantee + +DLQ consumers are wired with `errs.AlwaysRetryableProcessor` and a very high `Retry.MaxAttempts` (currently 1000). Together with `DLQ.Enabled = false` on the DLQ subscription itself, this means any non-nil error returned from a DLQ controller — including a plain unclassified infra error — is forced retryable and redelivered rather than silently dropped. The combination is "always retryable + bounded-but-effectively-infinite attempts" and is the property the package relies on for convergence. + +The recognised error condition is handled explicitly in `dlq.go`: + +- `storage.ErrNotFound` → logged at warn and treated as success. The request or batch never persisted; there is nothing to reconcile. + +Everything else — including `storage.ErrVersionMismatch` on the CAS — is returned plain and, after the always-retryable processor wrap, redelivered until it either succeeds or hits the attempt cap. There is no point in pre-classifying retryability at this layer when the processor forces every non-nil error retryable anyway. + +## Request log entries are written directly, not via the queue + +When a DLQ controller transitions a request to `RequestStateError` it also appends a `RequestStatusError` row to `RequestLogStore` via a direct `Insert` call, rather than publishing a `RequestLog` message to the `log` topic the way the primary controllers do. This is deliberate: DLQ controllers must not call back into the primary pipeline. The primary pipeline is what failed and routed the message here in the first place, and re-entering it would risk the same failure mode that caused the DLQ trip. The direct insert produces an equivalent record because the primary log controller is itself a thin wrapper around the same `RequestLogStore`. + +## Controller mapping + +Two controller shapes cover the eleven primary pipeline topics: + +| Controller | Topics | Decoded ID | Terminal state | +|---|---|---|---| +| `NewDLQRequestController` | `start`, `validate`, `batch`, `cancel`, `log` | `RequestID` | `RequestStateError` | +| `NewDLQBatchController` | `score`, `speculate`, `build`, `merge`, `conclude` | `BatchID` | `BatchStateFailed` + fan-out to member requests as `RequestStateError` | + +`buildsignal` carries a `Build` payload and has its own small dedicated controller. The split exists because the DLQ message payload shape mirrors the primary topic's payload shape (the queue framework preserves bytes verbatim under the `_dlq` topic name), so the decoder is what changes per topic — not the reconciliation step. The package-level `RequestIDDecoder` interface plus `DecodeLandRequestID` / `DecodeCancelRequestID` / `DecodeRequestID` cover the three payload shapes used by request-scoped topics. + +## Idempotency and concurrent activity + +Reconciliation is safe to run more than once for the same message: + +- A request already in a terminal state skips the state-transition CAS but still gets a `RequestStatusError` row appended to the request log. The log write runs unconditionally so that a previous DLQ attempt which successfully flipped the state but then failed to insert the log is repaired on redelivery. A duplicate log entry from such a retry is the accepted trade-off; a missing one would leave the gateway-visible status divergent from the entity state. +- A batch already in a terminal state still fans out to member requests, because a previous attempt may have transitioned the batch but crashed before completing the fan-out. +- Per-request fan-out is itself idempotent via `failRequest`. +- A request in `RequestStateCancelling` is reconciled to `RequestStateError`, not left in place: DLQ means the pipeline failed to converge, so we cannot confirm the cancel completed cleanly. Writing Error is the honest signal. + +## See also + +- `core/errs/README.md` — the error-processing framework, including `AlwaysRetryableProcessor` and the choice of processor for primary vs. DLQ consumers. +- `submitqueue/core/consumer/README.md` — how the consumer applies the processor to controller errors and decides ack/nack/reject. +- `doc/rfc/submitqueue/workflow.md` — the per-stage primary pipeline that the DLQ companions mirror. diff --git a/submitqueue/orchestrator/controller/dlq/batch.go b/submitqueue/orchestrator/controller/dlq/batch.go new file mode 100644 index 00000000..3359b0cb --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/batch.go @@ -0,0 +1,119 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dlq + +import ( + "context" + "fmt" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" + "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" + "go.uber.org/zap" +) + +// batchController is the DLQ reconciler for batch-scoped pipeline stages +// (score, speculate, build, merge, conclude). All five topics carry a +// BatchID payload, so this controller is registered five times — one per +// topic, each with the matching DLQ topic key and consumer group. +// +// On each delivery the controller decodes the BatchID, transitions the batch +// to BatchStateFailed (idempotent if already halted), and fans out by +// transitioning each member request to RequestStateError. The fan-out exists +// because conclude — which normally drives request state from batch state — +// will not run for a DLQ'd batch. +type batchController struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + store storage.Storage + topicKey consumer.TopicKey + consumerGroup string +} + +// Verify batchController implements consumer.Controller at compile time. +var _ consumer.Controller = (*batchController)(nil) + +// NewDLQBatchController builds a DLQ controller for a batch-scoped topic. +// topicKey must be the DLQ topic key (typically TopicKey(primary)). +func NewDLQBatchController( + logger *zap.SugaredLogger, + scope tally.Scope, + store storage.Storage, + topicKey consumer.TopicKey, + consumerGroup string, +) consumer.Controller { + name := string(topicKey) + "_controller" + return &batchController{ + logger: logger.Named(name), + metricsScope: scope.SubScope(name), + store: store, + topicKey: topicKey, + consumerGroup: consumerGroup, + } +} + +// Process reconciles a single DLQ delivery for a batch-scoped topic. +func (c *batchController) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + bid, err := entity.BatchIDFromBytes(msg.Payload) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + return fmt.Errorf("failed to decode batch id from dlq payload: %w", err) + } + if bid.ID == "" { + metrics.NamedCounter(c.metricsScope, opName, "empty_id_errors", 1) + return fmt.Errorf("dlq payload decoded to empty batch id") + } + + dmeta := delivery.Metadata() + c.logger.Warnw("dlq message received", + "batch_id", bid.ID, + "attempt", delivery.Attempt(), + "dlq_original_topic", dmeta["dlq.original_topic"], + "dlq_failure_count", dmeta["dlq.failure_count"], + "dlq_last_error", dmeta["dlq.last_error"], + ) + + if err := failBatch(ctx, c.store, c.logger, bid.ID); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "reconcile_errors", 1) + return err + } + + metrics.NamedCounter(c.metricsScope, opName, "reconciled", 1) + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *batchController) Name() string { + return string(c.topicKey) +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *batchController) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *batchController) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/submitqueue/orchestrator/controller/dlq/batch_test.go b/submitqueue/orchestrator/controller/dlq/batch_test.go new file mode 100644 index 00000000..45348c04 --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/batch_test.go @@ -0,0 +1,97 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dlq + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/entity" + storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +func TestDLQBatchController_InterfaceAndAccessors(t *testing.T) { + ctrl := gomock.NewController(t) + store := storagemock.NewMockStorage(ctrl) + + c := NewDLQBatchController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyMerge), "orchestrator-merge-dlq") + + assert.Equal(t, "merge_dlq", c.Name()) + assert.Equal(t, consumer.TopicKey("merge_dlq"), c.TopicKey()) + assert.Equal(t, "orchestrator-merge-dlq", c.ConsumerGroup()) +} + +func TestDLQBatchController_Process_FailsAndFansOut(t *testing.T) { + ctrl := gomock.NewController(t) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), "q/batch/9").Return(entity.Batch{ + ID: "q/batch/9", Queue: "q", Contains: []string{"q/1"}, + State: entity.BatchStateMerging, Version: 2, + }, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), "q/batch/9", int32(2), int32(3), entity.BatchStateFailed).Return(nil) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/1").Return(entity.Request{ + ID: "q/1", Version: 1, State: entity.RequestStateProcessing, + }, nil) + requestStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(1), int32(2), entity.RequestStateError).Return(nil) + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + + c := NewDLQBatchController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyMerge), "orchestrator-merge-dlq") + + payload, err := entity.BatchID{ID: "q/batch/9"}.ToBytes() + require.NoError(t, err) + + delivery := newMockDelivery(ctrl, payload) + require.NoError(t, c.Process(context.Background(), delivery)) +} + +func TestDLQBatchController_Process_MalformedPayloadFails(t *testing.T) { + ctrl := gomock.NewController(t) + + store := storagemock.NewMockStorage(ctrl) + c := NewDLQBatchController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyMerge), "orchestrator-merge-dlq") + + delivery := newMockDelivery(ctrl, []byte("garbage")) + err := c.Process(context.Background(), delivery) + require.Error(t, err) +} + +func TestDLQBatchController_Process_EmptyIDFails(t *testing.T) { + ctrl := gomock.NewController(t) + + store := storagemock.NewMockStorage(ctrl) + c := NewDLQBatchController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyMerge), "orchestrator-merge-dlq") + + payload, err := entity.BatchID{ID: ""}.ToBytes() + require.NoError(t, err) + + delivery := newMockDelivery(ctrl, payload) + err = c.Process(context.Background(), delivery) + require.Error(t, err) +} diff --git a/submitqueue/orchestrator/controller/dlq/buildsignal.go b/submitqueue/orchestrator/controller/dlq/buildsignal.go new file mode 100644 index 00000000..a05ae56f --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/buildsignal.go @@ -0,0 +1,143 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dlq + +import ( + "context" + "errors" + "fmt" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" + "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" + "go.uber.org/zap" +) + +// buildSignalController is the DLQ reconciler for the buildsignal topic. Its +// payload carries a BuildID, so reconciliation needs an extra hop: look up +// the Build to recover its BatchID, then fan out via failBatch. +// +// The build itself is left in whatever non-terminal state the build runner +// last reported. Fixing the build entity is not useful here — the +// pipeline's source of truth for "did this batch finish" is the batch state, +// and that is what gates the gateway response and conclude. +type buildSignalController struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + store storage.Storage + topicKey consumer.TopicKey + consumerGroup string +} + +// Verify buildSignalController implements consumer.Controller at compile time. +var _ consumer.Controller = (*buildSignalController)(nil) + +// NewDLQBuildSignalController builds a DLQ controller for the buildsignal topic. +func NewDLQBuildSignalController( + logger *zap.SugaredLogger, + scope tally.Scope, + store storage.Storage, + topicKey consumer.TopicKey, + consumerGroup string, +) consumer.Controller { + name := string(topicKey) + "_controller" + return &buildSignalController{ + logger: logger.Named(name), + metricsScope: scope.SubScope(name), + store: store, + topicKey: topicKey, + consumerGroup: consumerGroup, + } +} + +// Process reconciles a single DLQ delivery for the buildsignal topic. +func (c *buildSignalController) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + bid, err := entity.BuildIDFromBytes(msg.Payload) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + return fmt.Errorf("failed to decode build id from dlq payload: %w", err) + } + if bid.ID == "" { + metrics.NamedCounter(c.metricsScope, opName, "empty_id_errors", 1) + return fmt.Errorf("dlq payload decoded to empty build id") + } + + dmeta := delivery.Metadata() + c.logger.Warnw("dlq message received", + "build_id", bid.ID, + "attempt", delivery.Attempt(), + "dlq_original_topic", dmeta["dlq.original_topic"], + "dlq_failure_count", dmeta["dlq.failure_count"], + "dlq_last_error", dmeta["dlq.last_error"], + ) + + build, err := c.store.GetBuildStore().Get(ctx, bid.ID) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + // The build was never persisted (e.g. the build controller crashed + // before Create). There is no batch to reconcile from this signal — + // any associated batch should be reconciled from its own DLQ. + c.logger.Warnw("dlq reconcile: build not found, skipping", + "build_id", bid.ID, + ) + metrics.NamedCounter(c.metricsScope, opName, "build_not_found", 1) + return nil + } + metrics.NamedCounter(c.metricsScope, opName, "build_store_errors", 1) + return fmt.Errorf("failed to get build %s: %w", bid.ID, err) + } + + if build.BatchID == "" { + // Defensive: a build without a batch is malformed and there is nothing + // to fan out to. Log and ack so the DLQ does not grow unbounded. + c.logger.Errorw("dlq reconcile: build has empty batch id, skipping", + "build_id", bid.ID, + ) + metrics.NamedCounter(c.metricsScope, opName, "build_missing_batch", 1) + return nil + } + + if err := failBatch(ctx, c.store, c.logger, build.BatchID); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "reconcile_errors", 1) + return err + } + + metrics.NamedCounter(c.metricsScope, opName, "reconciled", 1) + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *buildSignalController) Name() string { + return string(c.topicKey) +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *buildSignalController) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *buildSignalController) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/submitqueue/orchestrator/controller/dlq/buildsignal_test.go b/submitqueue/orchestrator/controller/dlq/buildsignal_test.go new file mode 100644 index 00000000..7ce4de3d --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/buildsignal_test.go @@ -0,0 +1,128 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dlq + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" + storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +func TestDLQBuildSignalController_InterfaceAndAccessors(t *testing.T) { + ctrl := gomock.NewController(t) + store := storagemock.NewMockStorage(ctrl) + + c := NewDLQBuildSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq") + + assert.Equal(t, "buildsignal_dlq", c.Name()) + assert.Equal(t, consumer.TopicKey("buildsignal_dlq"), c.TopicKey()) + assert.Equal(t, "orchestrator-buildsignal-dlq", c.ConsumerGroup()) +} + +func TestDLQBuildSignalController_Process_FansOutToBatch(t *testing.T) { + ctrl := gomock.NewController(t) + + buildStore := storagemock.NewMockBuildStore(ctrl) + buildStore.EXPECT().Get(gomock.Any(), "build-1").Return(entity.Build{ + ID: "build-1", BatchID: "q/batch/2", Status: entity.BuildStatusRunning, + }, nil) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), "q/batch/2").Return(entity.Batch{ + ID: "q/batch/2", Queue: "q", Contains: []string{"q/1"}, + State: entity.BatchStateSpeculating, Version: 3, + }, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), "q/batch/2", int32(3), int32(4), entity.BatchStateFailed).Return(nil) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/1").Return(entity.Request{ + ID: "q/1", Version: 1, State: entity.RequestStateProcessing, + }, nil) + requestStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(1), int32(2), entity.RequestStateError).Return(nil) + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + + c := NewDLQBuildSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq") + + payload, err := entity.BuildID{ID: "build-1"}.ToBytes() + require.NoError(t, err) + + delivery := newMockDelivery(ctrl, payload) + require.NoError(t, c.Process(context.Background(), delivery)) +} + +func TestDLQBuildSignalController_Process_BuildNotFoundIsNoOp(t *testing.T) { + ctrl := gomock.NewController(t) + + buildStore := storagemock.NewMockBuildStore(ctrl) + buildStore.EXPECT().Get(gomock.Any(), "build-1").Return(entity.Build{}, storage.ErrNotFound) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() + + c := NewDLQBuildSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq") + + payload, err := entity.BuildID{ID: "build-1"}.ToBytes() + require.NoError(t, err) + + delivery := newMockDelivery(ctrl, payload) + require.NoError(t, c.Process(context.Background(), delivery)) +} + +func TestDLQBuildSignalController_Process_BuildMissingBatchIsNoOp(t *testing.T) { + ctrl := gomock.NewController(t) + + buildStore := storagemock.NewMockBuildStore(ctrl) + buildStore.EXPECT().Get(gomock.Any(), "build-1").Return(entity.Build{ + ID: "build-1", BatchID: "", Status: entity.BuildStatusRunning, + }, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBuildStore().Return(buildStore).AnyTimes() + + c := NewDLQBuildSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq") + + payload, err := entity.BuildID{ID: "build-1"}.ToBytes() + require.NoError(t, err) + + delivery := newMockDelivery(ctrl, payload) + require.NoError(t, c.Process(context.Background(), delivery)) +} + +func TestDLQBuildSignalController_Process_MalformedPayloadFails(t *testing.T) { + ctrl := gomock.NewController(t) + + store := storagemock.NewMockStorage(ctrl) + c := NewDLQBuildSignalController(zaptest.NewLogger(t).Sugar(), testScope(), store, TopicKey(consumer.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq") + + delivery := newMockDelivery(ctrl, []byte("garbage")) + err := c.Process(context.Background(), delivery) + require.Error(t, err) +} diff --git a/submitqueue/orchestrator/controller/dlq/dlq.go b/submitqueue/orchestrator/controller/dlq/dlq.go new file mode 100644 index 00000000..6d48434c --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/dlq.go @@ -0,0 +1,176 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package dlq contains controllers that consume messages from per-topic +// dead-letter queues and reconcile the affected request and batch entities +// into a terminal failed state. +// +// Background. The consumer framework moves a message to its DLQ after the +// controller for the original topic returns a non-retryable error or exhausts +// retries on a retryable error. Without DLQ reconciliation the affected +// request would remain stuck in a non-terminal state (e.g. Validated, Batched, +// Processing) forever — the gateway would still report it as "in progress" +// even though no pipeline stage is going to advance it. +// +// Reconciliation strategy. Each DLQ topic carries the same payload as its +// originating topic (the queue framework preserves the bytes verbatim under a +// new `{topic}_dlq` name). The DLQ controllers decode that payload to recover +// the affected request or batch, then transition it to a terminal failed +// state — Error for requests, Failed for batches — with an idempotent +// optimistic-locking write so concurrent activity (a late merge, a cancel +// race) wins cleanly. Batch failures also fan out to the member requests so +// the gateway no longer reports them as in-progress. +package dlq + +import ( + "context" + "errors" + "fmt" + + "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" + "go.uber.org/zap" +) + +// topicSuffix is appended to a primary topic key to derive the corresponding +// DLQ topic key. The queue extension's DefaultSubscriptionConfig also uses +// "_dlq" as the DLQ topic suffix; keeping both in sync is intentional so +// that a registered DLQ subscription's topic name matches the controller's +// TopicKey(). +const topicSuffix = "_dlq" + +// TopicKey returns the DLQ topic key for the given primary pipeline topic. +// The returned key is meant to be used both when registering the DLQ topic +// with the topic registry and when the corresponding DLQ controller advertises +// its TopicKey(). It is exported so the orchestrator wiring layer can build +// matching pairs without duplicating the suffix literal. +func TopicKey(main consumer.TopicKey) consumer.TopicKey { + return consumer.TopicKey(string(main) + topicSuffix) +} + +// failRequest transitions a request to RequestStateError if it is not already +// in a terminal state, and unconditionally appends a RequestStatusError row to +// the request log. The state transition is idempotent — a request already in a +// terminal state skips the UpdateState CAS — but the log insert runs on every +// successful call so that a prior attempt that wrote the Error state but then +// failed to insert the log is repaired on redelivery. +// +// A request in RequestStateCancelling is reconciled to RequestStateError, not +// left in place: DLQ means the pipeline failed to converge, so we cannot +// confirm the cancel completed cleanly. Writing Error is the honest signal and +// keeps the request from being stuck in a non-terminal state forever. +func failRequest(ctx context.Context, store storage.Storage, logger *zap.SugaredLogger, requestID string) error { + request, err := store.GetRequestStore().Get(ctx, requestID) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + logger.Warnw("dlq reconcile: request not found, skipping", + "request_id", requestID, + ) + return nil + } + return fmt.Errorf("failed to get request %s: %w", requestID, err) + } + + logVersion := request.Version + if entity.IsRequestStateTerminal(request.State) { + logger.Infow("dlq reconcile: request already terminal, ensuring log entry", + "request_id", requestID, + "state", string(request.State), + ) + } else { + newVersion := request.Version + 1 + if err := store.GetRequestStore().UpdateState(ctx, requestID, request.Version, newVersion, entity.RequestStateError); err != nil { + return fmt.Errorf("failed to update request %s state to error: %w", requestID, err) + } + logVersion = newVersion + logger.Infow("dlq reconcile: request marked terminal error", + "request_id", requestID, + "previous_state", string(request.State), + ) + } + + // Append the terminal Error status to the request log directly via the + // RequestLogStore, bypassing the log topic and the primary log controller. + // DLQ controllers must not call back into the primary pipeline (publishing + // to a primary topic) — the primary pipeline is what failed and routed this + // message to the DLQ in the first place, and re-entering it would risk the + // same failure mode that put us here. The log store is the same storage + // backend the primary log controller eventually writes to, so a direct + // insert produces an equivalent record without the round-trip. + // + // The log entry is written unconditionally — even when the state was already + // terminal on entry — so a previous DLQ attempt that succeeded in flipping + // the state but then failed to insert the log is repaired on redelivery. + // A duplicate log entry from such a retry is the accepted trade-off; a + // missing one would leave the gateway-visible status divergent from the + // entity state. + logEntry := entity.NewRequestLog(requestID, entity.RequestStatusError, logVersion, "", nil) + if err := store.GetRequestLogStore().Insert(ctx, logEntry); err != nil { + return fmt.Errorf("failed to insert request log for %s: %w", requestID, err) + } + + return nil +} + +// failBatch transitions a batch to BatchStateFailed if it is not already in a +// terminal state, then fans out by transitioning each member request to +// RequestStateError. The fan-out mirrors what the conclude controller would do +// for a normally-completed batch, but skips re-publishing to the conclude +// topic — for DLQ messages there is no guarantee that conclude would ever run, +// so the reconciliation has to drive each request directly. +// +// A batch in BatchStateCancelling is reconciled to BatchStateFailed for the +// same reason failRequest reconciles Cancelling requests: DLQ means we cannot +// confirm the cancel completed, so the batch must reach a terminal state. +// +// Idempotency: if the batch is already terminal the function still fans out +// to the member requests, because a previous attempt may have transitioned +// the batch but crashed before completing the fan-out. Per-request fan-out +// is itself idempotent via failRequest. +func failBatch(ctx context.Context, store storage.Storage, logger *zap.SugaredLogger, batchID string) error { + batch, err := store.GetBatchStore().Get(ctx, batchID) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + logger.Warnw("dlq reconcile: batch not found, skipping", + "batch_id", batchID, + ) + return nil + } + return fmt.Errorf("failed to get batch %s: %w", batchID, err) + } + + if batch.State.IsTerminal() { + logger.Infow("dlq reconcile: batch already terminal, fanning out only", + "batch_id", batchID, + "state", string(batch.State), + ) + } else { + newVersion := batch.Version + 1 + if err := store.GetBatchStore().UpdateState(ctx, batchID, batch.Version, newVersion, entity.BatchStateFailed); err != nil { + return fmt.Errorf("failed to update batch %s state to failed: %w", batchID, err) + } + logger.Infow("dlq reconcile: batch marked failed", + "batch_id", batchID, + "previous_state", string(batch.State), + ) + } + + for _, requestID := range batch.Contains { + if err := failRequest(ctx, store, logger, requestID); err != nil { + return fmt.Errorf("fan-out for batch %s: %w", batchID, err) + } + } + return nil +} diff --git a/submitqueue/orchestrator/controller/dlq/dlq_test.go b/submitqueue/orchestrator/controller/dlq/dlq_test.go new file mode 100644 index 00000000..a8a0528d --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/dlq_test.go @@ -0,0 +1,306 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dlq + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/errs" + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" + storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +// failRequest + +func TestFailRequest_TerminalSkipsUpdateButLogsAnyway(t *testing.T) { + // Truly-terminal states skip the UpdateState CAS but still get a log entry + // written. The log write runs unconditionally so that a previous attempt + // which flipped the state but then failed to insert the log is repaired on + // redelivery. Cancelling is intentionally NOT in this list — see + // TestFailRequest_CancellingTransitionsToError. DLQ must drive every + // non-terminal state to a terminal one or the request stays "in progress" + // forever from the gateway's perspective. + terminalStates := []entity.RequestState{ + entity.RequestStateLanded, + entity.RequestStateError, + entity.RequestStateCancelled, + } + for _, state := range terminalStates { + state := state + t.Run(string(state), func(t *testing.T) { + ctrl := gomock.NewController(t) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/1").Return(entity.Request{ + ID: "q/1", Version: 5, State: state, + }, nil) + // no UpdateState expected — state is already terminal + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.AssignableToTypeOf(entity.RequestLog{})).DoAndReturn(func(_ context.Context, l entity.RequestLog) error { + assert.Equal(t, "q/1", l.RequestID) + assert.Equal(t, entity.RequestStatusError, l.Status) + assert.Equal(t, int32(5), l.RequestVersion) + return nil + }) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + + err := failRequest(context.Background(), store, zaptest.NewLogger(t).Sugar(), "q/1") + require.NoError(t, err) + }) + } +} + +// TestFailRequest_CancellingTransitionsToError verifies that a request stuck in +// the non-terminal Cancelling state is reconciled to Error. If failRequest +// short-circuited on Cancelling the request would remain in-progress forever, +// because the cancel pipeline that owns the Cancelling → Cancelled transition +// has itself died (that's why we're in the DLQ). +func TestFailRequest_CancellingTransitionsToError(t *testing.T) { + ctrl := gomock.NewController(t) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/1").Return(entity.Request{ + ID: "q/1", Version: 7, State: entity.RequestStateCancelling, + }, nil) + requestStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(7), int32(8), entity.RequestStateError).Return(nil) + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.AssignableToTypeOf(entity.RequestLog{})).DoAndReturn(func(_ context.Context, l entity.RequestLog) error { + assert.Equal(t, "q/1", l.RequestID) + assert.Equal(t, entity.RequestStatusError, l.Status) + assert.Equal(t, int32(8), l.RequestVersion) + return nil + }) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + + err := failRequest(context.Background(), store, zaptest.NewLogger(t).Sugar(), "q/1") + require.NoError(t, err) +} + +func TestFailRequest_TransitionsToError(t *testing.T) { + ctrl := gomock.NewController(t) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/1").Return(entity.Request{ + ID: "q/1", Version: 3, State: entity.RequestStateValidated, + }, nil) + requestStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(3), int32(4), entity.RequestStateError).Return(nil) + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.AssignableToTypeOf(entity.RequestLog{})).DoAndReturn(func(_ context.Context, l entity.RequestLog) error { + assert.Equal(t, "q/1", l.RequestID) + assert.Equal(t, entity.RequestStatusError, l.Status) + assert.Equal(t, int32(4), l.RequestVersion) + return nil + }) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + + err := failRequest(context.Background(), store, zaptest.NewLogger(t).Sugar(), "q/1") + require.NoError(t, err) +} + +// TestFailRequest_LogInsertErrorPropagates verifies that a failure to append +// the terminal Error row to the request log is surfaced as an error so the +// always-retryable processor will redeliver the DLQ message. We cannot ack a +// half-reconciled request — the state has flipped to Error but the gateway's +// log-store-derived status would still report the prior non-terminal status. +func TestFailRequest_LogInsertErrorPropagates(t *testing.T) { + ctrl := gomock.NewController(t) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/1").Return(entity.Request{ + ID: "q/1", Version: 3, State: entity.RequestStateValidated, + }, nil) + requestStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(3), int32(4), entity.RequestStateError).Return(nil) + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(fmt.Errorf("log store boom")) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + + err := failRequest(context.Background(), store, zaptest.NewLogger(t).Sugar(), "q/1") + require.Error(t, err) +} + +func TestFailRequest_NotFoundIsNoOp(t *testing.T) { + ctrl := gomock.NewController(t) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/1").Return(entity.Request{}, storage.ErrNotFound) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + + err := failRequest(context.Background(), store, zaptest.NewLogger(t).Sugar(), "q/1") + require.NoError(t, err) +} + +func TestFailRequest_GenericGetErrorIsNonRetryable(t *testing.T) { + ctrl := gomock.NewController(t) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/1").Return(entity.Request{}, fmt.Errorf("boom")) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + + err := failRequest(context.Background(), store, zaptest.NewLogger(t).Sugar(), "q/1") + require.Error(t, err) + assert.False(t, errs.IsRetryable(err)) +} + +// failBatch + +func TestFailBatch_TransitionsAndFansOut(t *testing.T) { + ctrl := gomock.NewController(t) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), "q/batch/1").Return(entity.Batch{ + ID: "q/batch/1", Queue: "q", Contains: []string{"q/1", "q/2"}, + State: entity.BatchStateMerging, Version: 4, + }, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), "q/batch/1", int32(4), int32(5), entity.BatchStateFailed).Return(nil) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/1").Return(entity.Request{ + ID: "q/1", Version: 2, State: entity.RequestStateProcessing, + }, nil) + requestStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(2), int32(3), entity.RequestStateError).Return(nil) + requestStore.EXPECT().Get(gomock.Any(), "q/2").Return(entity.Request{ + ID: "q/2", Version: 1, State: entity.RequestStateProcessing, + }, nil) + requestStore.EXPECT().UpdateState(gomock.Any(), "q/2", int32(1), int32(2), entity.RequestStateError).Return(nil) + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil).Times(2) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + + err := failBatch(context.Background(), store, zaptest.NewLogger(t).Sugar(), "q/batch/1") + require.NoError(t, err) +} + +func TestFailBatch_AlreadyTerminalFansOutOnly(t *testing.T) { + // Already-terminal batch: skip the CAS but still drive the fan-out so a + // prior crashed attempt that updated the batch but not the requests still + // reconciles to a clean terminal state. + ctrl := gomock.NewController(t) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), "q/batch/1").Return(entity.Batch{ + ID: "q/batch/1", Queue: "q", Contains: []string{"q/1"}, + State: entity.BatchStateFailed, Version: 5, + }, nil) + // no batchStore.UpdateState expected + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/1").Return(entity.Request{ + ID: "q/1", Version: 2, State: entity.RequestStateProcessing, + }, nil) + requestStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(2), int32(3), entity.RequestStateError).Return(nil) + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + + err := failBatch(context.Background(), store, zaptest.NewLogger(t).Sugar(), "q/batch/1") + require.NoError(t, err) +} + +// TestFailBatch_CancellingTransitionsToFailed verifies that a batch stuck in +// the non-terminal Cancelling state is reconciled to Failed and its member +// requests are driven from Cancelling to Error. Same rationale as +// TestFailRequest_CancellingTransitionsToError: the cancel pipeline that owns +// the Cancelling → Cancelled transition has died, so DLQ must converge the +// batch and its members to a terminal state. +func TestFailBatch_CancellingTransitionsToFailed(t *testing.T) { + ctrl := gomock.NewController(t) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), "q/batch/1").Return(entity.Batch{ + ID: "q/batch/1", Queue: "q", Contains: []string{"q/1"}, + State: entity.BatchStateCancelling, Version: 6, + }, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), "q/batch/1", int32(6), int32(7), entity.BatchStateFailed).Return(nil) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/1").Return(entity.Request{ + ID: "q/1", Version: 3, State: entity.RequestStateCancelling, + }, nil) + requestStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(3), int32(4), entity.RequestStateError).Return(nil) + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + + err := failBatch(context.Background(), store, zaptest.NewLogger(t).Sugar(), "q/batch/1") + require.NoError(t, err) +} + +func TestFailBatch_NotFoundIsNoOp(t *testing.T) { + ctrl := gomock.NewController(t) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), "q/batch/1").Return(entity.Batch{}, storage.ErrNotFound) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + + err := failBatch(context.Background(), store, zaptest.NewLogger(t).Sugar(), "q/batch/1") + require.NoError(t, err) +} + +// TopicKey + +func TestDLQTopicKey(t *testing.T) { + assert.Equal(t, "start_dlq", string(TopicKey("start"))) + assert.Equal(t, "buildsignal_dlq", string(TopicKey("buildsignal"))) +} + +// Helper to build a tally scope shared across tests. +func testScope() tally.Scope { + return tally.NoopScope +} diff --git a/submitqueue/orchestrator/controller/dlq/log.go b/submitqueue/orchestrator/controller/dlq/log.go new file mode 100644 index 00000000..38c32c3d --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/log.go @@ -0,0 +1,92 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dlq + +import ( + "context" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" + "github.com/uber/submitqueue/submitqueue/core/consumer" + "go.uber.org/zap" +) + +// logController is the DLQ reconciler for the log topic. +// +// Unlike the other DLQ controllers, this one performs no state reconciliation. +// The log topic carries observability events (RequestLog rows). Failing to +// persist a log entry has no functional effect on the pipeline — the request +// and batch entities are unaffected — so the right action is to emit a metric +// and a warning and ack the message so it does not sit in the DLQ forever. +type logController struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + topicKey consumer.TopicKey + consumerGroup string +} + +// Verify logController implements consumer.Controller at compile time. +var _ consumer.Controller = (*logController)(nil) + +// NewDLQLogController builds a DLQ controller for the log topic. +func NewDLQLogController( + logger *zap.SugaredLogger, + scope tally.Scope, + topicKey consumer.TopicKey, + consumerGroup string, +) consumer.Controller { + name := string(topicKey) + "_controller" + return &logController{ + logger: logger.Named(name), + metricsScope: scope.SubScope(name), + topicKey: topicKey, + consumerGroup: consumerGroup, + } +} + +// Process records that a log message landed in the DLQ and acks it. +func (c *logController) Process(_ context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + dmeta := delivery.Metadata() + c.logger.Warnw("log message dropped to dlq", + "message_id", msg.ID, + "attempt", delivery.Attempt(), + "dlq_original_topic", dmeta["dlq.original_topic"], + "dlq_failure_count", dmeta["dlq.failure_count"], + "dlq_last_error", dmeta["dlq.last_error"], + ) + metrics.NamedCounter(c.metricsScope, opName, "dropped", 1) + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *logController) Name() string { + return string(c.topicKey) +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *logController) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *logController) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/submitqueue/orchestrator/controller/dlq/log_test.go b/submitqueue/orchestrator/controller/dlq/log_test.go new file mode 100644 index 00000000..cfb40ab4 --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/log_test.go @@ -0,0 +1,43 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dlq + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/submitqueue/submitqueue/core/consumer" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +func TestDLQLogController_InterfaceAndAccessors(t *testing.T) { + c := NewDLQLogController(zaptest.NewLogger(t).Sugar(), testScope(), TopicKey(consumer.TopicKeyLog), "orchestrator-log-dlq") + + assert.Equal(t, "log_dlq", c.Name()) + assert.Equal(t, consumer.TopicKey("log_dlq"), c.TopicKey()) + assert.Equal(t, "orchestrator-log-dlq", c.ConsumerGroup()) +} + +func TestDLQLogController_Process_AcksUnconditionally(t *testing.T) { + ctrl := gomock.NewController(t) + + c := NewDLQLogController(zaptest.NewLogger(t).Sugar(), testScope(), TopicKey(consumer.TopicKeyLog), "orchestrator-log-dlq") + + delivery := newMockDelivery(ctrl, []byte("anything goes")) + require.NoError(t, c.Process(context.Background(), delivery)) +} diff --git a/submitqueue/orchestrator/controller/dlq/request.go b/submitqueue/orchestrator/controller/dlq/request.go new file mode 100644 index 00000000..99c99755 --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/request.go @@ -0,0 +1,159 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dlq + +import ( + "context" + "fmt" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" + "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" + "go.uber.org/zap" +) + +// RequestIDDecoder extracts the affected request ID from the raw payload bytes +// of a DLQ message. Different primary topics carry different payload shapes +// (LandRequest on start, CancelRequest on cancel, RequestID on validate / +// batch), so the caller injects the right decoder for the topic being +// reconciled. Returning an empty ID is treated as a decode failure. +type RequestIDDecoder func(payload []byte) (string, error) + +// DecodeLandRequestID extracts the request ID from a LandRequest payload +// (the shape used by the start topic). +func DecodeLandRequestID(payload []byte) (string, error) { + lr, err := entity.LandRequestFromBytes(payload) + if err != nil { + return "", err + } + return lr.ID, nil +} + +// DecodeCancelRequestID extracts the request ID from a CancelRequest payload +// (the shape used by the cancel topic). +func DecodeCancelRequestID(payload []byte) (string, error) { + cr, err := entity.CancelRequestFromBytes(payload) + if err != nil { + return "", err + } + return cr.ID, nil +} + +// DecodeRequestID extracts the request ID from a RequestID payload (the shape +// used by the validate and batch topics). +func DecodeRequestID(payload []byte) (string, error) { + rid, err := entity.RequestIDFromBytes(payload) + if err != nil { + return "", err + } + return rid.ID, nil +} + +// requestController is the DLQ reconciler for request-scoped pipeline stages. +// It is registered once per primary request-scoped topic (start, cancel, +// validate, batch) with the matching decoder. On each delivery it decodes the +// request ID and transitions the request to RequestStateError if it is not +// already halted. +type requestController struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + store storage.Storage + decode RequestIDDecoder + topicKey consumer.TopicKey + consumerGroup string +} + +// Verify requestController implements consumer.Controller at compile time. +var _ consumer.Controller = (*requestController)(nil) + +// NewDLQRequestController builds a DLQ controller for a request-scoped topic. +// topicKey must be the DLQ topic key (typically TopicKey(primary)); decode +// must match the payload shape of the primary topic this DLQ drains. +func NewDLQRequestController( + logger *zap.SugaredLogger, + scope tally.Scope, + store storage.Storage, + decode RequestIDDecoder, + topicKey consumer.TopicKey, + consumerGroup string, +) consumer.Controller { + name := string(topicKey) + "_controller" + return &requestController{ + logger: logger.Named(name), + metricsScope: scope.SubScope(name), + store: store, + decode: decode, + topicKey: topicKey, + consumerGroup: consumerGroup, + } +} + +// Process reconciles a single DLQ delivery for a request-scoped topic. +func (c *requestController) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + requestID, err := c.decode(msg.Payload) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + // Malformed DLQ payload is non-retryable: a re-delivery will decode the + // same bytes and fail the same way. The framework will route this DLQ + // message to its own DLQ if one is configured; otherwise the message is + // acked and dropped after the error is logged. + return fmt.Errorf("failed to decode dlq payload: %w", err) + } + if requestID == "" { + metrics.NamedCounter(c.metricsScope, opName, "empty_id_errors", 1) + return fmt.Errorf("dlq payload decoded to empty request id") + } + + dmeta := delivery.Metadata() + c.logger.Warnw("dlq message received", + "request_id", requestID, + "attempt", delivery.Attempt(), + "dlq_original_topic", dmeta["dlq.original_topic"], + "dlq_failure_count", dmeta["dlq.failure_count"], + "dlq_last_error", dmeta["dlq.last_error"], + ) + + if err := failRequest(ctx, c.store, c.logger, requestID); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "reconcile_errors", 1) + return err + } + + metrics.NamedCounter(c.metricsScope, opName, "reconciled", 1) + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *requestController) Name() string { + return string(c.topicKey) +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *requestController) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *requestController) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/submitqueue/orchestrator/controller/dlq/request_test.go b/submitqueue/orchestrator/controller/dlq/request_test.go new file mode 100644 index 00000000..b0dfdc4e --- /dev/null +++ b/submitqueue/orchestrator/controller/dlq/request_test.go @@ -0,0 +1,185 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dlq + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + queue "github.com/uber/submitqueue/entity/messagequeue" + queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" + "github.com/uber/submitqueue/submitqueue/core/consumer" + "github.com/uber/submitqueue/submitqueue/entity" + storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +func TestDLQRequestController_InterfaceAndAccessors(t *testing.T) { + ctrl := gomock.NewController(t) + store := storagemock.NewMockStorage(ctrl) + + c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeRequestID, TopicKey(consumer.TopicKeyValidate), "orchestrator-validate-dlq") + + assert.Equal(t, "validate_dlq", c.Name()) + assert.Equal(t, consumer.TopicKey("validate_dlq"), c.TopicKey()) + assert.Equal(t, "orchestrator-validate-dlq", c.ConsumerGroup()) +} + +func TestDLQRequestController_Process_LandRequestPayload(t *testing.T) { + ctrl := gomock.NewController(t) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/1").Return(entity.Request{ + ID: "q/1", Version: 1, State: entity.RequestStateStarted, + }, nil) + requestStore.EXPECT().UpdateState(gomock.Any(), "q/1", int32(1), int32(2), entity.RequestStateError).Return(nil) + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + + c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeLandRequestID, TopicKey(consumer.TopicKeyStart), "orchestrator-start-dlq") + + payload, err := entity.LandRequest{ID: "q/1", Queue: "q"}.ToBytes() + require.NoError(t, err) + + delivery := newMockDelivery(ctrl, payload) + require.NoError(t, c.Process(context.Background(), delivery)) +} + +func TestDLQRequestController_Process_CancelRequestPayload(t *testing.T) { + ctrl := gomock.NewController(t) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/7").Return(entity.Request{ + ID: "q/7", Version: 2, State: entity.RequestStateBatched, + }, nil) + requestStore.EXPECT().UpdateState(gomock.Any(), "q/7", int32(2), int32(3), entity.RequestStateError).Return(nil) + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + + c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeCancelRequestID, TopicKey(consumer.TopicKeyCancel), "orchestrator-cancel-dlq") + + payload, err := entity.CancelRequest{ID: "q/7", Reason: "user"}.ToBytes() + require.NoError(t, err) + + delivery := newMockDelivery(ctrl, payload) + require.NoError(t, c.Process(context.Background(), delivery)) +} + +func TestDLQRequestController_Process_RequestIDPayload(t *testing.T) { + ctrl := gomock.NewController(t) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/3").Return(entity.Request{ + ID: "q/3", Version: 1, State: entity.RequestStateValidated, + }, nil) + requestStore.EXPECT().UpdateState(gomock.Any(), "q/3", int32(1), int32(2), entity.RequestStateError).Return(nil) + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + + c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeRequestID, TopicKey(consumer.TopicKeyBatch), "orchestrator-batch-dlq") + + payload, err := entity.RequestID{ID: "q/3"}.ToBytes() + require.NoError(t, err) + + delivery := newMockDelivery(ctrl, payload) + require.NoError(t, c.Process(context.Background(), delivery)) +} + +func TestDLQRequestController_Process_AlreadyTerminalSkipsUpdateButLogsAnyway(t *testing.T) { + ctrl := gomock.NewController(t) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), "q/1").Return(entity.Request{ + ID: "q/1", Version: 5, State: entity.RequestStateLanded, + }, nil) + // no UpdateState expected + + logStore := storagemock.NewMockRequestLogStore(ctrl) + logStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + store.EXPECT().GetRequestLogStore().Return(logStore).AnyTimes() + + c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeRequestID, TopicKey(consumer.TopicKeyValidate), "orchestrator-validate-dlq") + + payload, err := entity.RequestID{ID: "q/1"}.ToBytes() + require.NoError(t, err) + + delivery := newMockDelivery(ctrl, payload) + require.NoError(t, c.Process(context.Background(), delivery)) +} + +func TestDLQRequestController_Process_MalformedPayloadFails(t *testing.T) { + ctrl := gomock.NewController(t) + + store := storagemock.NewMockStorage(ctrl) + // no store calls expected + + c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeRequestID, TopicKey(consumer.TopicKeyValidate), "orchestrator-validate-dlq") + + delivery := newMockDelivery(ctrl, []byte("not json")) + err := c.Process(context.Background(), delivery) + require.Error(t, err) +} + +func TestDLQRequestController_Process_EmptyIDFails(t *testing.T) { + ctrl := gomock.NewController(t) + + store := storagemock.NewMockStorage(ctrl) + // no store calls expected + + c := NewDLQRequestController(zaptest.NewLogger(t).Sugar(), testScope(), store, DecodeRequestID, TopicKey(consumer.TopicKeyValidate), "orchestrator-validate-dlq") + + payload, err := entity.RequestID{ID: ""}.ToBytes() + require.NoError(t, err) + + delivery := newMockDelivery(ctrl, payload) + err = c.Process(context.Background(), delivery) + require.Error(t, err) +} + +// newMockDelivery returns a MockDelivery wired up enough to be passed through +// the DLQ controller Process flow. +func newMockDelivery(ctrl *gomock.Controller, payload []byte) *queuemock.MockDelivery { + msg := queue.NewMessage("dlq-msg-1", payload, "", nil) + d := queuemock.NewMockDelivery(ctrl) + d.EXPECT().Message().Return(msg).AnyTimes() + d.EXPECT().Attempt().Return(1).AnyTimes() + d.EXPECT().Metadata().Return(map[string]string{ + "dlq.original_topic": "validate", + "dlq.failure_count": "3", + "dlq.last_error": "boom", + }).AnyTimes() + return d +} diff --git a/test/integration/submitqueue/core/consumer/BUILD.bazel b/test/integration/submitqueue/core/consumer/BUILD.bazel index 9ebf35e2..c2766418 100644 --- a/test/integration/submitqueue/core/consumer/BUILD.bazel +++ b/test/integration/submitqueue/core/consumer/BUILD.bazel @@ -12,6 +12,7 @@ go_test( "integration", ], deps = [ + "//core/errs", "//entity/messagequeue", "//extension/messagequeue", "//extension/messagequeue/mysql", diff --git a/test/integration/submitqueue/core/consumer/consumer_test.go b/test/integration/submitqueue/core/consumer/consumer_test.go index 6bfc668a..11a08dc6 100644 --- a/test/integration/submitqueue/core/consumer/consumer_test.go +++ b/test/integration/submitqueue/core/consumer/consumer_test.go @@ -14,6 +14,7 @@ import ( "github.com/uber-go/tally/v4" "go.uber.org/zap/zaptest" + "github.com/uber/submitqueue/core/errs" entityqueue "github.com/uber/submitqueue/entity/messagequeue" extqueue "github.com/uber/submitqueue/extension/messagequeue" queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql" @@ -146,7 +147,7 @@ func (s *ConsumerIntegrationSuite) newConsumer(t *testing.T, q extqueue.Queue, t }) require.NoError(t, err) - return consumer.New(logger, tally.NoopScope, registry) + return consumer.New(logger, tally.NoopScope, registry, errs.NewClassifierProcessor()) } func (s *ConsumerIntegrationSuite) TestConsumerPerPartitionIsolation() {