diff --git a/doc/rfc/submitqueue/workflow.md b/doc/rfc/submitqueue/workflow.md index a3e10df0..52ad4e39 100644 --- a/doc/rfc/submitqueue/workflow.md +++ b/doc/rfc/submitqueue/workflow.md @@ -2,7 +2,7 @@ The orchestrator processes land requests through a queue-driven pipeline of small, single-purpose controllers. The gateway accepts a request over RPC and hands it off asynchronously; from there each controller consumes one topic, advances the request or batch, and publishes to the next topic. Most hops carry only an ID — the controller fetches the entity from storage — while a few entry points (`start`, `buildsignal`, `log`) carry the full payload because there is no row to fetch yet. -The pipeline has two cycles: `speculate → build → buildsignal → speculate` (CI feedback loop) and `merge → speculate` (advance the next batch). `conclude` is the only stage that transitions a request to a terminal state; `log` is an append-only sink that any controller can publish to via `submitqueue/core/request.PublishLog`. +The pipeline has two cycles: `speculate → build → buildsignal → speculate` (CI feedback loop) and `merge → speculate` (advance the next batch). `conclude` is the only stage that transitions a request to a terminal state; `log` is an append-only sink that any controller can publish to via `submitqueue/core/request.PublishLog`. The orchestrator only *publishes* request log events — the **gateway** is the sole consumer of the `log` topic and the only service that persists the request log to storage. ## Diagram @@ -14,8 +14,8 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate` │ LandRequest ▼ ┌──────────────────────┐ ┌──────────────────────────────────┐ - │ log (terminal sink) │◄───│ start │ - │ Append RequestLog │ │ Persist Request, emit Started │ + │ log (gateway sink) │◄───│ start │ + │ Persist RequestLog │ │ Persist Request, emit Started │ └──────────────────────┘ └────────────────┬─────────────────┘ ▲ │ RequestID │ ▼ @@ -79,4 +79,4 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate` | **buildsignal** | Build | speculate | Feed CI result back into speculation | | **merge** | BatchID | conclude, speculate | Merge the batch and advance the queue | | **conclude** | BatchID | — | Map terminal batch state to request state | -| **log** | RequestLog | — | Append-only sink for request log events | +| **log** | RequestLog | — | Gateway-owned sink: persists request log events to storage | diff --git a/example/submitqueue/docker-compose.yml b/example/submitqueue/docker-compose.yml index dc49e86f..1b3ed04b 100644 --- a/example/submitqueue/docker-compose.yml +++ b/example/submitqueue/docker-compose.yml @@ -55,6 +55,8 @@ services: - QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true # Path to YAML queue configuration baked into the image - QUEUE_CONFIG_PATH=/root/queues.yaml + # Stable subscriber name for the request-log consumer + - HOSTNAME=gateway-dev depends_on: mysql-app: condition: service_healthy diff --git a/example/submitqueue/gateway/server/BUILD.bazel b/example/submitqueue/gateway/server/BUILD.bazel index 86baf961..5a34d0e6 100644 --- a/example/submitqueue/gateway/server/BUILD.bazel +++ b/example/submitqueue/gateway/server/BUILD.bazel @@ -11,12 +11,16 @@ go_library( importpath = "github.com/uber/submitqueue/example/submitqueue/gateway/server", visibility = ["//visibility:private"], deps = [ + "//core/errs/generic", + "//core/errs/mysql", "//extension/counter/mysql", + "//extension/messagequeue", "//extension/messagequeue/mysql", "//submitqueue/core/consumer", "//submitqueue/extension/queueconfig/yaml", "//submitqueue/extension/storage/mysql", "//submitqueue/gateway/controller", + "//submitqueue/gateway/controller/log", "//submitqueue/gateway/protopb", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_uber_go_tally_v4//:tally", diff --git a/example/submitqueue/gateway/server/docker-compose.yml b/example/submitqueue/gateway/server/docker-compose.yml index 2b018f16..08a2f24d 100644 --- a/example/submitqueue/gateway/server/docker-compose.yml +++ b/example/submitqueue/gateway/server/docker-compose.yml @@ -55,6 +55,8 @@ services: - QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true # Path to YAML queue configuration baked into the image - QUEUE_CONFIG_PATH=/root/queues.yaml + # Stable subscriber name for the request-log consumer + - HOSTNAME=gateway-dev depends_on: mysql-app: condition: service_healthy diff --git a/example/submitqueue/gateway/server/main.go b/example/submitqueue/gateway/server/main.go index b2c814d8..01452c69 100644 --- a/example/submitqueue/gateway/server/main.go +++ b/example/submitqueue/gateway/server/main.go @@ -28,12 +28,16 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/uber-go/tally/v4" + genericerrs "github.com/uber/submitqueue/core/errs/generic" + mysqlerrs "github.com/uber/submitqueue/core/errs/mysql" mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" + extqueue "github.com/uber/submitqueue/extension/messagequeue" queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql" "github.com/uber/submitqueue/submitqueue/core/consumer" yamlqueueconfig "github.com/uber/submitqueue/submitqueue/extension/queueconfig/yaml" mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql" "github.com/uber/submitqueue/submitqueue/gateway/controller" + logctrl "github.com/uber/submitqueue/submitqueue/gateway/controller/log" pb "github.com/uber/submitqueue/submitqueue/gateway/protopb" "go.uber.org/zap" "google.golang.org/grpc" @@ -174,12 +178,35 @@ func run() error { zap.String("queue_dsn", queueDSN), ) - // Build a publish-only topic registry: gateway only feeds the start of the - // orchestrator pipeline (TopicKeyStart) and the cancel topic (TopicKeyCancel). - // No subscription is configured because the gateway never consumes from the queue. + // Subscriber name for the log-topic consumer. It must be unique per running + // instance: SubscriberName identifies a subscriber for partition leases, so + // two gateway processes on the same host (sharing HOSTNAME) would otherwise + // contend for the same lease. Append the PID to keep co-located instances + // distinct; the PID is stable for the life of the process. Offset tracking + // stays keyed on the shared ConsumerGroup ("gateway-log"), not this name. + // Falls back to a time-seeded name when HOSTNAME is unset (e.g. local runs). + hostname := os.Getenv("HOSTNAME") + if hostname == "" { + hostname = fmt.Sprintf("gateway-%d", time.Now().Unix()) + } + subscriberName := fmt.Sprintf("%s-%d", hostname, os.Getpid()) + + // Build the topic registry. The gateway publishes to the start of the + // orchestrator pipeline (TopicKeyStart) and the cancel topic (TopicKeyCancel) — + // both publish-only. It additionally consumes the log topic (TopicKeyLog): + // the gateway is the sole writer of the request log, persisting entries that + // the orchestrator publishes there. registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ {Key: consumer.TopicKeyStart, Name: "start", Queue: mysqlQueue}, {Key: consumer.TopicKeyCancel, Name: "cancel", Queue: mysqlQueue}, + { + Key: consumer.TopicKeyLog, + Name: "log", + Queue: mysqlQueue, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "gateway-log", + ), + }, }) if err != nil { return fmt.Errorf("failed to create topic registry: %w", err) @@ -201,7 +228,8 @@ func run() error { // Initialize storage from the shared app database connection. The land // controller writes to this store directly; cancel/status use the request - // log store directly. + // log store directly. The log consumer (registered below) is the sole + // persister of request log entries published by the orchestrator. store, err := mysqlstorage.NewStorage(appDB, scope.SubScope("storage")) if err != nil { return fmt.Errorf("failed to create storage: %w", err) @@ -236,6 +264,29 @@ func run() error { // Register reflection service for debugging with grpcurl reflection.Register(grpcServer) + // Create the queue consumer and register the log controller. The gateway is + // the sole persister of the request log: the orchestrator publishes entries + // to the log topic and this consumer writes them to storage. + logConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry, + // Storage (extension/storage/mysql) and queue (extension/messagequeue/mysql) + // both run on the same MySQL driver, so a single classifier covers + // errors surfaced from either backend. + genericerrs.Classifier, + mysqlerrs.Classifier, + ) + + logController := logctrl.NewController(logger.Sugar(), scope, store, consumer.TopicKeyLog, "gateway-log") + if err := logConsumer.Register(logController); err != nil { + return fmt.Errorf("failed to register log controller: %w", err) + } + + if err := logConsumer.Start(ctx); err != nil { + // The error can also be a result of a context cancellation due to SIGINT or SIGTERM. + // This is expected, just propagate it. + return fmt.Errorf("failed to start log consumer: %w", err) + } + logger.Info("log consumer started") + // Listen on configurable port port := os.Getenv("PORT") if port == "" { @@ -257,6 +308,8 @@ func run() error { // Wait for interrupt signal or server critical error // If interruption is signaled, gracefully stop the server + // If the server exits with an error, cancel the context to signal the consumer + // After this, stop the consumer // If an error happens during shutdown, return the actual error, not the context cancellation error var serverErr error select { @@ -273,10 +326,27 @@ func run() error { serverErr = <-serverErrCh case serverErr = <-serverErrCh: fmt.Println("Shutting down gateway server due to critical GRPC server error...") + + // Cancel the context to signal cancellation to the queue consumer + cancel() } if serverErr != nil { - err = fmt.Errorf("GRPC server exited with error: %w", serverErr) + serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr) + } + + // Stop the consumer with a 30s timeout; by this time the context should be + // cancelled and the processing threads may already be exiting; recollect them. + errStop := logConsumer.Stop(30000) + if errStop != nil { + errStop = fmt.Errorf("failed to stop consumer: %w", errStop) + } + + if errStop != nil || serverErr != nil { + // Override context cancellation error with the shutdown error. The server + // error is the primary/root failure, so it leads; the consumer-stop error + // is secondary cleanup. + err = errors.Join(serverErr, errStop) } return err diff --git a/example/submitqueue/orchestrator/server/BUILD.bazel b/example/submitqueue/orchestrator/server/BUILD.bazel index 93c13e67..8d28c855 100644 --- a/example/submitqueue/orchestrator/server/BUILD.bazel +++ b/example/submitqueue/orchestrator/server/BUILD.bazel @@ -42,7 +42,6 @@ go_library( "//submitqueue/orchestrator/controller/buildsignal", "//submitqueue/orchestrator/controller/cancel", "//submitqueue/orchestrator/controller/conclude", - "//submitqueue/orchestrator/controller/log", "//submitqueue/orchestrator/controller/merge", "//submitqueue/orchestrator/controller/score", "//submitqueue/orchestrator/controller/speculate", diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index 6fc80ace..bd2fbe25 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -61,7 +61,6 @@ import ( "github.com/uber/submitqueue/submitqueue/orchestrator/controller/buildsignal" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/cancel" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/conclude" - logctrl "github.com/uber/submitqueue/submitqueue/orchestrator/controller/log" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/merge" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/score" "github.com/uber/submitqueue/submitqueue/orchestrator/controller/speculate" @@ -238,7 +237,7 @@ func run() error { return err } - logger.Info("controllers registered", zap.Int("count", 11)) + logger.Info("controllers registered", zap.Int("count", 10)) // Start consumers if err := c.Start(ctx); err != nil { @@ -408,12 +407,12 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe ), }, { + // Publish-only: the orchestrator emits request log entries to the + // log topic but never persists them. The gateway is the sole + // consumer that writes the request log to storage. Key: consumer.TopicKeyLog, Name: "log", Queue: q, - Subscription: extqueue.DefaultSubscriptionConfig( - subscriberName, "orchestrator-log", - ), }, }) } @@ -605,17 +604,6 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t return fmt.Errorf("failed to register conclude controller: %w", err) } - logController := logctrl.NewController( - logger, - scope, - store, - consumer.TopicKeyLog, - "orchestrator-log", - ) - if err := c.Register(logController); err != nil { - return fmt.Errorf("failed to register log controller: %w", err) - } - return nil } diff --git a/submitqueue/gateway/README.md b/submitqueue/gateway/README.md index d739f424..5903f009 100644 --- a/submitqueue/gateway/README.md +++ b/submitqueue/gateway/README.md @@ -1 +1,25 @@ -SubmitQueue Gateway +# SubmitQueue Gateway + +The gateway is the RPC entry point to SubmitQueue. It accepts `Land`, `Cancel`, +`Status`, and `Ping` calls, validates them at the edge, and hands work off to the +orchestrator pipeline asynchronously via the message queue. + +## Request log ownership + +The gateway is the **sole owner of the request log** — the only service that +both writes and reads it. No other service persists or reads request log +entries: + +- For statuses it produces synchronously (`accepted` on `Land`, `cancelling` on + `Cancel`), the gateway writes directly to storage so the entry is visible the + moment the RPC returns. +- For statuses produced downstream, the orchestrator only *publishes* entries to + the `log` topic via `submitqueue/core/request.PublishLog`. The gateway runs a + consumer that drains the `log` topic and persists each entry to storage. + +Reads are likewise gateway-only: the `Status` and `Cancel` RPCs read the request +log directly from storage. The orchestrator only *publishes* log entries and +never touches the request log store. + +This keeps a single service responsible for the request log while letting the +orchestrator remain free of storage writes for it. diff --git a/submitqueue/orchestrator/controller/log/BUILD.bazel b/submitqueue/gateway/controller/log/BUILD.bazel similarity index 90% rename from submitqueue/orchestrator/controller/log/BUILD.bazel rename to submitqueue/gateway/controller/log/BUILD.bazel index f700931c..60647c61 100644 --- a/submitqueue/orchestrator/controller/log/BUILD.bazel +++ b/submitqueue/gateway/controller/log/BUILD.bazel @@ -3,7 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "log", srcs = ["log.go"], - importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/log", + importpath = "github.com/uber/submitqueue/submitqueue/gateway/controller/log", visibility = ["//visibility:public"], deps = [ "//core/metrics", diff --git a/submitqueue/orchestrator/controller/log/log.go b/submitqueue/gateway/controller/log/log.go similarity index 92% rename from submitqueue/orchestrator/controller/log/log.go rename to submitqueue/gateway/controller/log/log.go index 3c6212dd..311681a0 100644 --- a/submitqueue/orchestrator/controller/log/log.go +++ b/submitqueue/gateway/controller/log/log.go @@ -29,6 +29,10 @@ import ( // Controller handles log queue messages. // It consumes request log entries and persists them to storage. // Implements consumer.Controller interface for integration with the consumer. +// +// The request log is written exclusively by the gateway: other services +// (e.g. the orchestrator) only publish log entries to the log topic, and this +// controller is the single consumer that persists them to storage. type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope @@ -40,7 +44,7 @@ type Controller struct { // Verify Controller implements consumer.Controller interface at compile time. var _ consumer.Controller = (*Controller)(nil) -// NewController creates a new log controller for the orchestrator. +// NewController creates a new log controller for the gateway. func NewController( logger *zap.SugaredLogger, scope tally.Scope, diff --git a/submitqueue/orchestrator/controller/log/log_test.go b/submitqueue/gateway/controller/log/log_test.go similarity index 99% rename from submitqueue/orchestrator/controller/log/log_test.go rename to submitqueue/gateway/controller/log/log_test.go index f8a1319d..c88fad1a 100644 --- a/submitqueue/orchestrator/controller/log/log_test.go +++ b/submitqueue/gateway/controller/log/log_test.go @@ -35,7 +35,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope - return NewController(logger, scope, store, consumer.TopicKeyLog, "orchestrator-log") + return NewController(logger, scope, store, consumer.TopicKeyLog, "gateway-log") } func TestController_Process(t *testing.T) { diff --git a/test/e2e/submitqueue/BUILD.bazel b/test/e2e/submitqueue/BUILD.bazel index e6c24d41..cf2737d2 100644 --- a/test/e2e/submitqueue/BUILD.bazel +++ b/test/e2e/submitqueue/BUILD.bazel @@ -18,6 +18,7 @@ go_test( "integration", ], deps = [ + "//submitqueue/entity", "//submitqueue/gateway/protopb", "//submitqueue/orchestrator/protopb", "//test/testutil", diff --git a/test/e2e/submitqueue/suite_test.go b/test/e2e/submitqueue/suite_test.go index 1387da4b..e914aeda 100644 --- a/test/e2e/submitqueue/suite_test.go +++ b/test/e2e/submitqueue/suite_test.go @@ -27,10 +27,12 @@ import ( "database/sql" "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/uber/submitqueue/submitqueue/entity" gatewaypb "github.com/uber/submitqueue/submitqueue/gateway/protopb" orchestratorpb "github.com/uber/submitqueue/submitqueue/orchestrator/protopb" "github.com/uber/submitqueue/test/testutil" @@ -54,6 +56,17 @@ func TestE2EIntegration(t *testing.T) { suite.Run(t, new(E2EIntegrationSuite)) } +// The gateway log consumer runs inside the gateway-service container, so this +// suite can only observe persistence black-box through the Status RPC — there is +// no in-process channel/HookSignal to wait on across the container boundary. A +// bounded poll is therefore the deterministic-enough analog: persistTimeout is a +// safety net (a failure here means something is genuinely stuck, not a timing +// race), and persistPollInterval bounds how often we re-query. +const ( + persistTimeout = 30 * time.Second + persistPollInterval = 500 * time.Millisecond +) + func (s *E2EIntegrationSuite) SetupSuite() { t := s.T() s.ctx = context.Background() @@ -169,6 +182,44 @@ func (s *E2EIntegrationSuite) TestLandRequest_SinglePR() { s.log.Logf("Land request (single PR) succeeded: sqid=%s", resp.Sqid) } +// TestLandRequest_PersistsStartedLogViaGatewayConsumer verifies the request-log +// ownership invariant end-to-end: the orchestrator only *publishes* request log +// entries to the log topic (it never writes the request log itself), and the +// gateway's log consumer drains that topic and persists them to storage. +// +// We observe this through the gateway Status RPC: immediately after Land the +// status is "accepted" (the gateway's synchronous direct write), and once the +// orchestrator's start controller publishes "started" to the log topic, the +// gateway consumer persists it and Status advances to "started". Seeing +// "started" therefore proves the publish→consume→persist path works across both +// services. +func (s *E2EIntegrationSuite) TestLandRequest_PersistsStartedLogViaGatewayConsumer() { + t := s.T() + + landResp, err := s.gatewayClient.Land(s.ctx, &gatewaypb.LandRequest{ + Queue: "e2e-test-queue", + Change: &gatewaypb.Change{Uris: []string{"github://uber/e2e-startlog/pull/4242/abcdef0123456789abcdef0123456789abcdef01"}}, + Strategy: gatewaypb.Strategy_REBASE, + }) + require.NoError(t, err, "Land request failed") + require.NotEmpty(t, landResp.Sqid, "SQID should not be empty") + sqid := landResp.Sqid + s.log.Logf("Land succeeded: sqid=%s; waiting for gateway consumer to persist 'started'", sqid) + + require.Eventually(t, func() bool { + resp, statusErr := s.gatewayClient.Status(s.ctx, &gatewaypb.StatusRequest{Sqid: sqid}) + if statusErr != nil { + s.log.Logf("Status(%s) not ready yet: %v", sqid, statusErr) + return false + } + s.log.Logf("Status(%s) = %q", sqid, resp.Status) + return resp.Status == string(entity.RequestStatusStarted) + }, persistTimeout, persistPollInterval, + "request %s should reach status %q via the gateway log consumer", sqid, entity.RequestStatusStarted) + + s.log.Logf("Gateway consumer persisted orchestrator-published 'started' log for sqid=%s", sqid) +} + // TestCancelRequest_InvalidSqid verifies the gateway rejects an empty sqid // synchronously before publishing anything to the cancel queue. func (s *E2EIntegrationSuite) TestCancelRequest_InvalidSqid() { diff --git a/test/integration/submitqueue/gateway/BUILD.bazel b/test/integration/submitqueue/gateway/BUILD.bazel index 6fe77784..da3d0d41 100644 --- a/test/integration/submitqueue/gateway/BUILD.bazel +++ b/test/integration/submitqueue/gateway/BUILD.bazel @@ -16,11 +16,17 @@ go_test( "integration", ], deps = [ + "//extension/messagequeue/mysql", + "//submitqueue/core/consumer", + "//submitqueue/core/request", + "//submitqueue/entity", "//submitqueue/gateway/protopb", "//test/testutil", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_stretchr_testify//suite", + "@com_github_uber_go_tally_v4//:tally", "@org_golang_google_grpc//:grpc", + "@org_uber_go_zap//:zap", ], ) diff --git a/test/integration/submitqueue/gateway/suite_test.go b/test/integration/submitqueue/gateway/suite_test.go index b8406262..1db104c8 100644 --- a/test/integration/submitqueue/gateway/suite_test.go +++ b/test/integration/submitqueue/gateway/suite_test.go @@ -30,12 +30,19 @@ import ( "database/sql" "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/uber-go/tally/v4" + queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql" + "github.com/uber/submitqueue/submitqueue/core/consumer" + corerequest "github.com/uber/submitqueue/submitqueue/core/request" + "github.com/uber/submitqueue/submitqueue/entity" pb "github.com/uber/submitqueue/submitqueue/gateway/protopb" "github.com/uber/submitqueue/test/testutil" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -53,6 +60,17 @@ func TestGatewayIntegration(t *testing.T) { suite.Run(t, new(GatewayIntegrationSuite)) } +// The log consumer runs inside the gateway-service container, so this suite can +// only observe persistence black-box through the Status RPC — there is no +// in-process channel/HookSignal to wait on across the container boundary. A +// bounded poll is therefore the deterministic-enough analog: persistTimeout is a +// safety net (a failure here means something is genuinely stuck, not a timing +// race), and persistPollInterval bounds how often we re-query. +const ( + persistTimeout = 30 * time.Second + persistPollInterval = 500 * time.Millisecond +) + func (s *GatewayIntegrationSuite) SetupSuite() { t := s.T() s.ctx = context.Background() @@ -144,3 +162,47 @@ func (s *GatewayIntegrationSuite) TestLandAPI() { s.log.Logf("Land API test passed: request stored and message published") } + +// TestRequestLogConsumer verifies the gateway's log-topic consumer in isolation: +// no orchestrator runs in this stack, so the test itself publishes a request log +// entry to the log topic exactly as the orchestrator does in production (via +// submitqueue/core/request.PublishLog). The gateway is the sole writer of the +// request log; this asserts its consumer drains the log topic and persists the +// entry to storage, observable through the Status RPC. +func (s *GatewayIntegrationSuite) TestRequestLogConsumer() { + t := s.T() + + // Build a publisher against the shared queue database. NewQueue only wires up + // stores; nothing consumes until a subscriber is started, so this publish-only + // use does not interfere with the gateway container's consumer. + queue, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.queueDB, + Logger: zap.NewNop(), + MetricsScope: tally.NoopScope, + }) + require.NoError(t, err, "failed to create queue publisher") + defer queue.Close() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: consumer.TopicKeyLog, Name: "log", Queue: queue}, + }) + require.NoError(t, err, "failed to create topic registry") + + const sqid = "log-consumer-test/1" + logEntry := entity.NewRequestLog(sqid, entity.RequestStatusStarted, 1, "", nil) + require.NoError(t, corerequest.PublishLog(s.ctx, registry, logEntry, sqid), + "failed to publish request log to log topic") + + s.log.Logf("Published 'started' log for sqid=%s; waiting for gateway consumer to persist it", sqid) + + require.Eventually(t, func() bool { + resp, statusErr := s.client.Status(s.ctx, &pb.StatusRequest{Sqid: sqid}) + if statusErr != nil { + return false + } + return resp.Status == string(entity.RequestStatusStarted) + }, persistTimeout, persistPollInterval, + "gateway log consumer should persist the published request log for sqid=%s", sqid) + + s.log.Logf("Request log consumer test passed: entry persisted and readable via Status") +}