diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index c2f813e0bee..152b70a9097 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -37,7 +37,7 @@ use spacetimedb_datastore::execution_context::Workload; use spacetimedb_datastore::system_tables::ModuleKind; use spacetimedb_datastore::traits::Program; use spacetimedb_durability::{self as durability}; -use spacetimedb_lib::{AlgebraicValue, Identity, Timestamp}; +use spacetimedb_lib::{identity::AuthCtx, AlgebraicValue, Identity, Timestamp}; use spacetimedb_paths::server::{ModuleLogsDir, ServerDataDir}; use spacetimedb_runtime::AbortHandle; use spacetimedb_sats::hash::Hash; @@ -820,6 +820,31 @@ impl ModuleLauncher { } } +fn repair_stale_view_backing_tables_on_launch(launched: &LaunchedModule) -> anyhow::Result<()> { + let info = launched.module_host.info(); + let stdb = info.relational_db().clone(); + let Some(plan) = db::update::stale_view_backing_table_recreate_plan(stdb.as_ref(), &info.module_def)? else { + return Ok(()); + }; + + info!( + "repairing stale view backing tables during module launch: {}", + info.database_identity + ); + let system_logger = launched.replica_ctx.logger.system_logger(); + system_logger.info("Repairing stale view backing tables"); + + let auth_ctx = AuthCtx::for_current(info.owner_identity); + stdb.with_auto_commit(Workload::Internal, |tx| -> anyhow::Result<()> { + match db::update::update_database(stdb.as_ref(), tx, auth_ctx, plan, system_logger)? { + db::update::UpdateResult::Success | db::update::UpdateResult::RequiresClientDisconnect => Ok(()), + db::update::UpdateResult::EvaluateSubscribedViews => { + bail!("startup view backing table repair unexpectedly requested view evaluation") + } + } + }) +} + /// Update a module. /// /// If the `db` is not initialized yet (i.e. its program hash is `None`), @@ -1076,6 +1101,7 @@ impl Host { Result::from(call_result)?; } } else { + repair_stale_view_backing_tables_on_launch(&launched)?; drop(program) } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index a800a3654b5..5ebf3e18e14 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -49,7 +49,7 @@ use spacetimedb_lib::{bsatn, http as st_http, ConnectionId, Hash, ProductType, R use spacetimedb_primitives::{HttpHandlerId, ProcedureId, TableId, ViewFnPtr, ViewId}; use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type; use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, Deserialize, ProductValue, Typespace, WithTypespace}; -use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError}; +use spacetimedb_schema::auto_migrate::{ponder_migrate, MigrationPolicy, MigrationPolicyError}; use spacetimedb_schema::def::deserialize::FunctionDef; use spacetimedb_schema::def::{ModuleDef, ViewDef}; use spacetimedb_schema::identifier::Identifier; @@ -641,21 +641,22 @@ impl InstanceCommon { let system_logger = replica_ctx.logger.system_logger(); let stdb = &replica_ctx.relational_db(); - let plan: MigratePlan = match policy.try_migrate( + let plan = match ponder_migrate(&old_module_info.module_def, &self.info.module_def) { + Ok(plan) => plan, + Err(e) => return Ok(UpdateDatabaseResult::AutoMigrateError(e.into())), + }; + + if let Err(e) = policy.permits_migrate_plan( self.info.database_identity, old_module_info.module_hash, - &old_module_info.module_def, self.info.module_hash, - &self.info.module_def, + &plan, ) { - Ok(plan) => plan, - Err(e) => { - return match e { - MigrationPolicyError::AutoMigrateFailure(e) => Ok(UpdateDatabaseResult::AutoMigrateError(e.into())), - _ => Ok(UpdateDatabaseResult::ErrorExecutingMigration(e.into())), - } - } - }; + return match e { + MigrationPolicyError::AutoMigrateFailure(e) => Ok(UpdateDatabaseResult::AutoMigrateError(e.into())), + _ => Ok(UpdateDatabaseResult::ErrorExecutingMigration(e.into())), + }; + } let program_hash = program.hash; let host_type = HostType::from(program.kind); diff --git a/crates/engine/src/update.rs b/crates/engine/src/update.rs index c868341096e..98094a69947 100644 --- a/crates/engine/src/update.rs +++ b/crates/engine/src/update.rs @@ -1,13 +1,16 @@ use super::relational_db::RelationalDB; use crate::sql::rls::RowLevelExpr; use anyhow::Context; -use spacetimedb_datastore::locking_tx_datastore::MutTxId; +use spacetimedb_datastore::execution_context::Workload; +use spacetimedb_datastore::locking_tx_datastore::state_view::StateView; +use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId}; +use spacetimedb_datastore::system_tables::{StViewFields, StViewRow, ST_VIEW_ID}; use spacetimedb_lib::db::auth::StTableType; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::AlgebraicValue; use spacetimedb_primitives::{ColSet, TableId}; -use spacetimedb_schema::auto_migrate::{AutoMigratePlan, ManualMigratePlan, MigratePlan}; -use spacetimedb_schema::def::{ModuleDef, TableDef, ViewDef}; +use spacetimedb_schema::auto_migrate::{AutoMigratePlan, AutoMigrateStep, ManualMigratePlan, MigratePlan}; +use spacetimedb_schema::def::{ModuleDef, ModuleDefLookup, TableDef, ViewDef}; use spacetimedb_schema::schema::{ column_schemas_from_defs, ConstraintSchema, IndexSchema, Schema, SequenceSchema, TableSchema, }; @@ -26,6 +29,91 @@ pub enum UpdateResult { EvaluateSubscribedViews, } +/// Build a repair-only migration plan for views whose stored backing table row +/// layout is stale compared to the currently loaded module definition. +pub fn stale_view_backing_table_recreate_plan<'def>( + stdb: &RelationalDB, + module_def: &'def ModuleDef, +) -> anyhow::Result>> { + let steps = stale_view_backing_table_recreate_steps(stdb, module_def)?; + if steps.is_empty() { + return Ok(None); + } + + Ok(Some(MigratePlan::Auto(AutoMigratePlan { + old: module_def, + new: module_def, + prechecks: Vec::new(), + steps, + }))) +} + +fn stale_view_backing_table_recreate_steps<'def>( + stdb: &RelationalDB, + module_def: &'def ModuleDef, +) -> anyhow::Result>> { + stdb.with_read_only(Workload::Internal, |tx| -> anyhow::Result<_> { + let mut steps = Vec::new(); + + for view in module_def.views() { + if view_backing_table_needs_recreate(stdb, tx, module_def, view)? { + steps.extend([ + AutoMigrateStep::RemoveView(view.key()), + AutoMigrateStep::AddView(view.key()), + ]); + } + } + + if !steps.is_empty() { + ensure_disconnect_all_users(&mut steps); + steps.sort(); + } + + Ok(steps) + }) +} + +fn view_backing_table_needs_recreate( + stdb: &RelationalDB, + tx: &mut TxId, + module_def: &ModuleDef, + view: &ViewDef, +) -> anyhow::Result { + let Some(table_id) = view_backing_table_id(tx, view)? else { + return Ok(false); + }; + + let actual = stdb.schema_for_table(tx, table_id)?; + let expected = TableSchema::from_view_def_for_datastore(module_def, view); + + Ok(view_backing_row_layout_changed(&actual, &expected)) +} + +fn view_backing_table_id(tx: &mut TxId, view: &ViewDef) -> anyhow::Result> { + let view_name = AlgebraicValue::from(>::from(&*view.name)); + let Some(row) = tx + .iter_by_col_eq(ST_VIEW_ID, StViewFields::ViewName, &view_name)? + .next() + else { + return Ok(None); + }; + + Ok(StViewRow::try_from(row)?.table_id) +} + +fn view_backing_row_layout_changed(actual: &TableSchema, expected: &TableSchema) -> bool { + actual.row_type != expected.row_type +} + +fn ensure_disconnect_all_users(steps: &mut Vec>) { + if !steps + .iter() + .any(|step| matches!(step, AutoMigrateStep::DisconnectAllUsers)) + { + steps.push(AutoMigrateStep::DisconnectAllUsers); + } +} + /// Update the database according to the migration plan. /// /// The update is performed within the transactional context `tx`. @@ -575,6 +663,86 @@ mod test { .expect("should be a valid module definition") } + fn view_module() -> ModuleDef { + let mut builder = RawModuleDefV10Builder::new(); + let return_type_ref = builder.add_algebraic_type( + [], + "my_view_return_type", + AlgebraicType::product([("a", AlgebraicType::U64)]), + true, + ); + builder.add_view( + "my_view", + 0, + true, + true, + ProductType::unit(), + AlgebraicType::array(AlgebraicType::Ref(return_type_ref)), + ); + builder.add_view_primary_key("my_view", ["a"]); + builder + .finish() + .try_into() + .expect("should be a valid module definition") + } + + fn old_view_backing_schema(module_def: &ModuleDef) -> TableSchema { + let view = module_def.view("my_view").unwrap(); + let mut schema = TableSchema::from_view_def_for_datastore(module_def, view); + + schema.columns.remove(0); + for (pos, col) in schema.columns.iter_mut().enumerate() { + col.col_pos = pos.into(); + } + schema.indexes.clear(); + schema.constraints.clear(); + schema.reset(); + + schema + } + + fn create_backing_table(stdb: &TestDB, schema: TableSchema) -> anyhow::Result<()> { + let mut tx = begin_mut_tx(stdb); + stdb.create_table(&mut tx, schema)?; + stdb.commit_tx(tx)?; + Ok(()) + } + + #[test] + fn stale_view_backing_schema_generates_startup_repair_plan() -> anyhow::Result<()> { + let stdb = TestDB::durable()?; + let module_def = view_module(); + create_backing_table(&stdb, old_view_backing_schema(&module_def))?; + + let plan = stale_view_backing_table_recreate_plan(&stdb, &module_def)?.expect("expected repair plan"); + + let MigratePlan::Auto(plan) = &plan else { + panic!("expected auto migration"); + }; + let my_view = module_def.view("my_view").unwrap().key(); + assert!(plan.steps.contains(&AutoMigrateStep::RemoveView(my_view))); + assert!(plan.steps.contains(&AutoMigrateStep::AddView(my_view))); + assert!(plan.steps.contains(&AutoMigrateStep::DisconnectAllUsers)); + assert!(!plan.steps.contains(&AutoMigrateStep::UpdateView(my_view))); + + Ok(()) + } + + #[test] + fn current_view_backing_schema_skips_startup_repair_plan() -> anyhow::Result<()> { + let stdb = TestDB::durable()?; + let module_def = view_module(); + let view = module_def.view("my_view").unwrap(); + let backing_schema = TableSchema::from_view_def_for_datastore(&module_def, view); + create_backing_table(&stdb, backing_schema)?; + + let plan = stale_view_backing_table_recreate_plan(&stdb, &module_def)?; + + assert!(plan.is_none(), "{plan:#?}"); + + Ok(()) + } + enum TakeSnapshot { None, BeforeAutomigration, diff --git a/crates/schema/src/auto_migrate.rs b/crates/schema/src/auto_migrate.rs index 895a226b9b5..63239de7ecc 100644 --- a/crates/schema/src/auto_migrate.rs +++ b/crates/schema/src/auto_migrate.rs @@ -131,14 +131,25 @@ impl MigrationPolicy { new_module_def: &'def ModuleDef, ) -> anyhow::Result, MigrationPolicyError> { let plan = ponder_migrate(old_module_def, new_module_def).map_err(MigrationPolicyError::AutoMigrateFailure)?; + self.permits_migrate_plan(database_identity, old_module_hash, new_module_hash, &plan)?; + Ok(plan) + } + /// Validate an already-generated migration plan under this policy. + pub fn permits_migrate_plan( + &self, + database_identity: Identity, + old_module_hash: spacetimedb_lib::Hash, + new_module_hash: spacetimedb_lib::Hash, + plan: &MigratePlan<'_>, + ) -> anyhow::Result<(), MigrationPolicyError> { let token = MigrationToken { database_identity, old_module_hash, new_module_hash, }; - self.permits_plan(&plan, &token)?; - Ok(plan) + self.permits_plan(plan, &token)?; + Ok(()) } }