diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index 11a03d20..5595cc89 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -515,16 +515,17 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, store, - // TODO: replace with a real scorer + // Heuristic scorer: bucket the batch by total lines changed across all of + // its changes — larger batches are likelier to fail to land. scorerFactory{impl: heuristic.New( []heuristic.Bucket{ - {Min: 0, Max: 1, Score: 0.95}, - {Min: 2, Max: 5, Score: 0.80}, - {Min: 6, Max: 20, Score: 0.60}, - {Min: 21, Max: 1<<31 - 1, Score: 0.40}, + {Min: 0, Max: 50, Score: 0.95}, + {Min: 51, Max: 250, Score: 0.80}, + {Min: 251, Max: 1000, Score: 0.60}, + {Min: 1001, Max: 1<<31 - 1, Score: 0.40}, }, - func(_ context.Context, change entity.Change) (int, error) { - return len(change.URIs), nil + func(_ context.Context, changes entity.BatchChanges) (int, error) { + return changes.TotalLinesChanged(), nil }, scope.SubScope("scorer"), )}, diff --git a/submitqueue/entity/BUILD.bazel b/submitqueue/entity/BUILD.bazel index 39a077dc..21a57602 100644 --- a/submitqueue/entity/BUILD.bazel +++ b/submitqueue/entity/BUILD.bazel @@ -4,9 +4,11 @@ go_library( name = "entity", srcs = [ "batch.go", + "batch_changes.go", "batch_dependent.go", "build.go", "cancel_request.go", + "change_provider.go", "change_record.go", "land_request.go", "queue_config.go", diff --git a/submitqueue/entity/batch_changes.go b/submitqueue/entity/batch_changes.go new file mode 100644 index 00000000..d97e8a9c --- /dev/null +++ b/submitqueue/entity/batch_changes.go @@ -0,0 +1,48 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +// BatchChanges is the normalized, batch-level view of all changes in a batch, +// assembled by the score controller and handed to a Scorer. A Batch references +// only request IDs, so the controller resolves each request's change records and +// flattens their details into Changes — giving the scorer the whole batch's +// change facts in one value without coupling it to storage. +type BatchChanges struct { + // BatchID is the batch being scored. Format: "/batch/". + BatchID string + // Queue is the queue the batch belongs to. + Queue string + // Changes is every change (URI + provider-supplied details) across all requests + // in the batch. Order is unspecified. + Changes []ChangeInfo +} + +// TotalLinesChanged returns the total number of lines touched across every change in the batch. +func (b BatchChanges) TotalLinesChanged() int { + total := 0 + for _, c := range b.Changes { + total += c.Details.TotalLinesChanged() + } + return total +} + +// TotalFiles returns the total number of files touched across every change in the batch. +func (b BatchChanges) TotalFiles() int { + total := 0 + for _, c := range b.Changes { + total += c.Details.FileCount() + } + return total +} diff --git a/submitqueue/entity/change_provider.go b/submitqueue/entity/change_provider.go new file mode 100644 index 00000000..9ffacc0a --- /dev/null +++ b/submitqueue/entity/change_provider.go @@ -0,0 +1,76 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +// Author represents the author of a change. +type Author struct { + // Name is the display name of the author. + Name string `json:"name"` + // Email is the email address of the author. + Email string `json:"email"` +} + +// ChangedFile represents a single file modification in a change. +type ChangedFile struct { + // Path is the file path relative to the repository root. + Path string `json:"path"` + // LinesAdded is the number of lines added in this file. + LinesAdded int `json:"lines_added"` + // LinesDeleted is the number of lines deleted in this file. + LinesDeleted int `json:"lines_deleted"` + // LinesModified is the number of lines modified in this file. Some providers + // (e.g. GitHub) report only additions and deletions and leave this zero. + LinesModified int `json:"lines_modified"` +} + +// TotalLines returns the total number of lines touched in this file. +func (f ChangedFile) TotalLines() int { + return f.LinesAdded + f.LinesDeleted + f.LinesModified +} + +// ChangeDetails holds the provider-supplied facts about a single change (author, +// modified files, line counts). It carries no identity — the owning URI lives on +// ChangeInfo (provider correlation) and ChangeRecord (persisted claim). +type ChangeDetails struct { + // Author is the author of the change. + Author Author `json:"author"` + // ChangedFiles is the list of files modified in this change. Order is unspecified. + ChangedFiles []ChangedFile `json:"changed_files,omitempty"` +} + +// TotalLinesChanged returns the total number of lines touched across all files in the change. +func (d ChangeDetails) TotalLinesChanged() int { + total := 0 + for _, f := range d.ChangedFiles { + total += f.TotalLines() + } + return total +} + +// FileCount returns the number of files touched in the change. +func (d ChangeDetails) FileCount() int { + return len(d.ChangedFiles) +} + +// ChangeInfo maps a change URI to its details. It is the change provider's return +// type: for a Change with multiple URIs (e.g. a stacked PR set), the provider +// returns one ChangeInfo per URI so callers can correlate results to inputs by URI. +type ChangeInfo struct { + // URI is the full change URI for correlation with the input request + // (e.g., "github://uber/repo/pull/98/c3a4d5e6f7890123456789abcdef0123456789ab"). + URI string `json:"uri"` + // Details is the provider-supplied facts for this URI. + Details ChangeDetails `json:"details"` +} diff --git a/submitqueue/entity/change_record.go b/submitqueue/entity/change_record.go index 8ba0ff7d..0024a7fa 100644 --- a/submitqueue/entity/change_record.go +++ b/submitqueue/entity/change_record.go @@ -15,9 +15,9 @@ package entity // ChangeRecord represents a single URI's claim by a request, persisted in the change store. -// The (Queue, URI, RequestID) triple is the identity and is immutable; Metadata may be -// updated over time as additional information about the change (e.g., PR title, author, -// mergeability) becomes available. +// The whole record is immutable: the (Queue, URI, RequestID) triple is its identity and the +// Details (author, changed files, line counts) are captured once at claim time from the +// change provider. There is no update path. type ChangeRecord struct { // URI identifies the change (RFC 3986). Same scheme/format as entity.Change.URIs. // Example: "github://uber/submitqueue/pull/123/c3a4d5e6f7890123456789abcdef0123456789ab". @@ -29,7 +29,7 @@ type ChangeRecord struct { // RequestID participates in the change-store primary key so that concurrent claims // by different requests on the same URI coexist as distinct rows. Same-request // retries collide on the PK and are absorbed idempotently; different-request - // collisions surface as additional rows that callers detect via FindOverlapping. + // collisions surface as additional rows that callers detect via GetByURI. RequestID string `json:"request_id"` // Queue is the queue the owning request belongs to. It is the leading column of @@ -37,21 +37,20 @@ type ChangeRecord struct { // scans and the table is shardable by queue. Queue string `json:"queue"` - // Metadata is a JSON-encoded blob of provider-specific information about the change - // (e.g., PR title, author, mergeable state). Stored as `'{}'` when no metadata has - // been populated yet; updated by downstream enrichment. - Metadata string `json:"metadata,omitempty"` + // Details holds the provider-supplied facts about the change (author, changed + // files, line counts). It is captured at claim time (the validate controller, after + // fetching from the change provider) and written once with the record — records are + // immutable, so Details is never updated after Create. + Details ChangeDetails `json:"details"` - // CreatedAt is the Unix milliseconds timestamp when this record was first created. + // CreatedAt is the Unix milliseconds timestamp when this record was created. CreatedAt int64 `json:"created_at"` - // UpdatedAt is the Unix milliseconds timestamp when this record's Metadata was last updated. - // Equal to CreatedAt when the record has never been updated. + // UpdatedAt is the Unix milliseconds timestamp when this record was created. Records + // are immutable, so it always equals CreatedAt; retained for schema symmetry. UpdatedAt int64 `json:"updated_at"` - // Version is the optimistic-locking counter for mutable fields (Metadata). - // Starts at 1 on Create and is incremented by callers on every update. - // Mirrors the request-store convention: callers compute newVersion = oldVersion + 1 - // and pass both to the update method; the store performs a pure conditional write. + // Version is the record version. Records are immutable, so it is always 1; retained + // for schema symmetry with the other stores. Version int32 `json:"version"` } diff --git a/submitqueue/extension/changeprovider/change_provider.go b/submitqueue/extension/changeprovider/change_provider.go index 47a91144..9b4b7433 100644 --- a/submitqueue/extension/changeprovider/change_provider.go +++ b/submitqueue/extension/changeprovider/change_provider.go @@ -22,44 +22,17 @@ import ( "github.com/uber/submitqueue/submitqueue/entity" ) -// User represents the author of a change. -type User struct { - // Name is the display name of the user. - Name string - // Email is the email address of the user. - Email string -} - -// ChangedFile represents a single file modification in a change. -type ChangedFile struct { - // Path is the file path relative to the repository root. - Path string - // Patch is the diff patch content for this file. - Patch string - // LinesAdded is the number of lines added in this file. - LinesAdded int - // LinesDeleted is the number of lines deleted in this file. - LinesDeleted int -} - -// ChangeInfo contains metadata and file changes for a code change. -type ChangeInfo struct { - // URI is the full change URI for correlation with the input request - // (e.g., "github://uber/repo/pull/98/c3a4d5e6f7890123456789abcdef0123456789ab" or "phab://D123/xyz789"). - URI string - // User is the author of the change. - User User - // ChangedFiles is the list of files modified in this change. Order is unspecified. - ChangedFiles []ChangedFile -} - // ChangeProvider fetches change metadata from external systems // Each implementation is configured for a specific provider (GitHub, GitLab, Phabricator). +// +// The change value types it produces — entity.ChangeInfo, entity.ChangeDetails, +// entity.Author, entity.ChangedFile — live in the entity package so the same typed +// facts can be persisted (entity.ChangeRecord) and scored without re-declaration. type ChangeProvider interface { // Get retrieves change information for the provided Change. // For a Change with multiple URIs (e.g., stacked PRs), returns one ChangeInfo per URI. // Returns a slice of ChangeInfo, one for each change in the stack. - Get(ctx context.Context, change entity.Change) ([]ChangeInfo, error) + Get(ctx context.Context, change entity.Change) ([]entity.ChangeInfo, error) } // Config carries the per-queue identity handed to a Factory. The system knows diff --git a/submitqueue/extension/changeprovider/github/convert.go b/submitqueue/extension/changeprovider/github/convert.go index 40c28de1..474e500a 100644 --- a/submitqueue/extension/changeprovider/github/convert.go +++ b/submitqueue/extension/changeprovider/github/convert.go @@ -1,32 +1,33 @@ package github import ( + "github.com/uber/submitqueue/submitqueue/entity" entitygithub "github.com/uber/submitqueue/submitqueue/entity/github" - "github.com/uber/submitqueue/submitqueue/extension/changeprovider" ) -// convertToChangeInfo converts GitHub PR data to ChangeInfo. -func convertToChangeInfo(parsed entitygithub.ChangeID, prData *pullRequestData) changeprovider.ChangeInfo { - changedFiles := convertFiles(prData.Files.Nodes) - - return changeprovider.ChangeInfo{ +// convertToChangeInfo converts GitHub PR data to an entity.ChangeInfo. +func convertToChangeInfo(parsed entitygithub.ChangeID, prData *pullRequestData) entity.ChangeInfo { + return entity.ChangeInfo{ URI: parsed.String(), - User: changeprovider.User{ - Name: prData.Author.Name, - Email: prData.Author.Email, + Details: entity.ChangeDetails{ + Author: entity.Author{ + Name: prData.Author.Name, + Email: prData.Author.Email, + }, + ChangedFiles: convertFiles(prData.Files.Nodes), }, - ChangedFiles: changedFiles, } } -// convertFiles converts GitHub file nodes to ChangedFile structs. -func convertFiles(nodes []fileNode) []changeprovider.ChangedFile { - changedFiles := make([]changeprovider.ChangedFile, 0, len(nodes)) +// convertFiles converts GitHub file nodes to entity.ChangedFile structs. +// GitHub's API reports only additions and deletions per file, so LinesModified +// is left zero here. +func convertFiles(nodes []fileNode) []entity.ChangedFile { + changedFiles := make([]entity.ChangedFile, 0, len(nodes)) for _, file := range nodes { - changedFiles = append(changedFiles, changeprovider.ChangedFile{ + changedFiles = append(changedFiles, entity.ChangedFile{ Path: file.Path, - Patch: file.Patch, LinesAdded: file.Additions, LinesDeleted: file.Deletions, }) diff --git a/submitqueue/extension/changeprovider/github/graphql.go b/submitqueue/extension/changeprovider/github/graphql.go index 0d527710..fc24ee5b 100644 --- a/submitqueue/extension/changeprovider/github/graphql.go +++ b/submitqueue/extension/changeprovider/github/graphql.go @@ -34,7 +34,6 @@ query($owner: String!, $repo: String!, $prNumber: Int!, $filesCursor: String) { additions deletions changeType - patch } } } @@ -98,7 +97,6 @@ type fileNode struct { Additions int `json:"additions"` Deletions int `json:"deletions"` ChangeType string `json:"changeType"` - Patch string `json:"patch"` } // buildGraphQLRequest builds a GraphQL request for fetching pull request data. diff --git a/submitqueue/extension/changeprovider/github/graphql_test.go b/submitqueue/extension/changeprovider/github/graphql_test.go index 605f449d..7eab8e45 100644 --- a/submitqueue/extension/changeprovider/github/graphql_test.go +++ b/submitqueue/extension/changeprovider/github/graphql_test.go @@ -77,7 +77,7 @@ func TestParseGraphQLResponse(t *testing.T) { "files": { "totalCount": 1, "pageInfo": {"endCursor": "cur1", "hasNextPage": false}, - "nodes": [{"path": "main.go", "additions": 10, "deletions": 2, "changeType": "MODIFIED", "patch": "diff content"}] + "nodes": [{"path": "main.go", "additions": 10, "deletions": 2, "changeType": "MODIFIED"}] } } } @@ -90,7 +90,7 @@ func TestParseGraphQLResponse(t *testing.T) { Files: filesData{ TotalCount: 1, PageInfo: pageInfo{EndCursor: "cur1", HasNextPage: false}, - Nodes: []fileNode{{Path: "main.go", Additions: 10, Deletions: 2, ChangeType: "MODIFIED", Patch: "diff content"}}, + Nodes: []fileNode{{Path: "main.go", Additions: 10, Deletions: 2, ChangeType: "MODIFIED"}}, }, }, }, diff --git a/submitqueue/extension/changeprovider/github/provider.go b/submitqueue/extension/changeprovider/github/provider.go index 5c1274d1..6f86e621 100644 --- a/submitqueue/extension/changeprovider/github/provider.go +++ b/submitqueue/extension/changeprovider/github/provider.go @@ -44,7 +44,7 @@ func NewProvider(params Params) changeprovider.ChangeProvider { // Get retrieves change information from GitHub for the provided Change. // Returns one ChangeInfo per URI (one per PR in stacked changes). -func (p *provider) Get(ctx context.Context, change entity.Change) (_ []changeprovider.ChangeInfo, retErr error) { +func (p *provider) Get(ctx context.Context, change entity.Change) (_ []entity.ChangeInfo, retErr error) { op := coremetrics.Begin(p.metricsScope, "get") defer func() { op.Complete(retErr) }() @@ -85,8 +85,8 @@ func (p *provider) Get(ctx context.Context, change entity.Change) (_ []changepro func (p *provider) fetchAllPRs( ctx context.Context, changeIDs []entitygithub.ChangeID, -) ([]changeprovider.ChangeInfo, error) { - changeInfos := make([]changeprovider.ChangeInfo, 0, len(changeIDs)) +) ([]entity.ChangeInfo, error) { + changeInfos := make([]entity.ChangeInfo, 0, len(changeIDs)) for _, cid := range changeIDs { prData, err := p.fetchPullRequest(ctx, cid) @@ -109,7 +109,7 @@ func (p *provider) fetchAllPRs( "org", cid.Org, "repo", cid.Repo, "pr", cid.PRNumber, - "files_count", len(changeInfo.ChangedFiles), + "files_count", len(changeInfo.Details.ChangedFiles), "head_sha", prData.HeadRefOid, ) } diff --git a/submitqueue/extension/changeprovider/github/provider_test.go b/submitqueue/extension/changeprovider/github/provider_test.go index 3f57d51f..9ae9d0cb 100644 --- a/submitqueue/extension/changeprovider/github/provider_test.go +++ b/submitqueue/extension/changeprovider/github/provider_test.go @@ -115,7 +115,7 @@ func TestProvider_Get(t *testing.T) { require.NoError(t, err) require.Len(t, infos, 1) assert.Equal(t, tt.uris[0], infos[0].URI) - assert.Len(t, infos[0].ChangedFiles, 2) + assert.Len(t, infos[0].Details.ChangedFiles, 2) }) } } @@ -154,7 +154,7 @@ func TestProvider_Get_Pagination(t *testing.T) { require.NoError(t, err) assert.Equal(t, 2, callCount) require.Len(t, infos, 1) - assert.Len(t, infos[0].ChangedFiles, 2) + assert.Len(t, infos[0].Details.ChangedFiles, 2) } func TestProvider_Get_MultiplePRs(t *testing.T) { diff --git a/submitqueue/extension/changeprovider/mock/change_provider_mock.go b/submitqueue/extension/changeprovider/mock/change_provider_mock.go index 948c32a0..a1afc4ae 100644 --- a/submitqueue/extension/changeprovider/mock/change_provider_mock.go +++ b/submitqueue/extension/changeprovider/mock/change_provider_mock.go @@ -43,10 +43,10 @@ func (m *MockChangeProvider) EXPECT() *MockChangeProviderMockRecorder { } // Get mocks base method. -func (m *MockChangeProvider) Get(ctx context.Context, change entity.Change) ([]changeprovider.ChangeInfo, error) { +func (m *MockChangeProvider) Get(ctx context.Context, change entity.Change) ([]entity.ChangeInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Get", ctx, change) - ret0, _ := ret[0].([]changeprovider.ChangeInfo) + ret0, _ := ret[0].([]entity.ChangeInfo) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/submitqueue/extension/scorer/composite/scorer.go b/submitqueue/extension/scorer/composite/scorer.go index aa46e8e7..82f84596 100644 --- a/submitqueue/extension/scorer/composite/scorer.go +++ b/submitqueue/extension/scorer/composite/scorer.go @@ -90,13 +90,13 @@ func New(scorers map[string]scorer.Scorer, reduce ReduceFunc, scope tally.Scope) // Score evaluates all child scorers and combines their results using the reduce function. // If any child scorer returns an error, that error is returned immediately. -func (c *compositeScorer) Score(ctx context.Context, change entity.Change) (ret float64, retErr error) { +func (c *compositeScorer) Score(ctx context.Context, changes entity.BatchChanges) (ret float64, retErr error) { op := metrics.Begin(c.scope, "score") defer func() { op.Complete(retErr) }() scores := make(map[string]float64, len(c.scorers)) for name, s := range c.scorers { - score, err := s.Score(ctx, change) + score, err := s.Score(ctx, changes) if err != nil { return 0, err } diff --git a/submitqueue/extension/scorer/composite/scorer_test.go b/submitqueue/extension/scorer/composite/scorer_test.go index 2abec0a1..a6b95878 100644 --- a/submitqueue/extension/scorer/composite/scorer_test.go +++ b/submitqueue/extension/scorer/composite/scorer_test.go @@ -31,14 +31,14 @@ type fixedScorer struct { score float64 } -func (f *fixedScorer) Score(_ context.Context, _ entity.Change) (float64, error) { +func (f *fixedScorer) Score(_ context.Context, _ entity.BatchChanges) (float64, error) { return f.score, nil } // errorScorer always returns an error. type errorScorer struct{} -func (e *errorScorer) Score(_ context.Context, _ entity.Change) (float64, error) { +func (e *errorScorer) Score(_ context.Context, _ entity.BatchChanges) (float64, error) { return 0, fmt.Errorf("scorer failed") } @@ -99,7 +99,7 @@ func TestScorer_Score(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := New(tt.scorers, tt.reduce, tally.NoopScope) - got, err := s.Score(context.Background(), entity.Change{}) + got, err := s.Score(context.Background(), entity.BatchChanges{}) require.NoError(t, err) assert.InDelta(t, tt.want, got, 1e-9) }) @@ -111,7 +111,7 @@ func TestScorer_Score_ChildError(t *testing.T) { "error": &errorScorer{}, "files": &fixedScorer{0.9}, }, Min, tally.NoopScope) - _, err := s.Score(context.Background(), entity.Change{}) + _, err := s.Score(context.Background(), entity.BatchChanges{}) require.Error(t, err) } @@ -140,7 +140,7 @@ func TestReduceFunc_ReceivesNames(t *testing.T) { "files": &fixedScorer{0.9}, "deps": &fixedScorer{0.95}, }, custom, tally.NoopScope) - got, err := s.Score(context.Background(), entity.Change{}) + got, err := s.Score(context.Background(), entity.BatchChanges{}) require.NoError(t, err) assert.Equal(t, 0.9, got) assert.ElementsMatch(t, []string{"files", "deps"}, receivedNames) diff --git a/submitqueue/extension/scorer/heuristic/scorer.go b/submitqueue/extension/scorer/heuristic/scorer.go index 94e81482..7203d0e6 100644 --- a/submitqueue/extension/scorer/heuristic/scorer.go +++ b/submitqueue/extension/scorer/heuristic/scorer.go @@ -24,8 +24,8 @@ import ( "github.com/uber/submitqueue/submitqueue/extension/scorer" ) -// ValueFunc extracts a single numeric value from a Change for bucketing. -type ValueFunc func(context.Context, entity.Change) (int, error) +// ValueFunc extracts a single numeric value from a batch of changes for bucketing. +type ValueFunc func(context.Context, entity.BatchChanges) (int, error) // Bucket defines a range [Min, Max] mapped to a probability Score. type Bucket struct { @@ -37,12 +37,12 @@ type Bucket struct { Score float64 } -// heuristicScorer computes a success probability by bucketing a metric extracted from a Change. +// heuristicScorer computes a success probability by bucketing a metric extracted from a batch of changes. // It follows the Java HeuristicsBasedSuccessPredictor pattern. type heuristicScorer struct { // buckets is the list of ranges to match against. buckets []Bucket - // valueFunc extracts the numeric value from a Change. + // valueFunc extracts the numeric value from a batch of changes. valueFunc ValueFunc // scope is the tally scope for emitting metrics. scope tally.Scope @@ -61,12 +61,12 @@ func New(buckets []Bucket, valueFunc ValueFunc, scope tally.Scope) scorer.Scorer } } -// Score extracts the value from the change, then returns the probability score for the first -// bucket whose range [Min, Max] contains the value. Returns an error if no bucket matches. -func (s *heuristicScorer) Score(ctx context.Context, change entity.Change) (ret float64, retErr error) { +// Score extracts the value from the batch of changes, then returns the probability score for the +// first bucket whose range [Min, Max] contains the value. Returns an error if no bucket matches. +func (s *heuristicScorer) Score(ctx context.Context, changes entity.BatchChanges) (ret float64, retErr error) { op := metrics.Begin(s.scope, "score") defer func() { op.Complete(retErr) }() - value, err := s.valueFunc(ctx, change) + value, err := s.valueFunc(ctx, changes) if err != nil { return 0, err } diff --git a/submitqueue/extension/scorer/heuristic/scorer_test.go b/submitqueue/extension/scorer/heuristic/scorer_test.go index 61c8cbf7..98d0bcce 100644 --- a/submitqueue/extension/scorer/heuristic/scorer_test.go +++ b/submitqueue/extension/scorer/heuristic/scorer_test.go @@ -26,7 +26,7 @@ import ( // staticValue returns a ValueFunc that always returns the given value. func staticValue(value int) ValueFunc { - return func(_ context.Context, _ entity.Change) (int, error) { + return func(_ context.Context, _ entity.BatchChanges) (int, error) { return value, nil } } @@ -107,7 +107,7 @@ func TestScorer_Score(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := New(tt.buckets, tt.valueFunc, tally.NoopScope) - got, err := s.Score(context.Background(), entity.Change{}) + got, err := s.Score(context.Background(), entity.BatchChanges{}) if tt.wantErr { require.Error(t, err) return @@ -119,11 +119,11 @@ func TestScorer_Score(t *testing.T) { } func TestScorer_Score_ValueFuncError(t *testing.T) { - failing := func(_ context.Context, _ entity.Change) (int, error) { + failing := func(_ context.Context, _ entity.BatchChanges) (int, error) { return 0, assert.AnError } s := New([]Bucket{{Min: 0, Max: 10, Score: 0.9}}, failing, tally.NoopScope) - _, err := s.Score(context.Background(), entity.Change{}) + _, err := s.Score(context.Background(), entity.BatchChanges{}) require.Error(t, err) } diff --git a/submitqueue/extension/scorer/mock/scorer_mock.go b/submitqueue/extension/scorer/mock/scorer_mock.go index 9cfc706f..72edc280 100644 --- a/submitqueue/extension/scorer/mock/scorer_mock.go +++ b/submitqueue/extension/scorer/mock/scorer_mock.go @@ -43,18 +43,18 @@ func (m *MockScorer) EXPECT() *MockScorerMockRecorder { } // Score mocks base method. -func (m *MockScorer) Score(ctx context.Context, change entity.Change) (float64, error) { +func (m *MockScorer) Score(ctx context.Context, changes entity.BatchChanges) (float64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Score", ctx, change) + ret := m.ctrl.Call(m, "Score", ctx, changes) ret0, _ := ret[0].(float64) ret1, _ := ret[1].(error) return ret0, ret1 } // Score indicates an expected call of Score. -func (mr *MockScorerMockRecorder) Score(ctx, change any) *gomock.Call { +func (mr *MockScorerMockRecorder) Score(ctx, changes any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Score", reflect.TypeOf((*MockScorer)(nil).Score), ctx, change) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Score", reflect.TypeOf((*MockScorer)(nil).Score), ctx, changes) } // MockFactory is a mock of Factory interface. diff --git a/submitqueue/extension/scorer/scorer.go b/submitqueue/extension/scorer/scorer.go index 8b0141ad..6837448e 100644 --- a/submitqueue/extension/scorer/scorer.go +++ b/submitqueue/extension/scorer/scorer.go @@ -22,11 +22,11 @@ import ( "github.com/uber/submitqueue/submitqueue/entity" ) -// Scorer computes a success probability score for a change based on its characteristics. +// Scorer computes a success probability score for a batch of changes based on their characteristics. type Scorer interface { // Score returns a probability between 0.0 and 1.0 indicating the likelihood - // of a successful land for the given change. - Score(ctx context.Context, change entity.Change) (float64, error) + // of a successful land for the given batch of changes. + Score(ctx context.Context, changes entity.BatchChanges) (float64, error) } // Config carries the per-queue identity handed to a Factory. The system knows diff --git a/submitqueue/extension/storage/change_store.go b/submitqueue/extension/storage/change_store.go index a995f33c..a95c460f 100644 --- a/submitqueue/extension/storage/change_store.go +++ b/submitqueue/extension/storage/change_store.go @@ -24,19 +24,19 @@ import ( // ChangeStore manages per-URI claim records for in-flight land requests. // Each row records that a specific URI was claimed by a specific request, scoped to a queue. -// The (Queue, URI, RequestID) triple is the immutable identity of a record. Metadata may -// evolve over time. +// The (Queue, URI, RequestID) triple is the immutable identity of a record, and a record's +// Details are captured at claim time and never updated — records are immutable. // // The interface is intentionally per-record / per-URI so that any backend (SQL, DynamoDB, // Bigtable, …) can implement it without needing batch-atomicity or multi-key query support. // Callers loop when they have multiple URIs to claim or check; the typical request has a // small number of URIs (a single PR or a short stack), so the loop overhead is negligible. type ChangeStore interface { - // Create persists a single ChangeRecord. A primary-key conflict on - // (Queue, URI, RequestID) is silently ignored, which makes the call - // idempotent under queue redeliveries of the same request. Records belonging - // to different requests do not conflict on the PK — cross-request overlap - // is detected by GetByURI, not by Create. + // Create persists a single ChangeRecord (identity + Details) in one write. A + // primary-key conflict on (Queue, URI, RequestID) is silently ignored, which makes + // the call idempotent under queue redeliveries of the same request (first write + // wins). Records belonging to different requests do not conflict on the PK — + // cross-request overlap is detected by GetByURI, not by Create. Create(ctx context.Context, record entity.ChangeRecord) error // GetByURI returns every ChangeRecord for the given (queue, uri). Multiple diff --git a/submitqueue/extension/storage/mysql/change_store.go b/submitqueue/extension/storage/mysql/change_store.go index c83ae92b..be25b18f 100644 --- a/submitqueue/extension/storage/mysql/change_store.go +++ b/submitqueue/extension/storage/mysql/change_store.go @@ -17,6 +17,7 @@ package mysql import ( "context" "database/sql" + "encoding/json" "fmt" "github.com/uber-go/tally/v4" @@ -43,16 +44,14 @@ func (s *changeStore) Create(ctx context.Context, record entity.ChangeRecord) (r op := metrics.Begin(s.scope, "create") defer func() { op.Complete(retErr) }() - // Use the empty JSON object as the canonical "no metadata yet" value. - // metadata is NOT NULL in the schema and the JSON column type rejects an empty string. - metadata := record.Metadata - if metadata == "" { - metadata = "{}" + detailsJSON, err := marshalDetails(record.Details) + if err != nil { + return fmt.Errorf("failed to marshal details for change record uri=%s request_id=%s: %w", record.URI, record.RequestID, err) } - const query = "INSERT IGNORE INTO `change` (uri, request_id, queue, metadata, created_at, updated_at, version) VALUES (?, ?, ?, ?, ?, ?, ?)" + const query = "INSERT IGNORE INTO `change` (uri, request_id, queue, details, created_at, updated_at, version) VALUES (?, ?, ?, ?, ?, ?, ?)" if _, err := s.db.ExecContext(ctx, query, - record.URI, record.RequestID, record.Queue, metadata, record.CreatedAt, record.UpdatedAt, record.Version, + record.URI, record.RequestID, record.Queue, detailsJSON, record.CreatedAt, record.UpdatedAt, record.Version, ); err != nil { return fmt.Errorf("failed to insert change record uri=%s request_id=%s: %w", record.URI, record.RequestID, err) } @@ -65,7 +64,7 @@ func (s *changeStore) GetByURI(ctx context.Context, queue string, uri string) (r op := metrics.Begin(s.scope, "get_by_uri") defer func() { op.Complete(retErr) }() - const query = "SELECT uri, request_id, queue, metadata, created_at, updated_at, version FROM `change` WHERE queue = ? AND uri = ?" + const query = "SELECT uri, request_id, queue, details, created_at, updated_at, version FROM `change` WHERE queue = ? AND uri = ?" rows, err := s.db.QueryContext(ctx, query, queue, uri) if err != nil { return nil, fmt.Errorf("failed to query change records for queue=%s uri=%s: %w", queue, uri, err) @@ -75,9 +74,13 @@ func (s *changeStore) GetByURI(ctx context.Context, queue string, uri string) (r var results []entity.ChangeRecord for rows.Next() { var rec entity.ChangeRecord - if err := rows.Scan(&rec.URI, &rec.RequestID, &rec.Queue, &rec.Metadata, &rec.CreatedAt, &rec.UpdatedAt, &rec.Version); err != nil { + var detailsJSON []byte + if err := rows.Scan(&rec.URI, &rec.RequestID, &rec.Queue, &detailsJSON, &rec.CreatedAt, &rec.UpdatedAt, &rec.Version); err != nil { return nil, fmt.Errorf("failed to scan change record for queue=%s uri=%s: %w", queue, uri, err) } + if err := json.Unmarshal(detailsJSON, &rec.Details); err != nil { + return nil, fmt.Errorf("failed to unmarshal details for change record queue=%s uri=%s request_id=%s: %w", queue, uri, rec.RequestID, err) + } results = append(results, rec) } if err := rows.Err(); err != nil { @@ -85,3 +88,10 @@ func (s *changeStore) GetByURI(ctx context.Context, queue string, uri string) (r } return results, nil } + +// marshalDetails serializes ChangeDetails to JSON for the NOT NULL `details` JSON +// column. A zero-value ChangeDetails marshals to a non-empty JSON object, so no +// empty-string special-casing is needed. +func marshalDetails(details entity.ChangeDetails) ([]byte, error) { + return json.Marshal(details) +} diff --git a/submitqueue/extension/storage/mysql/schema/change.sql b/submitqueue/extension/storage/mysql/schema/change.sql index d5397754..e808021b 100644 --- a/submitqueue/extension/storage/mysql/schema/change.sql +++ b/submitqueue/extension/storage/mysql/schema/change.sql @@ -7,7 +7,7 @@ CREATE TABLE IF NOT EXISTS `change` ( uri VARCHAR(255) NOT NULL, request_id VARCHAR(255) NOT NULL, queue VARCHAR(255) NOT NULL, - metadata JSON NOT NULL, + details JSON NOT NULL, created_at BIGINT NOT NULL, updated_at BIGINT NOT NULL, version INT NOT NULL, diff --git a/submitqueue/orchestrator/controller/score/score.go b/submitqueue/orchestrator/controller/score/score.go index 2b1c3e61..6df27ee5 100644 --- a/submitqueue/orchestrator/controller/score/score.go +++ b/submitqueue/orchestrator/controller/score/score.go @@ -173,27 +173,54 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return nil // Success - message will be acked } -// scoreBatch scores each request's change in the batch and returns the combined probability. -// Uses multiplicative probability: if any single request fails, the entire batch fails, -// so the batch score is the product of individual request scores. +// scoreBatch normalizes the batch's changes and scores them as a whole. It resolves +// each request in the batch, reads that request's change records (one per URI), and +// flattens their provider-supplied details into a single entity.BatchChanges, which +// the scorer turns into one probability for the batch. func (c *Controller) scoreBatch(ctx context.Context, batch entity.Batch) (float64, error) { sc, err := c.scorers.For(scorer.Config{QueueName: batch.Queue}) if err != nil { return 0, fmt.Errorf("failed to build scorer for batch %s: %w", batch.ID, err) } - score := 1.0 + + changes, err := c.collectBatchChanges(ctx, batch) + if err != nil { + return 0, err + } + + score, err := sc.Score(ctx, changes) + if err != nil { + return 0, fmt.Errorf("failed to score batch %s: %w", batch.ID, err) + } + return score, nil +} + +// collectBatchChanges assembles the normalized entity.BatchChanges for a batch by +// resolving each request and reading its change records per URI. For each URI it +// selects the record owned by the request (GetByURI returns rows for all requests +// that ever claimed the URI) and appends its URI + details. +func (c *Controller) collectBatchChanges(ctx context.Context, batch entity.Batch) (entity.BatchChanges, error) { + changes := entity.BatchChanges{BatchID: batch.ID, Queue: batch.Queue} for _, requestID := range batch.Contains { request, err := c.store.GetRequestStore().Get(ctx, requestID) if err != nil { - return 0, fmt.Errorf("failed to get request %s: %w", requestID, err) + return entity.BatchChanges{}, fmt.Errorf("failed to get request %s: %w", requestID, err) } - s, err := sc.Score(ctx, request.Change) - if err != nil { - return 0, fmt.Errorf("failed to score request %s: %w", requestID, err) + for _, uri := range request.Change.URIs { + records, err := c.store.GetChangeStore().GetByURI(ctx, batch.Queue, uri) + if err != nil { + return entity.BatchChanges{}, fmt.Errorf("failed to read change record for request %s uri=%s: %w", requestID, uri, err) + } + for _, rec := range records { + if rec.RequestID != requestID { + continue + } + changes.Changes = append(changes.Changes, entity.ChangeInfo{URI: rec.URI, Details: rec.Details}) + break + } } - score *= s } - return score, nil + return changes, nil } // publish publishes a batch ID to the specified topic key. diff --git a/submitqueue/orchestrator/controller/score/score_test.go b/submitqueue/orchestrator/controller/score/score_test.go index aec0c974..f57a9bb1 100644 --- a/submitqueue/orchestrator/controller/score/score_test.go +++ b/submitqueue/orchestrator/controller/score/score_test.go @@ -64,7 +64,28 @@ func testRequest() entity.Request { } } -// newMockStorage creates a MockStorage with a MockBatchStore and MockRequestStore. +// mockChangeStore returns a MockChangeStore that serves one self-owned ChangeRecord +// per URI for each request, mirroring what start.claimURIs + validate enrichment +// would have persisted. The score controller reads these via GetByURI to assemble +// the batch's changes. +func mockChangeStore(ctrl *gomock.Controller, requests ...entity.Request) *storagemock.MockChangeStore { + cs := storagemock.NewMockChangeStore(ctrl) + for _, req := range requests { + for _, uri := range req.Change.URIs { + rec := entity.ChangeRecord{ + URI: uri, + RequestID: req.ID, + Queue: req.Queue, + Details: entity.ChangeDetails{ChangedFiles: []entity.ChangedFile{{Path: "f.go", LinesAdded: 5}}}, + Version: 1, + } + cs.EXPECT().GetByURI(gomock.Any(), req.Queue, uri).Return([]entity.ChangeRecord{rec}, nil).AnyTimes() + } + } + return cs +} + +// newMockStorage creates a MockStorage with a MockBatchStore, MockRequestStore, and MockChangeStore. func newMockStorage(ctrl *gomock.Controller, batch entity.Batch, request entity.Request) *storagemock.MockStorage { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() @@ -76,6 +97,7 @@ func newMockStorage(ctrl *gomock.Controller, batch entity.Batch, request entity. store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() store.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + store.EXPECT().GetChangeStore().Return(mockChangeStore(ctrl, request)).AnyTimes() return store } @@ -131,7 +153,7 @@ func TestController_Process_Success(t *testing.T) { store := newMockStorage(ctrl, batch, request) mockScorer := scorermock.NewMockScorer(ctrl) - mockScorer.EXPECT().Score(gomock.Any(), request.Change).Return(0.85, nil) + mockScorer.EXPECT().Score(gomock.Any(), gomock.Any()).Return(0.85, nil) controller := newTestController(t, ctrl, store, mockScorer, nil) @@ -144,7 +166,9 @@ func TestController_Process_Success(t *testing.T) { require.NoError(t, err) } -func TestController_Process_MultipleRequests_MinScore(t *testing.T) { +// TestController_Process_BatchLevelScore verifies the controller assembles all of the +// batch's changes into one BatchChanges and persists the single score the scorer returns. +func TestController_Process_BatchLevelScore(t *testing.T) { ctrl := gomock.NewController(t) batch := entity.Batch{ @@ -172,8 +196,8 @@ func TestController_Process_MultipleRequests_MinScore(t *testing.T) { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) - // Expect the multiplicative score (0.9 * 0.6 = 0.54) to be persisted - mockBatchStore.EXPECT().UpdateScoreAndState(gomock.Any(), batch.ID, batch.Version, batch.Version+1, 0.54, entity.BatchStateScored).Return(nil) + // The single batch-level score is persisted. + mockBatchStore.EXPECT().UpdateScoreAndState(gomock.Any(), batch.ID, batch.Version, batch.Version+1, 0.7, entity.BatchStateScored).Return(nil) mockRequestStore := storagemock.NewMockRequestStore(ctrl) mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/1").Return(request1, nil) @@ -182,10 +206,17 @@ func TestController_Process_MultipleRequests_MinScore(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() store.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + store.EXPECT().GetChangeStore().Return(mockChangeStore(ctrl, request1, request2)).AnyTimes() + // Capture the BatchChanges to assert both requests' changes were gathered. mockScorer := scorermock.NewMockScorer(ctrl) - mockScorer.EXPECT().Score(gomock.Any(), request1.Change).Return(0.9, nil) - mockScorer.EXPECT().Score(gomock.Any(), request2.Change).Return(0.6, nil) + mockScorer.EXPECT().Score(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, changes entity.BatchChanges) (float64, error) { + assert.Equal(t, batch.ID, changes.BatchID) + assert.Len(t, changes.Changes, 2) + return 0.7, nil + }, + ) controller := newTestController(t, ctrl, store, mockScorer, nil) @@ -234,9 +265,10 @@ func TestController_Process_ScorerFailure(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() store.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + store.EXPECT().GetChangeStore().Return(mockChangeStore(ctrl, request)).AnyTimes() mockScorer := scorermock.NewMockScorer(ctrl) - mockScorer.EXPECT().Score(gomock.Any(), request.Change).Return(0.0, fmt.Errorf("no bucket matches value 99")) + mockScorer.EXPECT().Score(gomock.Any(), gomock.Any()).Return(0.0, fmt.Errorf("no bucket matches value 99")) controller := newTestController(t, ctrl, store, mockScorer, nil) @@ -265,9 +297,10 @@ func TestController_Process_UpdateScoreFailure(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() store.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + store.EXPECT().GetChangeStore().Return(mockChangeStore(ctrl, request)).AnyTimes() mockScorer := scorermock.NewMockScorer(ctrl) - mockScorer.EXPECT().Score(gomock.Any(), request.Change).Return(0.85, nil) + mockScorer.EXPECT().Score(gomock.Any(), gomock.Any()).Return(0.85, nil) controller := newTestController(t, ctrl, store, mockScorer, nil) @@ -288,7 +321,7 @@ func TestController_Process_PublishFailure(t *testing.T) { store := newMockStorage(ctrl, batch, request) mockScorer := scorermock.NewMockScorer(ctrl) - mockScorer.EXPECT().Score(gomock.Any(), request.Change).Return(0.85, nil) + mockScorer.EXPECT().Score(gomock.Any(), gomock.Any()).Return(0.85, nil) controller := newTestController(t, ctrl, store, mockScorer, fmt.Errorf("publish failed")) diff --git a/submitqueue/orchestrator/controller/start/start.go b/submitqueue/orchestrator/controller/start/start.go index f389d139..9a442929 100644 --- a/submitqueue/orchestrator/controller/start/start.go +++ b/submitqueue/orchestrator/controller/start/start.go @@ -18,7 +18,6 @@ import ( "context" "errors" "fmt" - "time" "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/metrics" @@ -31,11 +30,11 @@ import ( ) // Controller handles start queue messages. -// It consumes requests, persists them to the request store, claims their URIs in -// the change store, and publishes to the validate stage. Both writes are idempotent -// on retries; the duplicate-detection check itself is performed downstream by the -// validate controller, which reads the change store and consults the request store -// for liveness. Implements consumer.Controller. +// It consumes requests, persists them to the request store, and publishes to the +// validate stage. The request write is idempotent on retries. URI claiming and +// duplicate detection are performed downstream by the validate controller (which +// claims each URI in the change store with its provider details and consults the +// request store for liveness). Implements consumer.Controller. type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope @@ -68,8 +67,8 @@ func NewController( } // Process processes a request delivery from the queue. -// Persists the request, claims its URIs in the change store, and publishes to validate. -// Returns nil to ack (success), or error to nack (retry). +// Persists the request and publishes to validate. Returns nil to ack (success), +// or error to nack (retry). func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { const opName = "process" @@ -113,15 +112,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return fmt.Errorf("failed to create request: %w", err) } - // Claim each URI in the change store. Different requests on the same URI write - // distinct rows (different request_id), so cross-request URI overlap does not - // collide on insert; the validate controller surfaces it via GetByURI + a - // liveness check against the request store. - if err := c.claimURIs(ctx, request); err != nil { - metrics.NamedCounter(c.metricsScope, opName, "change_store_errors", 1) - return fmt.Errorf("failed to claim URIs for request %s: %w", request.ID, err) - } - // Record the "new" status in the request log. logEntry := entity.NewRequestLog(request.ID, entity.RequestStatusStarted, request.Version, "", nil) if err := corerequest.PublishLog(ctx, c.registry, logEntry, request.ID); err != nil { @@ -142,27 +132,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return nil } -// claimURIs persists one ChangeRecord per URI in the request. Each Create call is -// independent; the change store's per-PK idempotency makes the loop safe under -// queue redelivery (same (Queue, URI, RequestID) is a no-op on retry). -func (c *Controller) claimURIs(ctx context.Context, request entity.Request) error { - now := time.Now().UnixMilli() - for _, uri := range request.Change.URIs { - record := entity.ChangeRecord{ - URI: uri, - RequestID: request.ID, - Queue: request.Queue, - CreatedAt: now, - UpdatedAt: now, - Version: 1, - } - if err := c.store.GetChangeStore().Create(ctx, record); err != nil { - return fmt.Errorf("failed to claim uri=%s for request %s: %w", uri, request.ID, err) - } - } - return nil -} - // publish publishes a request ID to the specified topic key. func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, requestID string, partitionKey string) error { rid := entity.RequestID{ID: requestID} diff --git a/submitqueue/orchestrator/controller/start/start_test.go b/submitqueue/orchestrator/controller/start/start_test.go index 04d90286..fb5c4c62 100644 --- a/submitqueue/orchestrator/controller/start/start_test.go +++ b/submitqueue/orchestrator/controller/start/start_test.go @@ -38,14 +38,11 @@ func newTestController( t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, - cs *storagemock.MockChangeStore, publishErr error, ) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope - store.EXPECT().GetChangeStore().Return(cs).AnyTimes() - mockPub := queuemock.NewMockPublisher(ctrl) mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, topic string, msg entityqueue.Message) error { @@ -77,13 +74,6 @@ func newMockStorage(ctrl *gomock.Controller) *storagemock.MockStorage { return store } -// newMockChangeStore returns a MockChangeStore that accepts any Create call. -func newMockChangeStore(ctrl *gomock.Controller) *storagemock.MockChangeStore { - cs := storagemock.NewMockChangeStore(ctrl) - cs.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - return cs -} - // makeDelivery builds a MockDelivery wrapping a serialized LandRequest. func makeDelivery(t *testing.T, ctrl *gomock.Controller, lr entity.LandRequest) *queuemock.MockDelivery { payload, err := lr.ToBytes() @@ -98,7 +88,7 @@ func makeDelivery(t *testing.T, ctrl *gomock.Controller, lr entity.LandRequest) func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), nil) + controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyStart, controller.TopicKey()) @@ -108,7 +98,7 @@ func TestNewController(t *testing.T) { func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), nil) + controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) delivery := makeDelivery(t, ctrl, entity.LandRequest{ ID: "test-queue/123", @@ -122,7 +112,7 @@ func TestController_Process_Success(t *testing.T) { func TestController_Process_InvalidJSON(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), nil) + controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) invalidPayload := []byte(`{"invalid": json"}`) msg := entityqueue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) @@ -149,7 +139,7 @@ func TestController_Process_ConstructsRequestWithStateAndVersion(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() - controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), nil) + controller := newTestController(t, ctrl, store, nil) landRequest := entity.LandRequest{ ID: "test-queue/42", @@ -181,7 +171,7 @@ func TestController_Process_AllStrategies(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), nil) + controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) delivery := makeDelivery(t, ctrl, entity.LandRequest{ ID: fmt.Sprintf("queue/%s", tt.strategy), @@ -195,48 +185,9 @@ func TestController_Process_AllStrategies(t *testing.T) { } } -func TestController_Process_MultipleChanges(t *testing.T) { - ctrl := gomock.NewController(t) - - cs := storagemock.NewMockChangeStore(ctrl) - var captured []entity.ChangeRecord - cs.EXPECT().Create(gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, record entity.ChangeRecord) error { - captured = append(captured, record) - return nil - }, - ).Times(3) - - controller := newTestController(t, ctrl, newMockStorage(ctrl), cs, nil) - - uris := []string{ - "github://uber/monorepo/pull/1/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", - "github://uber/monorepo/pull/2/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", - "github://uber/monorepo/pull/3/cccccccccccccccccccccccccccccccccccccccc", - } - delivery := makeDelivery(t, ctrl, entity.LandRequest{ - ID: "queue/999", - Queue: "test-queue", - Change: entity.Change{URIs: uris}, - LandStrategy: entity.RequestLandStrategySquashRebase, - }) - - require.NoError(t, controller.Process(context.Background(), delivery)) - - require.Len(t, captured, len(uris)) - for i, r := range captured { - assert.Equal(t, uris[i], r.URI) - assert.Equal(t, "queue/999", r.RequestID) - assert.Equal(t, "test-queue", r.Queue) - assert.Equal(t, int32(1), r.Version) - assert.Positive(t, r.CreatedAt) - assert.Equal(t, r.CreatedAt, r.UpdatedAt) - } -} - func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), fmt.Errorf("publish failed")) + controller := newTestController(t, ctrl, newMockStorage(ctrl), fmt.Errorf("publish failed")) delivery := makeDelivery(t, ctrl, entity.LandRequest{ ID: "test-queue/123", @@ -256,7 +207,7 @@ func TestController_Process_StorageFailure(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() - controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), nil) + controller := newTestController(t, ctrl, store, nil) delivery := makeDelivery(t, ctrl, entity.LandRequest{ ID: "test-queue/123", @@ -278,7 +229,7 @@ func TestController_Process_AlreadyExistsSucceeds(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() - controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), nil) + controller := newTestController(t, ctrl, store, nil) delivery := makeDelivery(t, ctrl, entity.LandRequest{ ID: "test-queue/123", @@ -290,27 +241,9 @@ func TestController_Process_AlreadyExistsSucceeds(t *testing.T) { require.NoError(t, controller.Process(context.Background(), delivery)) } -func TestController_Process_ChangeStoreFailure(t *testing.T) { - ctrl := gomock.NewController(t) - - cs := storagemock.NewMockChangeStore(ctrl) - cs.EXPECT().Create(gomock.Any(), gomock.Any()).Return(fmt.Errorf("change store down")) - - controller := newTestController(t, ctrl, newMockStorage(ctrl), cs, nil) - - delivery := makeDelivery(t, ctrl, entity.LandRequest{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/1/789abc1234567890abcdef1234567890abcdef12"}}, - LandStrategy: entity.RequestLandStrategyRebase, - }) - - require.Error(t, controller.Process(context.Background(), delivery)) -} - func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), nil) + controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) var _ consumer.Controller = controller } diff --git a/submitqueue/orchestrator/controller/validate/BUILD.bazel b/submitqueue/orchestrator/controller/validate/BUILD.bazel index 5880202e..967490a3 100644 --- a/submitqueue/orchestrator/controller/validate/BUILD.bazel +++ b/submitqueue/orchestrator/controller/validate/BUILD.bazel @@ -29,7 +29,6 @@ go_test( "//extension/messagequeue/mock", "//submitqueue/core/consumer", "//submitqueue/entity", - "//submitqueue/extension/changeprovider", "//submitqueue/extension/changeprovider/mock", "//submitqueue/extension/mergechecker", "//submitqueue/extension/mergechecker/mock", diff --git a/submitqueue/orchestrator/controller/validate/validate.go b/submitqueue/orchestrator/controller/validate/validate.go index 4aa738e9..6f9a5d7d 100644 --- a/submitqueue/orchestrator/controller/validate/validate.go +++ b/submitqueue/orchestrator/controller/validate/validate.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/errs" @@ -172,6 +173,16 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r "total_files", totalFiles(changeInfos), ) + // Claim each URI in the change store with its provider details. The claim is + // created here — after duplicate detection and the merge/provider checks — so a + // rejected request never leaves a claim, and the record is written once with its + // details (immutable thereafter; no separate enrichment update). Create is + // idempotent per (queue, uri, request_id), so redelivery is a no-op. + if err := c.claimChanges(ctx, request, changeInfos); err != nil { + coremetrics.NamedCounter(c.metricsScope, "process", "change_store_errors", 1) + return fmt.Errorf("failed to claim change records for request %s: %w", request.ID, err) + } + // Publish to batch topic if err := c.publish(ctx, consumer.TopicKeyBatch, request.ID, request.Queue); err != nil { coremetrics.NamedCounter(c.metricsScope, "process", "publish_errors", 1) @@ -187,10 +198,12 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r } // checkDuplicate looks for any other in-flight request whose URIs overlap with this -// request's. The change rows are written upstream by the start controller; validate -// is read-only here. For each URI it queries the change store, walks the returned -// candidates skipping self/duplicates/orphans/terminals, and short-circuits on the -// first live duplicate. Returns that request_id, or "" if none. +// request's. It reads the change store before this request claims its own URIs +// (claimChanges runs later in Process), so it only sees rows written by other +// requests. For each URI it queries the change store, walks the returned candidates +// skipping self/duplicates/orphans/terminals, and short-circuits on the first live +// duplicate. Returns that request_id, or "" if none. Per-queue partition leasing +// serializes validate within a queue, so this read-then-claim sequence is race-free. // // Per-URI / per-record reads keep the contract backend-agnostic; the typical request // has 1-5 URIs, so the loop is cheap. @@ -254,11 +267,35 @@ func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request return nil } +// claimChanges persists one ChangeRecord per fetched ChangeInfo, capturing the +// provider details at claim time. The record's identity (queue, uri, request_id) +// and its Details are written together in a single immutable Create — there is no +// later mutation. Create is idempotent on its primary key, so a redelivery (or a +// prior partial attempt) is a no-op and the first write wins. +func (c *Controller) claimChanges(ctx context.Context, request entity.Request, infos []entity.ChangeInfo) error { + now := time.Now().UnixMilli() + for _, info := range infos { + record := entity.ChangeRecord{ + URI: info.URI, + RequestID: request.ID, + Queue: request.Queue, + Details: info.Details, + CreatedAt: now, + UpdatedAt: now, + Version: 1, + } + if err := c.store.GetChangeStore().Create(ctx, record); err != nil { + return fmt.Errorf("failed to claim uri=%s for request %s: %w", info.URI, request.ID, err) + } + } + return nil +} + // totalFiles returns the total number of files across all changeInfos. -func totalFiles(infos []changeprovider.ChangeInfo) int { +func totalFiles(infos []entity.ChangeInfo) int { total := 0 for _, info := range infos { - total += len(info.ChangedFiles) + total += info.Details.FileCount() } return total } diff --git a/submitqueue/orchestrator/controller/validate/validate_test.go b/submitqueue/orchestrator/controller/validate/validate_test.go index 4426e9a8..a8edf034 100644 --- a/submitqueue/orchestrator/controller/validate/validate_test.go +++ b/submitqueue/orchestrator/controller/validate/validate_test.go @@ -27,7 +27,6 @@ import ( queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" - "github.com/uber/submitqueue/submitqueue/extension/changeprovider" changeprovidermock "github.com/uber/submitqueue/submitqueue/extension/changeprovider/mock" "github.com/uber/submitqueue/submitqueue/extension/mergechecker" mergecheckermock "github.com/uber/submitqueue/submitqueue/extension/mergechecker/mock" @@ -47,16 +46,18 @@ func requestIDPayload(t *testing.T, id string) []byte { // mockChangeProvider is a simple mock that returns test data. type mockChangeProvider struct{} -func (m *mockChangeProvider) Get(ctx context.Context, change entity.Change) ([]changeprovider.ChangeInfo, error) { - return []changeprovider.ChangeInfo{ +func (m *mockChangeProvider) Get(ctx context.Context, change entity.Change) ([]entity.ChangeInfo, error) { + return []entity.ChangeInfo{ { URI: "github://org/repo/pull/123/abcdef0123456789abcdef0123456789abcdef01", - User: changeprovider.User{ - Name: "Test User", - Email: "test@example.com", - }, - ChangedFiles: []changeprovider.ChangedFile{ - {Path: "main.go"}, + Details: entity.ChangeDetails{ + Author: entity.Author{ + Name: "Test User", + Email: "test@example.com", + }, + ChangedFiles: []entity.ChangedFile{ + {Path: "main.go"}, + }, }, }, }, nil @@ -80,12 +81,13 @@ func newMockStorage(ctrl *gomock.Controller, request entity.Request) (*storagemo return store, mockReqStore } -// newMockChangeStore creates a MockChangeStore with default no-overlap behavior. -// Tests that need to simulate overlap can override GetByURI with their own EXPECT. -// Validate is read-only against the change store — it never calls Create. +// newMockChangeStore creates a MockChangeStore with default no-overlap behavior +// (GetByURI returns nothing) and accepts the claim Create. Tests that need to +// simulate overlap or assert the claim override these with their own EXPECTs. func newMockChangeStore(ctrl *gomock.Controller) *storagemock.MockChangeStore { cs := storagemock.NewMockChangeStore(ctrl) cs.EXPECT().GetByURI(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + cs.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() return cs } @@ -171,6 +173,54 @@ func TestController_Process_Success(t *testing.T) { require.NoError(t, controller.Process(context.Background(), delivery)) } +// TestController_Process_ClaimsChangeRecordsWithDetails verifies that, on the happy +// path, validate creates a change record per fetched change, capturing the provider +// details in a single immutable Create. +func TestController_Process_ClaimsChangeRecordsWithDetails(t *testing.T) { + ctrl := gomock.NewController(t) + mc := newMergeableMock(ctrl) + + // The request's URI matches the URI the mock change provider returns, so the + // claim carries that change's details. + const uri = "github://org/repo/pull/123/abcdef0123456789abcdef0123456789abcdef01" + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{uri}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateStarted, + Version: 1, + } + store, _ := newMockStorage(ctrl, request) + + wantDetails := entity.ChangeDetails{ + Author: entity.Author{Name: "Test User", Email: "test@example.com"}, + ChangedFiles: []entity.ChangedFile{{Path: "main.go"}}, + } + cs := storagemock.NewMockChangeStore(ctrl) + // Duplicate-detection read finds no overlap. + cs.EXPECT().GetByURI(gomock.Any(), request.Queue, uri).Return(nil, nil).AnyTimes() + // Capture the record passed to Create; assert identity + details. + cs.EXPECT().Create(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, rec entity.ChangeRecord) error { + assert.Equal(t, uri, rec.URI) + assert.Equal(t, request.ID, rec.RequestID) + assert.Equal(t, request.Queue, rec.Queue) + assert.Equal(t, wantDetails, rec.Details) + return nil + }, + ) + + controller := newTestController(t, ctrl, store, cs, mc, nil) + + msg := entityqueue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + require.NoError(t, controller.Process(context.Background(), delivery)) +} + func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) mc := newMergeableMock(ctrl) @@ -428,6 +478,9 @@ func TestController_Process_DuplicateDetection(t *testing.T) { for _, u := range uris { cs.EXPECT().GetByURI(gomock.Any(), queueName, u).Return(tt.byURI[u], nil).MaxTimes(1) } + // When no duplicate is found, the controller continues to fetch change info + // and claims each fetched change via Create. Accept any Create. + cs.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() controller := newTestController(t, ctrl, store, cs, mc, nil) diff --git a/test/integration/submitqueue/extension/storage/suite.go b/test/integration/submitqueue/extension/storage/suite.go index 32bf66ce..4b092e8b 100644 --- a/test/integration/submitqueue/extension/storage/suite.go +++ b/test/integration/submitqueue/extension/storage/suite.go @@ -336,28 +336,39 @@ func (s *StorageContractSuite) TestStorage_ChangeCreate_DifferentRequestSameURI( assert.Equal(t, []string{queue + "/1", queue + "/2"}, ids) } -// TestStorage_ChangeCreate_PreservesMetadata verifies metadata round-trips through the store. -func (s *StorageContractSuite) TestStorage_ChangeCreate_PreservesMetadata() { +// sampleDetails is a representative ChangeDetails reused across change-store contract tests. +func sampleDetails() entity.ChangeDetails { + return entity.ChangeDetails{ + Author: entity.Author{Name: "Ada Lovelace", Email: "ada@example.com"}, + ChangedFiles: []entity.ChangedFile{ + {Path: "main.go", LinesAdded: 10, LinesDeleted: 3, LinesModified: 2}, + {Path: "main_test.go", LinesAdded: 20, LinesDeleted: 0}, + }, + } +} + +// TestStorage_ChangeCreate_PreservesDetails verifies typed Details round-trip through the store. +func (s *StorageContractSuite) TestStorage_ChangeCreate_PreservesDetails() { t := s.T() ctx := s.ctx - const queue = "cq-meta" - const meta = `{"title":"add new feature"}` + const queue = "cq-details" + details := sampleDetails() require.NoError(t, s.storage.GetChangeStore().Create(ctx, entity.ChangeRecord{ - URI: changeURI, RequestID: queue + "/1", Queue: queue, Metadata: meta, CreatedAt: 1, UpdatedAt: 1, Version: 1, + URI: changeURI, RequestID: queue + "/1", Queue: queue, Details: details, CreatedAt: 1, UpdatedAt: 1, Version: 1, })) got, err := s.storage.GetChangeStore().GetByURI(ctx, queue, changeURI) require.NoError(t, err) require.Len(t, got, 1) - assert.JSONEq(t, meta, got[0].Metadata) + assert.Equal(t, details, got[0].Details) } -// TestStorage_ChangeCreate_EmptyMetadataStoredAsObject verifies empty metadata is stored as "{}". -func (s *StorageContractSuite) TestStorage_ChangeCreate_EmptyMetadataStoredAsObject() { +// TestStorage_ChangeCreate_EmptyDetails verifies a zero-value Details round-trips (stored as a JSON object). +func (s *StorageContractSuite) TestStorage_ChangeCreate_EmptyDetails() { t := s.T() ctx := s.ctx - const queue = "cq-emptymeta" + const queue = "cq-emptydetails" require.NoError(t, s.storage.GetChangeStore().Create(ctx, entity.ChangeRecord{ URI: changeURI, RequestID: queue + "/1", Queue: queue, CreatedAt: 1, UpdatedAt: 1, Version: 1, @@ -366,5 +377,5 @@ func (s *StorageContractSuite) TestStorage_ChangeCreate_EmptyMetadataStoredAsObj got, err := s.storage.GetChangeStore().GetByURI(ctx, queue, changeURI) require.NoError(t, err) require.Len(t, got, 1) - assert.JSONEq(t, "{}", got[0].Metadata) + assert.Equal(t, entity.ChangeDetails{}, got[0].Details) }