8 changes: 4 additions & 4 deletions doc/rfc/submitqueue/workflow.md
@@ -2,7 +2,7 @@

The orchestrator processes land requests through a queue-driven pipeline of small, single-purpose controllers. The gateway accepts a request over RPC and hands it off asynchronously; from there each controller consumes one topic, advances the request or batch, and publishes to the next topic. Most hops carry only an ID — the controller fetches the entity from storage — while a few entry points (`start`, `buildsignal`, `log`) carry the full payload because there is no row to fetch yet.

The pipeline has two cycles: `speculate → build → buildsignal → speculate` (CI feedback loop) and `merge → speculate` (advance the next batch). `conclude` is the only stage that transitions a request to a terminal state; `log` is an append-only sink that any controller can publish to via `submitqueue/core/request.PublishLog`.
The pipeline has two cycles: `speculate → build → buildsignal → speculate` (CI feedback loop) and `merge → speculate` (advance the next batch). `conclude` is the only stage that transitions a request to a terminal state; `log` is an append-only sink that any controller can publish to via `submitqueue/core/request.PublishLog`. The orchestrator only *publishes* request log events — the **gateway** is the sole consumer of the `log` topic and the only service that persists the request log to storage.

## Diagram

@@ -14,8 +14,8 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate`
│ LandRequest
┌──────────────────────┐ ┌──────────────────────────────────┐
│ log (terminal sink) │◄───│ start │
Append RequestLog │ │ Persist Request, emit Started │
│ log (gateway sink) │◄───│ start │
Persist RequestLog │ │ Persist Request, emit Started │
└──────────────────────┘ └────────────────┬─────────────────┘
▲ │ RequestID
│ ▼
@@ -79,4 +79,4 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate`
| **buildsignal** | Build | speculate | Feed CI result back into speculation |
| **merge** | BatchID | conclude, speculate | Merge the batch and advance the queue |
| **conclude** | BatchID | — | Map terminal batch state to request state |
| **log** | RequestLog | — | Append-only sink for request log events |
| **log** | RequestLog | — | Gateway-owned sink: persists request log events to storage |
2 changes: 2 additions & 0 deletions example/submitqueue/docker-compose.yml
@@ -55,6 +55,8 @@ services:
- QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true
# Path to YAML queue configuration baked into the image
- QUEUE_CONFIG_PATH=/root/queues.yaml
# Stable hostname prefix for the request-log consumer's subscriber name (the gateway appends its PID)
- HOSTNAME=gateway-dev
depends_on:
mysql-app:
condition: service_healthy
4 changes: 4 additions & 0 deletions example/submitqueue/gateway/server/BUILD.bazel
@@ -11,12 +11,16 @@ go_library(
importpath = "github.com/uber/submitqueue/example/submitqueue/gateway/server",
visibility = ["//visibility:private"],
deps = [
"//core/errs/generic",
"//core/errs/mysql",
"//extension/counter/mysql",
"//extension/messagequeue",
"//extension/messagequeue/mysql",
"//submitqueue/core/consumer",
"//submitqueue/extension/queueconfig/yaml",
"//submitqueue/extension/storage/mysql",
"//submitqueue/gateway/controller",
"//submitqueue/gateway/controller/log",
"//submitqueue/gateway/protopb",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_uber_go_tally_v4//:tally",
2 changes: 2 additions & 0 deletions example/submitqueue/gateway/server/docker-compose.yml
@@ -55,6 +55,8 @@ services:
- QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true
# Path to YAML queue configuration baked into the image
- QUEUE_CONFIG_PATH=/root/queues.yaml
# Stable hostname prefix for the request-log consumer's subscriber name (the gateway appends its PID)
- HOSTNAME=gateway-dev
depends_on:
mysql-app:
condition: service_healthy
80 changes: 75 additions & 5 deletions example/submitqueue/gateway/server/main.go
@@ -28,12 +28,16 @@ import (

_ "github.com/go-sql-driver/mysql"
"github.com/uber-go/tally/v4"
genericerrs "github.com/uber/submitqueue/core/errs/generic"
mysqlerrs "github.com/uber/submitqueue/core/errs/mysql"
mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql"
extqueue "github.com/uber/submitqueue/extension/messagequeue"
queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql"
"github.com/uber/submitqueue/submitqueue/core/consumer"
yamlqueueconfig "github.com/uber/submitqueue/submitqueue/extension/queueconfig/yaml"
mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql"
"github.com/uber/submitqueue/submitqueue/gateway/controller"
logctrl "github.com/uber/submitqueue/submitqueue/gateway/controller/log"
pb "github.com/uber/submitqueue/submitqueue/gateway/protopb"
"go.uber.org/zap"
"google.golang.org/grpc"
@@ -174,12 +178,35 @@ func run() error {
zap.String("queue_dsn", queueDSN),
)

// Build a publish-only topic registry: gateway only feeds the start of the
// orchestrator pipeline (TopicKeyStart) and the cancel topic (TopicKeyCancel).
// No subscription is configured because the gateway never consumes from the queue.
// Subscriber name for the log-topic consumer. It must be unique per running
// instance: SubscriberName identifies a subscriber for partition leases, so
// two gateway processes on the same host (sharing HOSTNAME) would otherwise
// contend for the same lease. Append the PID to keep co-located instances
// distinct; the PID is stable for the life of the process. Offset tracking
// stays keyed on the shared ConsumerGroup ("gateway-log"), not this name.
// Falls back to a time-seeded name when HOSTNAME is unset (e.g. local runs).
hostname := os.Getenv("HOSTNAME")
if hostname == "" {
hostname = fmt.Sprintf("gateway-%d", time.Now().Unix())
}
subscriberName := fmt.Sprintf("%s-%d", hostname, os.Getpid())

// Build the topic registry. The gateway publishes to the start of the
// orchestrator pipeline (TopicKeyStart) and the cancel topic (TopicKeyCancel) —
// both publish-only. It additionally consumes the log topic (TopicKeyLog):
// the gateway is the sole writer of the request log, persisting entries that
// the orchestrator publishes there.
registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{
{Key: consumer.TopicKeyStart, Name: "start", Queue: mysqlQueue},
{Key: consumer.TopicKeyCancel, Name: "cancel", Queue: mysqlQueue},
{
Key: consumer.TopicKeyLog,
Name: "log",
Queue: mysqlQueue,
Subscription: extqueue.DefaultSubscriptionConfig(
subscriberName, "gateway-log",
),
},
})
if err != nil {
return fmt.Errorf("failed to create topic registry: %w", err)
@@ -201,7 +228,8 @@ func run() error {

// Initialize storage from the shared app database connection. The land
// controller writes to this store directly; cancel/status use the request
// log store directly.
// log store directly. The log consumer (registered below) is the sole
// persister of request log entries published by the orchestrator.
store, err := mysqlstorage.NewStorage(appDB, scope.SubScope("storage"))
if err != nil {
return fmt.Errorf("failed to create storage: %w", err)
@@ -236,6 +264,29 @@ func run() error {
// Register reflection service for debugging with grpcurl
reflection.Register(grpcServer)

// Create the queue consumer and register the log controller. The gateway is
// the sole persister of the request log: the orchestrator publishes entries
// to the log topic and this consumer writes them to storage.
logConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry,
// Storage (extension/storage/mysql) and queue (extension/messagequeue/mysql)
// both run on the same MySQL driver, so one MySQL classifier (alongside the
// generic one) covers errors surfaced from either backend.
genericerrs.Classifier,
mysqlerrs.Classifier,
)

logController := logctrl.NewController(logger.Sugar(), scope, store, consumer.TopicKeyLog, "gateway-log")
if err := logConsumer.Register(logController); err != nil {
return fmt.Errorf("failed to register log controller: %w", err)
}

if err := logConsumer.Start(ctx); err != nil {
// The error may also result from context cancellation due to SIGINT or SIGTERM.
// That is expected; just propagate it.
return fmt.Errorf("failed to start log consumer: %w", err)
}
logger.Info("log consumer started")

// Listen on configurable port
port := os.Getenv("PORT")
if port == "" {
@@ -257,6 +308,8 @@ func run() error {

// Wait for interrupt signal or server critical error
// If interruption is signaled, gracefully stop the server
// If the server exits with an error, cancel the context to signal the consumer
// After this, stop the consumer
// If an error happens during shutdown, return the actual error, not the context cancellation error
var serverErr error
select {
@@ -273,10 +326,27 @@ func run() error {
serverErr = <-serverErrCh
case serverErr = <-serverErrCh:
fmt.Println("Shutting down gateway server due to critical GRPC server error...")

// Cancel the context to signal cancellation to the queue consumer
cancel()
}

if serverErr != nil {
err = fmt.Errorf("GRPC server exited with error: %w", serverErr)
serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr)
}

// Stop the consumer with a 30s timeout. By this point the context should be
// cancelled and the processing goroutines may already be exiting; wait for them to finish.
errStop := logConsumer.Stop(30000)
if errStop != nil {
errStop = fmt.Errorf("failed to stop consumer: %w", errStop)
}

if errStop != nil || serverErr != nil {
// Override context cancellation error with the shutdown error. The server
// error is the primary/root failure, so it leads; the consumer-stop error
// is secondary cleanup.
err = errors.Join(serverErr, errStop)
}

return err
1 change: 0 additions & 1 deletion example/submitqueue/orchestrator/server/BUILD.bazel
@@ -42,7 +42,6 @@ go_library(
"//submitqueue/orchestrator/controller/buildsignal",
"//submitqueue/orchestrator/controller/cancel",
"//submitqueue/orchestrator/controller/conclude",
"//submitqueue/orchestrator/controller/log",
"//submitqueue/orchestrator/controller/merge",
"//submitqueue/orchestrator/controller/score",
"//submitqueue/orchestrator/controller/speculate",
20 changes: 4 additions & 16 deletions example/submitqueue/orchestrator/server/main.go
@@ -61,7 +61,6 @@ import (
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/buildsignal"
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/cancel"
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/conclude"
logctrl "github.com/uber/submitqueue/submitqueue/orchestrator/controller/log"
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/merge"
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/score"
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/speculate"
@@ -238,7 +237,7 @@ func run() error {
return err
}

logger.Info("controllers registered", zap.Int("count", 11))
logger.Info("controllers registered", zap.Int("count", 10))

// Start consumers
if err := c.Start(ctx); err != nil {
@@ -408,12 +407,12 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
),
},
{
// Publish-only: the orchestrator emits request log entries to the
// log topic but never persists them. The gateway is the sole
// consumer that writes the request log to storage.
Key: consumer.TopicKeyLog,
Name: "log",
Queue: q,
Subscription: extqueue.DefaultSubscriptionConfig(
subscriberName, "orchestrator-log",
),
},
})
}
@@ -605,17 +604,6 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
return fmt.Errorf("failed to register conclude controller: %w", err)
}

logController := logctrl.NewController(
logger,
scope,
store,
consumer.TopicKeyLog,
"orchestrator-log",
)
if err := c.Register(logController); err != nil {
return fmt.Errorf("failed to register log controller: %w", err)
}

return nil
}

26 changes: 25 additions & 1 deletion submitqueue/gateway/README.md
@@ -1 +1,25 @@
SubmitQueue Gateway
# SubmitQueue Gateway

The gateway is the RPC entry point to SubmitQueue. It accepts `Land`, `Cancel`,
`Status`, and `Ping` calls, validates them at the edge, and hands work off to the
orchestrator pipeline asynchronously via the message queue.

## Request log ownership

The gateway is the **sole owner of the request log** — the only service that
both writes and reads it. No other service persists or reads request log
entries:

- For statuses it produces synchronously (`accepted` on `Land`, `cancelling` on
`Cancel`), the gateway writes directly to storage so the entry is visible the
moment the RPC returns.
- For statuses produced downstream, the orchestrator only *publishes* entries to
the `log` topic via `submitqueue/core/request.PublishLog`. The gateway runs a
consumer that drains the `log` topic and persists each entry to storage.

Reads are likewise gateway-only: the `Status` and `Cancel` RPCs read the request
log directly from storage. The orchestrator only *publishes* log entries and
never touches the request log store.

This keeps a single service responsible for the request log and keeps the
orchestrator free of any request log storage access.
@@ -3,7 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "log",
srcs = ["log.go"],
importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/log",
importpath = "github.com/uber/submitqueue/submitqueue/gateway/controller/log",
visibility = ["//visibility:public"],
deps = [
"//core/metrics",
@@ -29,6 +29,10 @@ import (
// Controller handles log queue messages.
// It consumes request log entries and persists them to storage.
// Implements consumer.Controller interface for integration with the consumer.
//
// The request log is written exclusively by the gateway: other services
// (e.g. the orchestrator) only publish log entries to the log topic, and this
// controller is the single consumer that persists them to storage.
type Controller struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
@@ -40,7 +44,7 @@ type Controller struct {
// Verify Controller implements consumer.Controller interface at compile time.
var _ consumer.Controller = (*Controller)(nil)

// NewController creates a new log controller for the orchestrator.
// NewController creates a new log controller for the gateway.
func NewController(
logger *zap.SugaredLogger,
scope tally.Scope,
@@ -35,7 +35,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock
logger := zaptest.NewLogger(t).Sugar()
scope := tally.NoopScope

return NewController(logger, scope, store, consumer.TopicKeyLog, "orchestrator-log")
return NewController(logger, scope, store, consumer.TopicKeyLog, "gateway-log")
}

func TestController_Process(t *testing.T) {
1 change: 1 addition & 0 deletions test/e2e/submitqueue/BUILD.bazel
@@ -18,6 +18,7 @@ go_test(
"integration",
],
deps = [
"//submitqueue/entity",
"//submitqueue/gateway/protopb",
"//submitqueue/orchestrator/protopb",
"//test/testutil",