From 54f4c7b45604fc34bea3bab7ca2bedb20d7d8968 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Thu, 4 Jun 2026 20:48:50 -0700 Subject: [PATCH 1/2] refactor(request-log): gateway is sole writer of request log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary ### Why? The request log had two persistence paths: the gateway wrote some entries directly, while the orchestrator ran the `log`-topic consumer that wrote all downstream entries to storage. Having the orchestrator persist the request log blurs ownership — the orchestrator is a pipeline that should only emit events, not own the request-log table. Concentrating all writes in the gateway gives a single owner for the request log and keeps the orchestrator free of request-log storage writes. ### What? The request-log persistence consumer moves from the orchestrator to the gateway: - Move `submitqueue/orchestrator/controller/log/` → `submitqueue/gateway/controller/log/` (importpath, doc comment, and default consumer group `orchestrator-log` → `gateway-log`). Logic is unchanged. - Orchestrator: `TopicKeyLog` becomes publish-only (subscription dropped), the log controller registration and import are removed, controller count 11 → 10. It still publishes via `submitqueue/core/request.PublishLog`. - Gateway: builds a consumer (generic + mysql classifiers), registers the moved log controller on `TopicKeyLog` with a subscription (group `gateway-log`), starts it, and drains it with `Stop(30000)` on shutdown — preserving the 128+SIGTERM graceful-exit contract. - Add `HOSTNAME=gateway-dev` to both gateway compose files for a stable subscriber name; update the workflow RFC and gateway README. - Tests: add a gateway integration test that publishes to the log topic (as the orchestrator does) and asserts the gateway consumer persists it, and an e2e test that lands a request and asserts Status advances to `started` — exercising the publish→consume→persist path across both services. The gateway keeps its two synchronous direct writes (`accepted` on Land, `cancelling` on Cancel) for read-your-write visibility at RPC return. Both are gateway writes, so the invariant holds: only the gateway persists the request log; the orchestrator only publishes. This works because gateway and orchestrator already share the same queue and app databases. ## Test Plan - ✅ `bazel build` of both servers + the moved package - ✅ `make test` — unit tests pass (incl. the moved log_test) - ✅ `make check-gazelle`, `make check-tidy`, `make lint` (fmt + license) - ✅ `make integration-test-submitqueue-gateway` — new `TestRequestLogConsumer` verifies the gateway consumer persists a log entry published to the log topic - ✅ `make e2e-test` — new `TestLandRequest_PersistsStartedLogViaGatewayConsumer` verifies an orchestrator-published `started` log is persisted by the gateway and readable via Status; both services still exit 128+SIGTERM on shutdown --- doc/rfc/submitqueue/workflow.md | 8 +-- example/submitqueue/docker-compose.yml | 2 + .../submitqueue/gateway/server/BUILD.bazel | 4 ++ .../gateway/server/docker-compose.yml | 2 + example/submitqueue/gateway/server/main.go | 72 +++++++++++++++++-- .../orchestrator/server/BUILD.bazel | 1 - .../submitqueue/orchestrator/server/main.go | 20 ++---- submitqueue/gateway/README.md | 21 +++++- .../controller/log/BUILD.bazel | 2 +- .../controller/log/log.go | 6 +- .../controller/log/log_test.go | 2 +- test/e2e/submitqueue/BUILD.bazel | 1 + test/e2e/submitqueue/suite_test.go | 40 +++++++++++ .../submitqueue/gateway/BUILD.bazel | 6 ++ .../submitqueue/gateway/suite_test.go | 51 +++++++++++++ 15 files changed, 208 insertions(+), 30 deletions(-) rename submitqueue/{orchestrator => gateway}/controller/log/BUILD.bazel (90%) rename submitqueue/{orchestrator => gateway}/controller/log/log.go (92%) rename submitqueue/{orchestrator => gateway}/controller/log/log_test.go (99%) diff --git a/doc/rfc/submitqueue/workflow.md b/doc/rfc/submitqueue/workflow.md index a3e10df0..52ad4e39 100644 --- a/doc/rfc/submitqueue/workflow.md +++ b/doc/rfc/submitqueue/workflow.md @@ -2,7 +2,7 @@ The orchestrator processes land requests through a queue-driven pipeline of small, single-purpose controllers. The gateway accepts a request over RPC and hands it off asynchronously; from there each controller consumes one topic, advances the request or batch, and publishes to the next topic. Most hops carry only an ID — the controller fetches the entity from storage — while a few entry points (`start`, `buildsignal`, `log`) carry the full payload because there is no row to fetch yet. -The pipeline has two cycles: `speculate → build → buildsignal → speculate` (CI feedback loop) and `merge → speculate` (advance the next batch). `conclude` is the only stage that transitions a request to a terminal state; `log` is an append-only sink that any controller can publish to via `submitqueue/core/request.PublishLog`. +The pipeline has two cycles: `speculate → build → buildsignal → speculate` (CI feedback loop) and `merge → speculate` (advance the next batch). `conclude` is the only stage that transitions a request to a terminal state; `log` is an append-only sink that any controller can publish to via `submitqueue/core/request.PublishLog`. The orchestrator only *publishes* request log events — the **gateway** is the sole consumer of the `log` topic and the only service that persists the request log to storage. ## Diagram @@ -14,8 +14,8 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate` │ LandRequest ▼ ┌──────────────────────┐ ┌──────────────────────────────────┐ - │ log (terminal sink) │◄───│ start │ - │ Append RequestLog │ │ Persist Request, emit Started │ + │ log (gateway sink) │◄───│ start │ + │ Persist RequestLog │ │ Persist Request, emit Started │ └──────────────────────┘ └────────────────┬─────────────────┘ ▲ │ RequestID │ ▼ @@ -79,4 +79,4 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate` | **buildsignal** | Build | speculate | Feed CI result back into speculation | | **merge** | BatchID | conclude, speculate | Merge the batch and advance the queue | | **conclude** | BatchID | — | Map terminal batch state to request state | -| **log** | RequestLog | — | Append-only sink for request log events | +| **log** | RequestLog | — | Gateway-owned sink: persists request log events to storage | diff --git a/example/submitqueue/docker-compose.yml b/example/submitqueue/docker-compose.yml index dc49e86f..1b3ed04b 100644 --- a/example/submitqueue/docker-compose.yml +++ b/example/submitqueue/docker-compose.yml @@ -55,6 +55,8 @@ services: - QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true # Path to YAML queue configuration baked into the image - QUEUE_CONFIG_PATH=/root/queues.yaml + # Stable subscriber name for the request-log consumer + - HOSTNAME=gateway-dev depends_on: mysql-app: condition: service_healthy diff --git a/example/submitqueue/gateway/server/BUILD.bazel b/example/submitqueue/gateway/server/BUILD.bazel index 86baf961..5a34d0e6 100644 --- a/example/submitqueue/gateway/server/BUILD.bazel +++ b/example/submitqueue/gateway/server/BUILD.bazel @@ -11,12 +11,16 @@ go_library( importpath = "github.com/uber/submitqueue/example/submitqueue/gateway/server", visibility = ["//visibility:private"], deps = [ + "//core/errs/generic", + "//core/errs/mysql", "//extension/counter/mysql", + "//extension/messagequeue", "//extension/messagequeue/mysql", "//submitqueue/core/consumer", "//submitqueue/extension/queueconfig/yaml", "//submitqueue/extension/storage/mysql", "//submitqueue/gateway/controller", + "//submitqueue/gateway/controller/log", "//submitqueue/gateway/protopb", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_uber_go_tally_v4//:tally", diff --git a/example/submitqueue/gateway/server/docker-compose.yml b/example/submitqueue/gateway/server/docker-compose.yml index 2b018f16..08a2f24d 100644 --- a/example/submitqueue/gateway/server/docker-compose.yml +++ b/example/submitqueue/gateway/server/docker-compose.yml @@ -55,6 +55,8 @@ services: - QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true # Path to YAML queue configuration baked into the image - QUEUE_CONFIG_PATH=/root/queues.yaml + # Stable subscriber name for the request-log consumer + - HOSTNAME=gateway-dev depends_on: mysql-app: condition: service_healthy diff --git a/example/submitqueue/gateway/server/main.go b/example/submitqueue/gateway/server/main.go index b2c814d8..d9ce8f48 100644 --- a/example/submitqueue/gateway/server/main.go +++ b/example/submitqueue/gateway/server/main.go @@ -28,12 +28,16 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/uber-go/tally/v4" + genericerrs "github.com/uber/submitqueue/core/errs/generic" + mysqlerrs "github.com/uber/submitqueue/core/errs/mysql" mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" + extqueue "github.com/uber/submitqueue/extension/messagequeue" queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql" "github.com/uber/submitqueue/submitqueue/core/consumer" yamlqueueconfig "github.com/uber/submitqueue/submitqueue/extension/queueconfig/yaml" mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql" "github.com/uber/submitqueue/submitqueue/gateway/controller" + logctrl "github.com/uber/submitqueue/submitqueue/gateway/controller/log" pb "github.com/uber/submitqueue/submitqueue/gateway/protopb" "go.uber.org/zap" "google.golang.org/grpc" @@ -174,12 +178,29 @@ func run() error { zap.String("queue_dsn", queueDSN), ) - // Build a publish-only topic registry: gateway only feeds the start of the - // orchestrator pipeline (TopicKeyStart) and the cancel topic (TopicKeyCancel). - // No subscription is configured because the gateway never consumes from the queue. + // Stable subscriber name for the log-topic consumer. Falls back to a + // time-seeded name when HOSTNAME is unset (e.g. local runs). + subscriberName := os.Getenv("HOSTNAME") + if subscriberName == "" { + subscriberName = fmt.Sprintf("gateway-%d", time.Now().Unix()) + } + + // Build the topic registry. The gateway publishes to the start of the + // orchestrator pipeline (TopicKeyStart) and the cancel topic (TopicKeyCancel) — + // both publish-only. It additionally consumes the log topic (TopicKeyLog): + // the gateway is the sole writer of the request log, persisting entries that + // the orchestrator publishes there. registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ {Key: consumer.TopicKeyStart, Name: "start", Queue: mysqlQueue}, {Key: consumer.TopicKeyCancel, Name: "cancel", Queue: mysqlQueue}, + { + Key: consumer.TopicKeyLog, + Name: "log", + Queue: mysqlQueue, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "gateway-log", + ), + }, }) if err != nil { return fmt.Errorf("failed to create topic registry: %w", err) @@ -201,7 +222,8 @@ func run() error { // Initialize storage from the shared app database connection. The land // controller writes to this store directly; cancel/status use the request - // log store directly. + // log store directly. The log consumer (registered below) is the sole + // persister of request log entries published by the orchestrator. store, err := mysqlstorage.NewStorage(appDB, scope.SubScope("storage")) if err != nil { return fmt.Errorf("failed to create storage: %w", err) @@ -236,6 +258,29 @@ func run() error { // Register reflection service for debugging with grpcurl reflection.Register(grpcServer) + // Create the queue consumer and register the log controller. The gateway is + // the sole persister of the request log: the orchestrator publishes entries + // to the log topic and this consumer writes them to storage. + logConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry, + // Storage (extension/storage/mysql) and queue (extension/messagequeue/mysql) + // both run on the same MySQL driver, so a single classifier covers + // errors surfaced from either backend. + genericerrs.Classifier, + mysqlerrs.Classifier, + ) + + logController := logctrl.NewController(logger.Sugar(), scope, store, consumer.TopicKeyLog, "gateway-log") + if err := logConsumer.Register(logController); err != nil { + return fmt.Errorf("failed to register log controller: %w", err) + } + + if err := logConsumer.Start(ctx); err != nil { + // The error can also be a result of a context cancellation due to SIGINT or SIGTERM. + // This is expected, just propagate it. + return fmt.Errorf("failed to start log consumer: %w", err) + } + logger.Info("log consumer started") + // Listen on configurable port port := os.Getenv("PORT") if port == "" { @@ -257,6 +302,8 @@ func run() error { // Wait for interrupt signal or server critical error // If interruption is signaled, gracefully stop the server + // If the server exits with an error, cancel the context to signal the consumer + // After this, stop the consumer // If an error happens during shutdown, return the actual error, not the context cancellation error var serverErr error select { @@ -273,10 +320,25 @@ func run() error { serverErr = <-serverErrCh case serverErr = <-serverErrCh: fmt.Println("Shutting down gateway server due to critical GRPC server error...") + + // Cancel the context to signal cancellation to the queue consumer + cancel() } if serverErr != nil { - err = fmt.Errorf("GRPC server exited with error: %w", serverErr) + serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr) + } + + // Stop the consumer with a 30s timeout; by this time the context should be + // cancelled and the processing threads may already be exiting; recollect them. + errStop := logConsumer.Stop(30000) + if errStop != nil { + errStop = fmt.Errorf("failed to stop consumer: %w", errStop) + } + + if errStop != nil || serverErr != nil { + // Override context cancellation error with the shutdown error + err = errors.Join(errStop, serverErr) } return err diff --git a/example/submitqueue/orchestrator/server/BUILD.bazel b/example/submitqueue/orchestrator/server/BUILD.bazel index 93c13e67..8d28c855 100644 --- a/example/submitqueue/orchestrator/server/BUILD.bazel +++ b/example/submitqueue/orchestrator/server/BUILD.bazel @@ -42,7 +42,6 @@ go_library( "//submitqueue/orchestrator/controller/buildsignal", "//submitqueue/orchestrator/controller/cancel", "//submitqueue/orchestrator/controller/conclude", - "//submitqueue/orchestrator/controller/log", "//submitqueue/orchestrator/controller/merge", "//submitqueue/orchestrator/controller/score", "//submitqueue/orchestrator/controller/speculate", diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index 6fc80ace..bd2fbe25 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -61,7 +61,6 @@ import ( "github.com/uber/submitqueue/submitqueue/orchestrator/controller/buildsignal" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/cancel" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/conclude" - logctrl "github.com/uber/submitqueue/submitqueue/orchestrator/controller/log" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/merge" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/score" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/speculate" @@ -238,7 +237,7 @@ func run() error { return err } - logger.Info("controllers registered", zap.Int("count", 11)) + logger.Info("controllers registered", zap.Int("count", 10)) // Start consumers if err := c.Start(ctx); err != nil { @@ -408,12 +407,12 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe ), }, { + // Publish-only: the orchestrator emits request log entries to the + // log topic but never persists them. The gateway is the sole + // consumer that writes the request log to storage. Key: consumer.TopicKeyLog, Name: "log", Queue: q, - Subscription: extqueue.DefaultSubscriptionConfig( - subscriberName, "orchestrator-log", - ), }, }) } @@ -605,17 +604,6 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t return fmt.Errorf("failed to register conclude controller: %w", err) } - logController := logctrl.NewController( - logger, - scope, - store, - consumer.TopicKeyLog, - "orchestrator-log", - ) - if err := c.Register(logController); err != nil { - return fmt.Errorf("failed to register log controller: %w", err) - } - return nil } diff --git a/submitqueue/gateway/README.md b/submitqueue/gateway/README.md index d739f424..2858fe77 100644 --- a/submitqueue/gateway/README.md +++ b/submitqueue/gateway/README.md @@ -1 +1,20 @@ -SubmitQueue Gateway +# SubmitQueue Gateway + +The gateway is the RPC entry point to SubmitQueue. It accepts `Land`, `Cancel`, +`Status`, and `Ping` calls, validates them at the edge, and hands work off to the +orchestrator pipeline asynchronously via the message queue. + +## Request log ownership + +The gateway is the **sole writer of the request log**. No other service persists +request log entries: + +- For statuses it produces synchronously (`accepted` on `Land`, `cancelling` on + `Cancel`), the gateway writes directly to storage so the entry is visible the + moment the RPC returns. +- For statuses produced downstream, the orchestrator only *publishes* entries to + the `log` topic via `submitqueue/core/request.PublishLog`. The gateway runs a + consumer that drains the `log` topic and persists each entry to storage. + +This keeps a single service responsible for the request log while letting the +orchestrator remain free of storage writes for it. diff --git a/submitqueue/orchestrator/controller/log/BUILD.bazel b/submitqueue/gateway/controller/log/BUILD.bazel similarity index 90% rename from submitqueue/orchestrator/controller/log/BUILD.bazel rename to submitqueue/gateway/controller/log/BUILD.bazel index f700931c..60647c61 100644 --- a/submitqueue/orchestrator/controller/log/BUILD.bazel +++ b/submitqueue/gateway/controller/log/BUILD.bazel @@ -3,7 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "log", srcs = ["log.go"], - importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/log", + importpath = "github.com/uber/submitqueue/submitqueue/gateway/controller/log", visibility = ["//visibility:public"], deps = [ "//core/metrics", diff --git a/submitqueue/orchestrator/controller/log/log.go b/submitqueue/gateway/controller/log/log.go similarity index 92% rename from submitqueue/orchestrator/controller/log/log.go rename to submitqueue/gateway/controller/log/log.go index 3c6212dd..311681a0 100644 --- a/submitqueue/orchestrator/controller/log/log.go +++ b/submitqueue/gateway/controller/log/log.go @@ -29,6 +29,10 @@ import ( // Controller handles log queue messages. // It consumes request log entries and persists them to storage. // Implements consumer.Controller interface for integration with the consumer. +// +// The request log is written exclusively by the gateway: other services +// (e.g. the orchestrator) only publish log entries to the log topic, and this +// controller is the single consumer that persists them to storage. type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope @@ -40,7 +44,7 @@ type Controller struct { // Verify Controller implements consumer.Controller interface at compile time. var _ consumer.Controller = (*Controller)(nil) -// NewController creates a new log controller for the orchestrator. +// NewController creates a new log controller for the gateway. func NewController( logger *zap.SugaredLogger, scope tally.Scope, diff --git a/submitqueue/orchestrator/controller/log/log_test.go b/submitqueue/gateway/controller/log/log_test.go similarity index 99% rename from submitqueue/orchestrator/controller/log/log_test.go rename to submitqueue/gateway/controller/log/log_test.go index f8a1319d..c88fad1a 100644 --- a/submitqueue/orchestrator/controller/log/log_test.go +++ b/submitqueue/gateway/controller/log/log_test.go @@ -35,7 +35,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope - return NewController(logger, scope, store, consumer.TopicKeyLog, "orchestrator-log") + return NewController(logger, scope, store, consumer.TopicKeyLog, "gateway-log") } func TestController_Process(t *testing.T) { diff --git a/test/e2e/submitqueue/BUILD.bazel b/test/e2e/submitqueue/BUILD.bazel index e6c24d41..cf2737d2 100644 --- a/test/e2e/submitqueue/BUILD.bazel +++ b/test/e2e/submitqueue/BUILD.bazel @@ -18,6 +18,7 @@ go_test( "integration", ], deps = [ + "//submitqueue/entity", "//submitqueue/gateway/protopb", "//submitqueue/orchestrator/protopb", "//test/testutil", diff --git a/test/e2e/submitqueue/suite_test.go b/test/e2e/submitqueue/suite_test.go index 1387da4b..2a844e87 100644 --- a/test/e2e/submitqueue/suite_test.go +++ b/test/e2e/submitqueue/suite_test.go @@ -27,10 +27,12 @@ import ( "database/sql" "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/uber/submitqueue/submitqueue/entity" gatewaypb "github.com/uber/submitqueue/submitqueue/gateway/protopb" orchestratorpb "github.com/uber/submitqueue/submitqueue/orchestrator/protopb" "github.com/uber/submitqueue/test/testutil" @@ -169,6 +171,44 @@ func (s *E2EIntegrationSuite) TestLandRequest_SinglePR() { s.log.Logf("Land request (single PR) succeeded: sqid=%s", resp.Sqid) } +// TestLandRequest_PersistsStartedLogViaGatewayConsumer verifies the request-log +// ownership invariant end-to-end: the orchestrator only *publishes* request log +// entries to the log topic (it never writes the request log itself), and the +// gateway's log consumer drains that topic and persists them to storage. +// +// We observe this through the gateway Status RPC: immediately after Land the +// status is "accepted" (the gateway's synchronous direct write), and once the +// orchestrator's start controller publishes "started" to the log topic, the +// gateway consumer persists it and Status advances to "started". Seeing +// "started" therefore proves the publish→consume→persist path works across both +// services. +func (s *E2EIntegrationSuite) TestLandRequest_PersistsStartedLogViaGatewayConsumer() { + t := s.T() + + landResp, err := s.gatewayClient.Land(s.ctx, &gatewaypb.LandRequest{ + Queue: "e2e-test-queue", + Change: &gatewaypb.Change{Uris: []string{"github://uber/e2e-startlog/pull/4242/abcdef0123456789abcdef0123456789abcdef01"}}, + Strategy: gatewaypb.Strategy_REBASE, + }) + require.NoError(t, err, "Land request failed") + require.NotEmpty(t, landResp.Sqid, "SQID should not be empty") + sqid := landResp.Sqid + s.log.Logf("Land succeeded: sqid=%s; waiting for gateway consumer to persist 'started'", sqid) + + require.Eventually(t, func() bool { + resp, statusErr := s.gatewayClient.Status(s.ctx, &gatewaypb.StatusRequest{Sqid: sqid}) + if statusErr != nil { + s.log.Logf("Status(%s) not ready yet: %v", sqid, statusErr) + return false + } + s.log.Logf("Status(%s) = %q", sqid, resp.Status) + return resp.Status == string(entity.RequestStatusStarted) + }, 30*time.Second, 500*time.Millisecond, + "request %s should reach status %q via the gateway log consumer", sqid, entity.RequestStatusStarted) + + s.log.Logf("Gateway consumer persisted orchestrator-published 'started' log for sqid=%s", sqid) +} + // TestCancelRequest_InvalidSqid verifies the gateway rejects an empty sqid // synchronously before publishing anything to the cancel queue. func (s *E2EIntegrationSuite) TestCancelRequest_InvalidSqid() { diff --git a/test/integration/submitqueue/gateway/BUILD.bazel b/test/integration/submitqueue/gateway/BUILD.bazel index 6fe77784..da3d0d41 100644 --- a/test/integration/submitqueue/gateway/BUILD.bazel +++ b/test/integration/submitqueue/gateway/BUILD.bazel @@ -16,11 +16,17 @@ go_test( "integration", ], deps = [ + "//extension/messagequeue/mysql", + "//submitqueue/core/consumer", + "//submitqueue/core/request", + "//submitqueue/entity", "//submitqueue/gateway/protopb", "//test/testutil", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_stretchr_testify//suite", + "@com_github_uber_go_tally_v4//:tally", "@org_golang_google_grpc//:grpc", + "@org_uber_go_zap//:zap", ], ) diff --git a/test/integration/submitqueue/gateway/suite_test.go b/test/integration/submitqueue/gateway/suite_test.go index b8406262..afa2764d 100644 --- a/test/integration/submitqueue/gateway/suite_test.go +++ b/test/integration/submitqueue/gateway/suite_test.go @@ -30,12 +30,19 @@ import ( "database/sql" "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/uber-go/tally/v4" + queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql" + "github.com/uber/submitqueue/submitqueue/core/consumer" + corerequest "github.com/uber/submitqueue/submitqueue/core/request" + "github.com/uber/submitqueue/submitqueue/entity" pb "github.com/uber/submitqueue/submitqueue/gateway/protopb" "github.com/uber/submitqueue/test/testutil" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -144,3 +151,47 @@ func (s *GatewayIntegrationSuite) TestLandAPI() { s.log.Logf("Land API test passed: request stored and message published") } + +// TestRequestLogConsumer verifies the gateway's log-topic consumer in isolation: +// no orchestrator runs in this stack, so the test itself publishes a request log +// entry to the log topic exactly as the orchestrator does in production (via +// submitqueue/core/request.PublishLog). The gateway is the sole writer of the +// request log; this asserts its consumer drains the log topic and persists the +// entry to storage, observable through the Status RPC. +func (s *GatewayIntegrationSuite) TestRequestLogConsumer() { + t := s.T() + + // Build a publisher against the shared queue database. NewQueue only wires up + // stores; nothing consumes until a subscriber is started, so this publish-only + // use does not interfere with the gateway container's consumer. + queue, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.queueDB, + Logger: zap.NewNop(), + MetricsScope: tally.NoopScope, + }) + require.NoError(t, err, "failed to create queue publisher") + defer queue.Close() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: consumer.TopicKeyLog, Name: "log", Queue: queue}, + }) + require.NoError(t, err, "failed to create topic registry") + + const sqid = "log-consumer-test/1" + logEntry := entity.NewRequestLog(sqid, entity.RequestStatusStarted, 1, "", nil) + require.NoError(t, corerequest.PublishLog(s.ctx, registry, logEntry, sqid), + "failed to publish request log to log topic") + + s.log.Logf("Published 'started' log for sqid=%s; waiting for gateway consumer to persist it", sqid) + + require.Eventually(t, func() bool { + resp, statusErr := s.client.Status(s.ctx, &pb.StatusRequest{Sqid: sqid}) + if statusErr != nil { + return false + } + return resp.Status == string(entity.RequestStatusStarted) + }, 30*time.Second, 500*time.Millisecond, + "gateway log consumer should persist the published request log for sqid=%s", sqid) + + s.log.Logf("Request log consumer test passed: entry persisted and readable via Status") +} From 43670be50200238a3d69a2d4e35bcc10c5c517cf Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Fri, 5 Jun 2026 15:07:48 -0700 Subject: [PATCH 2/2] refactor(request-log): address review on gateway sole-owner PR - Make the log-consumer subscriber name unique per instance (hostname+PID) so co-located gateway processes don't contend for the same partition lease. - Report the gRPC server error first in the shutdown errors.Join (it is the primary failure; consumer-stop is secondary cleanup). - Clarify in the README that the gateway is the sole owner (writer and reader) of the request log; Status/Cancel read directly, orchestrator only publishes. - Extract named poll constants (persistTimeout/persistPollInterval) in the gateway integration and e2e suites with a comment explaining that the in-container consumer is observed black-box via Status, so a bounded poll is used in lieu of an in-process channel/HookSignal wait. Follow-ups split out: design doc (#211) and DLQ PublishLog() (#212). Co-Authored-By: Oz --- example/submitqueue/gateway/server/main.go | 22 +++++++++++++------ submitqueue/gateway/README.md | 9 ++++++-- test/e2e/submitqueue/suite_test.go | 13 ++++++++++- .../submitqueue/gateway/suite_test.go | 13 ++++++++++- 4 files changed, 46 insertions(+), 11 deletions(-) diff --git a/example/submitqueue/gateway/server/main.go b/example/submitqueue/gateway/server/main.go index d9ce8f48..01452c69 100644 --- a/example/submitqueue/gateway/server/main.go +++ b/example/submitqueue/gateway/server/main.go @@ -178,12 +178,18 @@ func run() error { zap.String("queue_dsn", queueDSN), ) - // Stable subscriber name for the log-topic consumer. Falls back to a - // time-seeded name when HOSTNAME is unset (e.g. local runs). - subscriberName := os.Getenv("HOSTNAME") - if subscriberName == "" { - subscriberName = fmt.Sprintf("gateway-%d", time.Now().Unix()) + // Subscriber name for the log-topic consumer. It must be unique per running + // instance: SubscriberName identifies a subscriber for partition leases, so + // two gateway processes on the same host (sharing HOSTNAME) would otherwise + // contend for the same lease. Append the PID to keep co-located instances + // distinct; the PID is stable for the life of the process. Offset tracking + // stays keyed on the shared ConsumerGroup ("gateway-log"), not this name. + // Falls back to a time-seeded name when HOSTNAME is unset (e.g. local runs). + hostname := os.Getenv("HOSTNAME") + if hostname == "" { + hostname = fmt.Sprintf("gateway-%d", time.Now().Unix()) } + subscriberName := fmt.Sprintf("%s-%d", hostname, os.Getpid()) // Build the topic registry. The gateway publishes to the start of the // orchestrator pipeline (TopicKeyStart) and the cancel topic (TopicKeyCancel) — @@ -337,8 +343,10 @@ func run() error { } if errStop != nil || serverErr != nil { - // Override context cancellation error with the shutdown error - err = errors.Join(errStop, serverErr) + // Override context cancellation error with the shutdown error. The server + // error is the primary/root failure, so it leads; the consumer-stop error + // is secondary cleanup. + err = errors.Join(serverErr, errStop) } return err diff --git a/submitqueue/gateway/README.md b/submitqueue/gateway/README.md index 2858fe77..5903f009 100644 --- a/submitqueue/gateway/README.md +++ b/submitqueue/gateway/README.md @@ -6,8 +6,9 @@ orchestrator pipeline asynchronously via the message queue. ## Request log ownership -The gateway is the **sole writer of the request log**. No other service persists -request log entries: +The gateway is the **sole owner of the request log** — the only service that +both writes and reads it. No other service persists or reads request log +entries: - For statuses it produces synchronously (`accepted` on `Land`, `cancelling` on `Cancel`), the gateway writes directly to storage so the entry is visible the @@ -16,5 +17,9 @@ request log entries: the `log` topic via `submitqueue/core/request.PublishLog`. The gateway runs a consumer that drains the `log` topic and persists each entry to storage. +Reads are likewise gateway-only: the `Status` and `Cancel` RPCs read the request +log directly from storage. The orchestrator only *publishes* log entries and +never touches the request log store. + This keeps a single service responsible for the request log while letting the orchestrator remain free of storage writes for it. diff --git a/test/e2e/submitqueue/suite_test.go b/test/e2e/submitqueue/suite_test.go index 2a844e87..e914aeda 100644 --- a/test/e2e/submitqueue/suite_test.go +++ b/test/e2e/submitqueue/suite_test.go @@ -56,6 +56,17 @@ func TestE2EIntegration(t *testing.T) { suite.Run(t, new(E2EIntegrationSuite)) } +// The gateway log consumer runs inside the gateway-service container, so this +// suite can only observe persistence black-box through the Status RPC — there is +// no in-process channel/HookSignal to wait on across the container boundary. A +// bounded poll is therefore the deterministic-enough analog: persistTimeout is a +// safety net (a failure here means something is genuinely stuck, not a timing +// race), and persistPollInterval bounds how often we re-query. +const ( + persistTimeout = 30 * time.Second + persistPollInterval = 500 * time.Millisecond +) + func (s *E2EIntegrationSuite) SetupSuite() { t := s.T() s.ctx = context.Background() @@ -203,7 +214,7 @@ func (s *E2EIntegrationSuite) TestLandRequest_PersistsStartedLogViaGatewayConsum } s.log.Logf("Status(%s) = %q", sqid, resp.Status) return resp.Status == string(entity.RequestStatusStarted) - }, 30*time.Second, 500*time.Millisecond, + }, persistTimeout, persistPollInterval, "request %s should reach status %q via the gateway log consumer", sqid, entity.RequestStatusStarted) s.log.Logf("Gateway consumer persisted orchestrator-published 'started' log for sqid=%s", sqid) diff --git a/test/integration/submitqueue/gateway/suite_test.go b/test/integration/submitqueue/gateway/suite_test.go index afa2764d..1db104c8 100644 --- a/test/integration/submitqueue/gateway/suite_test.go +++ b/test/integration/submitqueue/gateway/suite_test.go @@ -60,6 +60,17 @@ func TestGatewayIntegration(t *testing.T) { suite.Run(t, new(GatewayIntegrationSuite)) } +// The log consumer runs inside the gateway-service container, so this suite can +// only observe persistence black-box through the Status RPC — there is no +// in-process channel/HookSignal to wait on across the container boundary. A +// bounded poll is therefore the deterministic-enough analog: persistTimeout is a +// safety net (a failure here means something is genuinely stuck, not a timing +// race), and persistPollInterval bounds how often we re-query. +const ( + persistTimeout = 30 * time.Second + persistPollInterval = 500 * time.Millisecond +) + func (s *GatewayIntegrationSuite) SetupSuite() { t := s.T() s.ctx = context.Background() @@ -190,7 +201,7 @@ func (s *GatewayIntegrationSuite) TestRequestLogConsumer() { return false } return resp.Status == string(entity.RequestStatusStarted) - }, 30*time.Second, 500*time.Millisecond, + }, persistTimeout, persistPollInterval, "gateway log consumer should persist the published request log for sqid=%s", sqid) s.log.Logf("Request log consumer test passed: entry persisted and readable via Status")