29 changes: 28 additions & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@
| Topic | Location |
|--------|----------|
| Architecture and components | [docs/md/architecture.md](docs/md/architecture.md) |
| Database layout | [docs/md/database.md](docs/md/database.md) |
| Database layout and migrations | [docs/md/database.md](docs/md/database.md) |
| Local dev, tests, OpenAPI | [README.md](README.md) |
| Commits, PRs, contribution style | [CONTRIBUTING.md](CONTRIBUTING.md) |

@@ -30,6 +30,7 @@ Prefer these sources over guessing when behavior or schema matters.
| Evaluation | `evaluator/`, topic names in code and `conf/` |
| Advisory sync | `tasks/vmaas_sync/` |
| Migrations | `database_admin/migrations/` (verify naming against existing migrations) |
| Migration flow and session flags | `database_admin/update.go`, [docs/md/database.md#migrations](docs/md/database.md#migrations) |
| Database schema and SQL | `database_admin/schema/` |
| Containers and local orchestration | `docker-compose.yml`, `docker-compose.test.yml`, `Dockerfile*` |
| Scheduled jobs | `tasks/` |
@@ -85,3 +86,29 @@ Manager Component (REST API)
Response to User
```

---

## Database migrations: `terminate_db_sessions`

When advising on migrations or deploy config, use [docs/md/database.md#migrations](docs/md/database.md#migrations). Summary for agents:

**Default:** do **not** set `terminate_db_sessions`. It defaults to `false`; normal deploys must stay unchanged.

**What it does:** After app DB users are set to `NOLOGIN`, database-admin optionally runs `pg_terminate_backend` on open `listener` / `evaluator` / `manager` / `vmaas_sync` sessions, waits until `pg_stat_activity` shows none, and only then runs DDL. The block, terminate, and wait steps are in `prepareForMigration()` in `database_admin/update.go`.

**Recommend `terminate_db_sessions=true` only when:**

- The migration is a **major DDL** change likely to need exclusive locks or long runtimes (large `ALTER TABLE`, partition restructuring, similar)
- Migration logs show blocking after user lock with app sessions still present
- Ops are doing a **one-off major migration deploy** via `DATABASE_ADMIN_CONFIG` on the **db-migration Job**

**Do not recommend it when:**

- The change is a routine migration or standard release deploy
- The user is working locally or in CI
- There is no session-lock symptom — it forcibly drops client connections and is not a safe default

**How to set (production):** `DATABASE_ADMIN_CONFIG=terminate_db_sessions=true` on the db-migration Job for that deploy only; remove afterward. Do not enable on manager/listener/evaluator pods.

**Related:** Session wait logic and `pg_stat_activity` queries are in `database_admin/update.go`. Deploy layout (single migration Job, `check-for-db` init) is in `deploy/clowdapp.yaml`. Expected migration log sequence (advisory lock → sessions cleared → DDL start) is in [docs/md/database.md#migration-log-sequence](docs/md/database.md#migration-log-sequence).
2 changes: 2 additions & 0 deletions database_admin/config.go
@@ -17,4 +17,6 @@ var (
unlockUsers = utils.PodConfig.GetBool("unlock_users", false)
// rerun config.sql
updateDBConfig = utils.PodConfig.GetBool("update_db_config", false)
// Terminate lockUsers sessions after NOLOGIN (for major DDL migrations)
terminateDBSessions = utils.PodConfig.GetBool("terminate_db_sessions", false)
)
11 changes: 11 additions & 0 deletions database_admin/migrate.go
@@ -108,6 +108,17 @@ func dbSchemaVersion(conn database.Driver, sourceURL string) (int, error) {
return int(curVersion), nil //nolint:gosec
}

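// migrationTargetVersion returns the schema version the migration will reach:
// the configured schemaMigration when it is non-negative, otherwise the latest
// version found among the migration files. It panics if that lookup fails.
func migrationTargetVersion(sourceURL string) int {
	if schemaMigration >= 0 {
		return schemaMigration
	}
	latest, err := latestSchemaMigrationFileVersion(sourceURL)
	if err != nil {
		panic(err)
	}
	return latest
}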

func migrateAction(conn database.Driver, sourceURL string) int {
expectedSchema := schemaMigration
utils.LogInfo("schema", expectedSchema, "DB migration in progress, waiting for schema")
79 changes: 73 additions & 6 deletions database_admin/update.go
@@ -16,9 +16,11 @@ import (

var lockUsers = []string{"listener", "evaluator", "manager", "vmaas_sync"}

const activeAppSessionsWhere = `usename = ANY($1) AND pid <> pg_backend_pid()`

const activeAppSessionsQuery = `SELECT usename || ' ' || substring(query for 50)
FROM pg_stat_activity
WHERE usename = ANY($1)
WHERE ` + activeAppSessionsWhere + `
LIMIT 1`

const sessionCheckMaxRetries = 5
@@ -40,6 +42,7 @@ func execFromFile(db *sql.DB, filepath string) {
func getAdvisoryLock(db *sql.DB) {
log.Info("Getting advisory lock")
execOrPanic(db, "SELECT pg_advisory_lock(123)")
log.Info("Advisory lock acquired")
}

func releaseAdvisoryLock(db *sql.DB) {
@@ -59,11 +62,60 @@ func findActiveAppSession(db *sql.DB) (session string, found bool, err error) {
return session, true, nil
}

// appSession is one pg_stat_activity row belonging to an app database user.
type appSession struct {
	pid     int
	usename string
	query   string
}

// Selects every session owned by lockUsers, excluding the backend that runs the query.
const activeAppSessionPIDsQuery = `SELECT pid, usename, coalesce(substring(query for 50), '')
FROM pg_stat_activity
WHERE ` + activeAppSessionsWhere

// listActiveAppSessions returns all open sessions of the lockUsers roles.
func listActiveAppSessions(db *sql.DB) ([]appSession, error) {
	rows, err := db.Query(activeAppSessionPIDsQuery, pq.Array(lockUsers))
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var sessions []appSession
	for rows.Next() {
		var session appSession
		if err := rows.Scan(&session.pid, &session.usename, &session.query); err != nil {
			return nil, err
		}
		sessions = append(sessions, session)
	}
	return sessions, rows.Err()
}

// formatAppSessions renders sessions as a single "; "-separated string for logging.
func formatAppSessions(sessions []appSession) string {
	details := make([]string, len(sessions))
	for i, session := range sessions {
		details[i] = fmt.Sprintf("pid=%d user=%s query=%q", session.pid, session.usename, session.query)
	}
	return strings.Join(details, "; ")
}

// terminateAppSessions calls pg_terminate_backend for every open lockUsers
// session and panics on the first error. The boolean result of
// pg_terminate_backend is not inspected; prepareForMigration follows up with
// waitForSessionClosed to confirm that the sessions are actually gone.
func terminateAppSessions(db *sql.DB) {
	sessions, err := listActiveAppSessions(db)
	if err != nil {
		panic(err)
	}
	for _, session := range sessions {
		if _, err := db.Exec("SELECT pg_terminate_backend($1)", session.pid); err != nil {
			panic(err)
		}
		log.Infof("Terminated session pid=%d user=%s query=%q", session.pid, session.usename, session.query)
	}
}

// Wait for closing of all lockUsers database sessions.
func waitForSessionClosed(db *sql.DB) {
errRetries := 0
for {
session, found, err := findActiveAppSession(db)
sessions, err := listActiveAppSessions(db)
if err != nil {
errRetries++
utils.LogError("err", err.Error(), "attempt", errRetries, "failed to check app database sessions")
@@ -75,11 +127,11 @@ func waitForSessionClosed(db *sql.DB) {
continue
}
errRetries = 0
if !found {
log.Info("No ", strings.Join(lockUsers, ", "), " sessions found")
if len(sessions) == 0 {
log.Info("App database sessions cleared")
return
}
utils.LogInfo("session:", session, "Session found")
log.Infof("Waiting for %d sessions: %s", len(sessions), formatAppSessions(sessions))
time.Sleep(time.Second)
}
}
@@ -106,10 +158,25 @@ func unblockUsers(db *sql.DB) {
}
}

func startMigration(conn database.Driver, db *sql.DB, migrationFilesURL string) {
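// prepareForMigration blocks app users from logging in, optionally terminates
// their open sessions (terminate_db_sessions), and waits until none remain.
func prepareForMigration(db *sql.DB) {
	log.Info("Blocking writing users during the migration")
	blockUsers(db)
	if terminateDBSessions {
		log.Info("Terminating active app database sessions")
		terminateAppSessions(db)
	}
	waitForSessionClosed(db)
}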

func startMigration(conn database.Driver, db *sql.DB, migrationFilesURL string) {
	prepareForMigration(db)

	targetVersion := migrationTargetVersion(migrationFilesURL)
	// The current version is logged for context only; if it cannot be read,
	// log the target version alone and continue.
	if currentVersion, err := dbSchemaVersion(conn, migrationFilesURL); err != nil {
		log.Infof("Starting schema migration to version %d", targetVersion)
	} else {
		log.Infof("Starting schema migration to version %d (current version %d)", targetVersion, currentVersion)
	}

	MigrateUp(conn, migrationFilesURL)

99 changes: 98 additions & 1 deletion database_admin/update_test.go
@@ -13,10 +13,14 @@ import (

func openAppDB(t *testing.T, user, password string) *sql.DB {
t.Helper()
sslModeCert := utils.CoreCfg.DBSslMode
if utils.CoreCfg.DBSslRootCert != "" {
sslModeCert += "&sslrootcert=" + utils.CoreCfg.DBSslRootCert
}
url := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=%s",
user, password,
utils.CoreCfg.DBHost, utils.CoreCfg.DBPort,
utils.CoreCfg.DBName, utils.CoreCfg.DBSslMode,
utils.CoreCfg.DBName, sslModeCert,
)
db, err := sql.Open("postgres", url)
require.NoError(t, err)
@@ -71,3 +75,96 @@ func TestWaitForSessionClosedQueryErrors(t *testing.T) {

assert.Panics(t, func() { waitForSessionClosed(db) })
}

func TestListActiveAppSessionsFound(t *testing.T) {
utils.SkipWithoutDB(t)
database.Configure()

_ = openAppDB(t, "manager", utils.Getenv("MANAGER_PASSWORD", "manager"))

_, db := dbConn()
t.Cleanup(func() { _ = db.Close() })

sessions, err := listActiveAppSessions(db)
require.NoError(t, err)

found := false
for _, session := range sessions {
if session.usename == "manager" {
found = true
assert.NotZero(t, session.pid)
break
}
}
assert.True(t, found)
}

func TestTerminateAppSessions(t *testing.T) {
utils.SkipWithoutDB(t)
database.Configure()

listenerDB := openAppDB(t, "listener", utils.Getenv("LISTENER_PASSWORD", "listener"))
listenerDB.SetMaxOpenConns(1)

_, db := dbConn()
t.Cleanup(func() {
unblockUsers(db)
_ = db.Close()
})

var listenerPID int
require.NoError(t, listenerDB.QueryRow("SELECT pg_backend_pid()").Scan(&listenerPID))

sessions, err := listActiveAppSessions(db)
require.NoError(t, err)
found := false
for _, session := range sessions {
if session.pid == listenerPID {
found = true
break
}
}
require.True(t, found)

blockUsers(db)
terminateAppSessions(db)

var one int
err = listenerDB.QueryRow("SELECT 1").Scan(&one)
assert.Error(t, err)
}

func TestStartMigrationBeforeMigrateUp(t *testing.T) {
utils.SkipWithoutDB(t)
database.Configure()

oldTerminate := terminateDBSessions
terminateDBSessions = true
t.Cleanup(func() { terminateDBSessions = oldTerminate })

listenerDB := openAppDB(t, "listener", utils.Getenv("LISTENER_PASSWORD", "listener"))
listenerDB.SetMaxOpenConns(1)

_, db := dbConn()
t.Cleanup(func() {
unblockUsers(db)
_ = db.Close()
})

prepareForMigration(db)

var one int
err := listenerDB.QueryRow("SELECT 1").Scan(&one)
assert.Error(t, err)

_, found, err := findActiveAppSession(db)
require.NoError(t, err)
assert.False(t, found)

for _, user := range lockUsers {
var canLogin bool
err := db.QueryRow("SELECT rolcanlogin FROM pg_roles WHERE rolname = $1", user).Scan(&canLogin)
require.NoError(t, err)
assert.False(t, canLogin, "user %s should remain blocked before MigrateUp", user)
}
}
1 change: 1 addition & 0 deletions deploy/clowdapp.yaml
@@ -812,6 +812,7 @@ parameters:
- {name: MEM_REQUEST_DATABASE_ADMIN, value: 128Mi}
- {name: GOMEMLIMIT_DATABASE_ADMIN, value: '115MiB'} # set to 90% of the default memory limit 128Mi (don't forget `B`)
- {name: DATABASE_ADMIN_CONFIG, value: ''} # Set 'schema_version=XXX' if need specific database schema
# 'terminate_db_sessions=true' to kill app DB sessions before DDL
# 'force_schema_version=XXX' to reset the dirty flag to false and force the specific version, it will follow up with the schema upgrade defined by schema_version

# Common parameters
5 changes: 4 additions & 1 deletion docs/md/architecture.md
@@ -52,7 +52,10 @@ description of the component and data layout are in [separate page](database.md)

- **database-admin** - Executes database initialization and migrations. It needs all rights for the database. It also
creates database users for all components and updates passwords for them, so it reads passwords for admin and all
components from environment variables. Using container CLI it's possible to manually manage database
components from environment variables. Before DDL it sets app users (`listener`, `evaluator`, `manager`, `vmaas_sync`)
to `NOLOGIN`, optionally terminates lingering sessions when `terminate_db_sessions=true`, waits until no app sessions
remain, runs migrations, then restores `LOGIN`. See [Database migrations](database.md#migrations) for when to enable
session termination. Using container CLI it's possible to manually manage database
(`./scripts/psql.sh`). See [component environment variables](../../conf/database_admin.env)

### Components cooperation schema
60 changes: 60 additions & 0 deletions docs/md/database.md
@@ -16,3 +16,63 @@ Main database tables description:
The ERD image below may lag `database_admin/schema/create_schema.sql`; for systems it may not reflect the split between **system_inventory** (host profile / upload payload) and **system_patch** (evaluation caches and aggregates).

![](graphics/db_diagram.png)

## Migrations

Schema changes live in `database_admin/migrations/` and are applied by the **database-admin** component (`database_admin/update.go`). In production, a single **db-migration** ClowdApp Job runs migrations; other pods wait in a `check-for-db` init container until the schema matches.

### Pre-migration session handling

Before running DDL, database-admin blocks app database users from new logins and waits for existing sessions to drain:

1. `ALTER USER … NOLOGIN` for `listener`, `evaluator`, `manager`, `vmaas_sync`
2. Optionally (see below) `pg_terminate_backend` on remaining app sessions
3. Poll `pg_stat_activity` until no app-user sessions remain
4. Run `MigrateUp`
5. `ALTER USER … LOGIN` to restore access

`NOLOGIN` stops **new** connections but does **not** close existing ones. Lingering sessions can hold locks and block DDL on large or sensitive migrations.
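
The ordering above can be modelled in a few lines of Go. This is an illustrative sketch only: `migrationPlan` and its step labels are hypothetical names invented for this example and do not exist in `database_admin/update.go`, where the real steps are carried out by `prepareForMigration()` and `startMigration()`.

```go
package main

import "fmt"

// migrationPlan is a hypothetical helper (not part of the repository) that
// returns the ordered steps described above. The termination step is included
// only when terminateSessions is true.
func migrationPlan(terminateSessions bool) []string {
	steps := []string{"ALTER USER ... NOLOGIN"}
	if terminateSessions {
		steps = append(steps, "pg_terminate_backend")
	}
	return append(steps,
		"poll pg_stat_activity",
		"MigrateUp",
		"ALTER USER ... LOGIN",
	)
}

func main() {
	for _, flag := range []bool{false, true} {
		fmt.Printf("terminate_db_sessions=%v\n", flag)
		for i, step := range migrationPlan(flag) {
			fmt.Printf("  %d. %s\n", i+1, step)
		}
	}
}
```

In both variants the poll step runs before `MigrateUp`, so with the flag unset a lingering session delays DDL until it closes on its own.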

### `terminate_db_sessions` flag

| | |
|---|---|
| **Config key** | `terminate_db_sessions` (boolean, default `false`) |
| **Where to set** | `DATABASE_ADMIN_CONFIG` / `POD_CONFIG` on the db-migration Job only |
| **Example** | `terminate_db_sessions=true` |

When enabled, database-admin calls `pg_terminate_backend` on all open sessions for the four app users above (excluding its own connection), then polls `pg_stat_activity` until no app-user sessions remain.
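
As a rough illustration of how the flag resolves to a boolean with a `false` default, the sketch below reads a key from a config string. It assumes `key=value` pairs separated by `;`, which is an assumption made for this example; the real lookup is `utils.PodConfig.GetBool("terminate_db_sessions", false)` and its parsing may differ. `getBool` is a hypothetical stand-in, not the repository's function.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// getBool is a hypothetical stand-in for utils.PodConfig.GetBool.
// Assumption: config holds "key=value" pairs separated by ';'.
// It returns def when the key is absent or its value is not a valid boolean.
func getBool(config, key string, def bool) bool {
	for _, pair := range strings.Split(config, ";") {
		k, v, ok := strings.Cut(strings.TrimSpace(pair), "=")
		if !ok || k != key {
			continue
		}
		if parsed, err := strconv.ParseBool(v); err == nil {
			return parsed
		}
	}
	return def
}

func main() {
	// Unset: the default (false) applies, so no sessions are terminated.
	fmt.Println(getBool("", "terminate_db_sessions", false))
	// Set for a one-off major migration deploy.
	fmt.Println(getBool("terminate_db_sessions=true", "terminate_db_sessions", false))
}
```

The point of the sketch is the default: leaving the key out keeps the pre-existing behaviour.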

**Set `terminate_db_sessions=true` when:**

- The migration runs heavy or long-held DDL (e.g. `ALTER TABLE` on large partitioned tables, structural changes that need exclusive locks)
- A previous migration appeared stuck after “Blocking writing users” with app sessions still in `pg_stat_activity`
- Operations explicitly plan a major migration deploy and want to force-close stale app connections

**Leave unset (default `false`) when:**

- Routine deploys and normal migrations (additive columns, new tables, typical index changes)
- Local development, CI, and test runs
- There is no evidence of session-related blocking — the flag forcibly disconnects clients and should not be the default

Remove the flag after the major migration deploy completes; subsequent deploys should not need it.

### Migration log sequence

When the db-migration Job runs, expect these log lines in order (Kibana: `@log_stream: patchman-*` and `kubernetes.container_name: db-migration`):

1. `Getting advisory lock`
2. `Advisory lock acquired` — if missing, another process holds advisory lock 123
3. `Migrating the database`
4. `Blocking writing users during the migration`
5. `Terminating active app database sessions` / `Terminated session pid=...` — only when `terminate_db_sessions=true`
6. `Waiting for N sessions: ...` — repeats until sessions drain
7. `App database sessions cleared`
8. `Starting schema migration to version X`
9. Silence during DDL (normal)
10. `Reverting components privileges`
11. `Releasing advisory lock`
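
When reading exported logs, the sequence above can be checked mechanically. The following Go sketch is a hypothetical helper (neither `firstMissingMarker` nor `expectedMarkers` exists in the repository) that walks log lines in order and reports the first expected message that never appeared. It only covers the unconditional lines, so the `terminate_db_sessions` and `Waiting for N sessions` messages are deliberately left out.

```go
package main

import (
	"fmt"
	"strings"
)

// expectedMarkers lists the unconditional log messages from the sequence
// above, in order. Hypothetical name, for illustration only.
var expectedMarkers = []string{
	"Getting advisory lock",
	"Advisory lock acquired",
	"Migrating the database",
	"Blocking writing users during the migration",
	"App database sessions cleared",
	"Starting schema migration to version",
	"Reverting components privileges",
	"Releasing advisory lock",
}

// firstMissingMarker scans lines in order and returns the first marker that
// was not found in sequence, or "" when every marker appeared in order.
func firstMissingMarker(lines, markers []string) string {
	next := 0
	for _, line := range lines {
		if next < len(markers) && strings.Contains(line, markers[next]) {
			next++
		}
	}
	if next < len(markers) {
		return markers[next]
	}
	return ""
}

func main() {
	// A run that is still waiting for app sessions to drain.
	logs := []string{
		"Getting advisory lock",
		"Advisory lock acquired",
		"Migrating the database",
		"Blocking writing users during the migration",
		"Waiting for 2 sessions: ...",
	}
	fmt.Println(firstMissingMarker(logs, expectedMarkers))
	// prints "App database sessions cleared"
}
```

A result of `App database sessions cleared` alongside repeated `Waiting for N sessions` lines is the session-blocking symptom described in the flag guidance above.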

### Other `DATABASE_ADMIN_CONFIG` options

See `deploy/clowdapp.yaml` parameters and `database_admin/config.go`: `schema_migration`, `force_migration_version`, `reset_schema`, `update_users`, `unlock_users`, `update_db_config`.