Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 33 additions & 7 deletions database_admin/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,26 @@ package database_admin
import (
"app/base/utils"
"database/sql"
"errors"
"fmt"
"os"
"strings"
"time"

"github.com/golang-migrate/migrate/v4/database"
"github.com/lib/pq"
log "github.com/sirupsen/logrus"
)

var lockUsers = []string{"listener", "evaluator", "manager", "vmaas_sync"}

const activeAppSessionsQuery = `SELECT usename || ' ' || substring(query for 50)
FROM pg_stat_activity
WHERE usename = ANY($1)
LIMIT 1`

const sessionCheckMaxRetries = 5

func execOrPanic(db *sql.DB, query string, args ...interface{}) {
if _, err := db.Exec(query, args...); err != nil {
panic(err)
Expand All @@ -38,18 +47,35 @@ func releaseAdvisoryLock(db *sql.DB) {
execOrPanic(db, "SELECT pg_advisory_unlock(123)")
}

// findActiveAppSession returns the first open session for lockUsers, if any.
func findActiveAppSession(db *sql.DB) (session string, found bool, err error) {
err = db.QueryRow(activeAppSessionsQuery, pq.Array(lockUsers)).Scan(&session)
if errors.Is(err, sql.ErrNoRows) {
return "", false, nil
}
if err != nil {
return "", false, err
}
return session, true, nil
}

// Wait for closing of all lockUsers database sessions.
func waitForSessionClosed(db *sql.DB) {
errRetries := 0
for {
session := ""
err := db.QueryRow(
"SELECT usename || ' ' || substring(query for 50) FROM pg_stat_activity WHERE "+
"usename IN (?) LIMIT 30;", lockUsers,
).Scan(&session)
session, found, err := findActiveAppSession(db)
if err != nil {
log.Info(err)
errRetries++
utils.LogError("err", err.Error(), "attempt", errRetries, "failed to check app database sessions")
if errRetries >= sessionCheckMaxRetries {
panic(fmt.Errorf("failed to check app database sessions after %d attempts: %w",
sessionCheckMaxRetries, err))
}
time.Sleep(time.Second)
continue
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
}
if session == "" {
errRetries = 0
if !found {
log.Info("No ", strings.Join(lockUsers, ", "), " sessions found")
return
}
Expand Down
73 changes: 73 additions & 0 deletions database_admin/update_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package database_admin

import (
"app/base/database"
"app/base/utils"
"database/sql"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func openAppDB(t *testing.T, user, password string) *sql.DB {
t.Helper()
url := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=%s",
user, password,
utils.CoreCfg.DBHost, utils.CoreCfg.DBPort,
utils.CoreCfg.DBName, utils.CoreCfg.DBSslMode,
)
db, err := sql.Open("postgres", url)
require.NoError(t, err)
require.NoError(t, db.Ping())
t.Cleanup(func() { _ = db.Close() })
return db
}

// Query errors must not be treated as "no sessions". Use an unreachable host/port so
// QueryRow fails on connect; sql.Open itself does not connect.
func TestFindActiveAppSessionInvalidDB(t *testing.T) {
db, err := sql.Open("postgres", "postgres://127.0.0.1:1/nope?sslmode=disable&connect_timeout=1")
require.NoError(t, err)
t.Cleanup(func() { _ = db.Close() })

_, found, err := findActiveAppSession(db)
assert.Error(t, err)
assert.False(t, found)
}

func TestFindActiveAppSessionNoRows(t *testing.T) {
utils.SkipWithoutDB(t)
database.Configure()

_, db := dbConn()
t.Cleanup(func() { _ = db.Close() })

_, found, err := findActiveAppSession(db)
assert.NoError(t, err)
assert.False(t, found)
}

func TestFindActiveAppSessionFound(t *testing.T) {
utils.SkipWithoutDB(t)
database.Configure()

_ = openAppDB(t, "manager", utils.Getenv("MANAGER_PASSWORD", "manager"))

_, db := dbConn()
t.Cleanup(func() { _ = db.Close() })

session, found, err := findActiveAppSession(db)
require.NoError(t, err)
assert.True(t, found)
assert.Contains(t, session, "manager")
}

func TestWaitForSessionClosedQueryErrors(t *testing.T) {
db, err := sql.Open("postgres", "postgres://127.0.0.1:1/nope?sslmode=disable&connect_timeout=1")
require.NoError(t, err)
t.Cleanup(func() { _ = db.Close() })

assert.Panics(t, func() { waitForSessionClosed(db) })
}
Loading