diff --git a/CHANGELOG.md b/CHANGELOG.md index 01e43314..70ce64b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `rivermigrate.ValidateOpts.TargetVersion` so validation can check migrations up to a specific target version, matching the target-version behavior available on `Migrate` and `MigrateTx`. Notably, this is a breaking API change as the validate functions previously didn't take any options. [PR #1259](https://github.com/riverqueue/river/pull/1259) +### Fixed + +- Fixed `rivertest.Worker.Work` and `WorkJob` to honor a configured custom `Config.Schema` when transitioning a job to its running state. Previously, the running-state update ran unqualified and could fail on a connection whose `search_path` didn't include the configured schema. [PR #1262](https://github.com/riverqueue/river/pull/1262) + ## [0.38.0] - 2026-05-22 ### Added diff --git a/rivertest/worker.go b/rivertest/worker.go index 45c5fb8b..8da01ae0 100644 --- a/rivertest/worker.go +++ b/rivertest/worker.go @@ -166,6 +166,7 @@ func (w *Worker[T, TTx]) workJob(ctx context.Context, tb testing.TB, tx TTx, job AttemptedAtDoUpdate: true, AttemptedBy: append(job.AttemptedBy, w.config.ID), AttemptedByDoUpdate: true, + Schema: w.config.Schema, StateDoUpdate: true, State: rivertype.JobStateRunning, }) diff --git a/rivertest/worker_test.go b/rivertest/worker_test.go index 1e0d5680..ced315c5 100644 --- a/rivertest/worker_test.go +++ b/rivertest/worker_test.go @@ -450,6 +450,34 @@ func TestWorker_Work(t *testing.T) { require.True(t, middlewareCalled) require.True(t, middlewareWithBaseServiceCalled) }) + + // Honors config.Schema: River lives in a named schema, worked through a + // transaction with an empty search_path so tables resolve only via schema + // qualification. Needs its own infrastructure, so it skips setup. + t.Run("CustomSchema", func(t *testing.T) { + t.Parallel() + + var ( + dbPool = riversharedtest.DBPool(ctx, t) + driver = riverpgxv5.New(dbPool) + schema = riverdbtest.TestSchema(ctx, t, driver, nil) + config = &river.Config{ID: "rivertest-worker", Schema: schema} + ) + + tx, err := dbPool.Begin(ctx) + require.NoError(t, err) + t.Cleanup(func() { _ = tx.Rollback(ctx) }) + + worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error { + require.Equal(t, rivertype.JobStateRunning, job.State) + return nil + }) + + tw := NewWorker(t, driver, config, worker) + res, err := tw.Work(ctx, t, tx, testArgs{Value: "test"}, nil) + require.NoError(t, err) + require.Equal(t, river.EventKindJobCompleted, res.EventKind) + }) } func TestWorker_WorkJob(t *testing.T) { @@ -556,4 +584,38 @@ func TestWorker_WorkJob(t *testing.T) { require.ErrorContains(t, err, "failed to update job to running state") require.Nil(t, res) }) + + // Honors config.Schema: River lives in a named schema, worked through a + // transaction with an empty search_path so tables resolve only via schema + // qualification. Needs its own infrastructure, so it skips setup. + t.Run("CustomSchema", func(t *testing.T) { + t.Parallel() + + var ( + dbPool = riversharedtest.DBPool(ctx, t) + driver = riverpgxv5.New(dbPool) + schema = riverdbtest.TestSchema(ctx, t, driver, nil) + config = &river.Config{ID: "rivertest-workjob", Schema: schema} + ) + + tx, err := dbPool.Begin(ctx) + require.NoError(t, err) + t.Cleanup(func() { _ = tx.Rollback(ctx) }) + + client, err := river.NewClient(driver, config) + require.NoError(t, err) + + insertRes, err := client.InsertTx(ctx, tx, testArgs{Value: "test"}, nil) + require.NoError(t, err) + + worker := river.WorkFunc(func(ctx context.Context, job *river.Job[testArgs]) error { + require.Equal(t, rivertype.JobStateRunning, job.State) + return nil + }) + + tw := NewWorker(t, driver, config, worker) + res, err := tw.WorkJob(ctx, t, tx, insertRes.Job) + require.NoError(t, err) + require.Equal(t, river.EventKindJobCompleted, res.EventKind) + }) }