Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use spacetimedb_datastore::execution_context::Workload;
use spacetimedb_datastore::system_tables::ModuleKind;
use spacetimedb_datastore::traits::Program;
use spacetimedb_durability::{self as durability};
use spacetimedb_lib::{AlgebraicValue, Identity, Timestamp};
use spacetimedb_lib::{identity::AuthCtx, AlgebraicValue, Identity, Timestamp};
use spacetimedb_paths::server::{ModuleLogsDir, ServerDataDir};
use spacetimedb_runtime::AbortHandle;
use spacetimedb_sats::hash::Hash;
Expand Down Expand Up @@ -820,6 +820,31 @@ impl<F: Fn() + Send + Sync + 'static> ModuleLauncher<F> {
}
}

fn repair_stale_view_backing_tables_on_launch(launched: &LaunchedModule) -> anyhow::Result<()> {
let info = launched.module_host.info();
let stdb = info.relational_db().clone();
let Some(plan) = db::update::stale_view_backing_table_recreate_plan(stdb.as_ref(), &info.module_def)? else {
return Ok(());
};

info!(
"repairing stale view backing tables during module launch: {}",
info.database_identity
);
let system_logger = launched.replica_ctx.logger.system_logger();
system_logger.info("Repairing stale view backing tables");

let auth_ctx = AuthCtx::for_current(info.owner_identity);
stdb.with_auto_commit(Workload::Internal, |tx| -> anyhow::Result<()> {
match db::update::update_database(stdb.as_ref(), tx, auth_ctx, plan, system_logger)? {
db::update::UpdateResult::Success | db::update::UpdateResult::RequiresClientDisconnect => Ok(()),
db::update::UpdateResult::EvaluateSubscribedViews => {
bail!("startup view backing table repair unexpectedly requested view evaluation")
}
}
})
}

/// Update a module.
///
/// If the `db` is not initialized yet (i.e. its program hash is `None`),
Expand Down Expand Up @@ -1076,6 +1101,7 @@ impl Host {
Result::from(call_result)?;
}
} else {
repair_stale_view_backing_tables_on_launch(&launched)?;
drop(program)
}

Expand Down
25 changes: 13 additions & 12 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use spacetimedb_lib::{bsatn, http as st_http, ConnectionId, Hash, ProductType, R
use spacetimedb_primitives::{HttpHandlerId, ProcedureId, TableId, ViewFnPtr, ViewId};
use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type;
use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, Deserialize, ProductValue, Typespace, WithTypespace};
use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError};
use spacetimedb_schema::auto_migrate::{ponder_migrate, MigrationPolicy, MigrationPolicyError};
use spacetimedb_schema::def::deserialize::FunctionDef;
use spacetimedb_schema::def::{ModuleDef, ViewDef};
use spacetimedb_schema::identifier::Identifier;
Expand Down Expand Up @@ -641,21 +641,22 @@ impl InstanceCommon {
let system_logger = replica_ctx.logger.system_logger();
let stdb = &replica_ctx.relational_db();

let plan: MigratePlan = match policy.try_migrate(
let plan = match ponder_migrate(&old_module_info.module_def, &self.info.module_def) {
Ok(plan) => plan,
Err(e) => return Ok(UpdateDatabaseResult::AutoMigrateError(e.into())),
};

if let Err(e) = policy.permits_migrate_plan(
self.info.database_identity,
old_module_info.module_hash,
&old_module_info.module_def,
self.info.module_hash,
&self.info.module_def,
&plan,
) {
Ok(plan) => plan,
Err(e) => {
return match e {
MigrationPolicyError::AutoMigrateFailure(e) => Ok(UpdateDatabaseResult::AutoMigrateError(e.into())),
_ => Ok(UpdateDatabaseResult::ErrorExecutingMigration(e.into())),
}
}
};
return match e {
MigrationPolicyError::AutoMigrateFailure(e) => Ok(UpdateDatabaseResult::AutoMigrateError(e.into())),
_ => Ok(UpdateDatabaseResult::ErrorExecutingMigration(e.into())),
};
}

let program_hash = program.hash;
let host_type = HostType::from(program.kind);
Expand Down
174 changes: 171 additions & 3 deletions crates/engine/src/update.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use super::relational_db::RelationalDB;
use crate::sql::rls::RowLevelExpr;
use anyhow::Context;
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
use spacetimedb_datastore::execution_context::Workload;
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
use spacetimedb_datastore::system_tables::{StViewFields, StViewRow, ST_VIEW_ID};
use spacetimedb_lib::db::auth::StTableType;
use spacetimedb_lib::identity::AuthCtx;
use spacetimedb_lib::AlgebraicValue;
use spacetimedb_primitives::{ColSet, TableId};
use spacetimedb_schema::auto_migrate::{AutoMigratePlan, ManualMigratePlan, MigratePlan};
use spacetimedb_schema::def::{ModuleDef, TableDef, ViewDef};
use spacetimedb_schema::auto_migrate::{AutoMigratePlan, AutoMigrateStep, ManualMigratePlan, MigratePlan};
use spacetimedb_schema::def::{ModuleDef, ModuleDefLookup, TableDef, ViewDef};
use spacetimedb_schema::schema::{
column_schemas_from_defs, ConstraintSchema, IndexSchema, Schema, SequenceSchema, TableSchema,
};
Expand All @@ -26,6 +29,91 @@ pub enum UpdateResult {
EvaluateSubscribedViews,
}

/// Build a repair-only migration plan for views whose stored backing table row
/// layout is stale compared to the currently loaded module definition.
pub fn stale_view_backing_table_recreate_plan<'def>(
stdb: &RelationalDB,
module_def: &'def ModuleDef,
) -> anyhow::Result<Option<MigratePlan<'def>>> {
let steps = stale_view_backing_table_recreate_steps(stdb, module_def)?;
if steps.is_empty() {
return Ok(None);
}

Ok(Some(MigratePlan::Auto(AutoMigratePlan {
old: module_def,
new: module_def,
prechecks: Vec::new(),
steps,
})))
}

fn stale_view_backing_table_recreate_steps<'def>(
stdb: &RelationalDB,
module_def: &'def ModuleDef,
) -> anyhow::Result<Vec<AutoMigrateStep<'def>>> {
stdb.with_read_only(Workload::Internal, |tx| -> anyhow::Result<_> {
let mut steps = Vec::new();

for view in module_def.views() {
if view_backing_table_needs_recreate(stdb, tx, module_def, view)? {
steps.extend([
AutoMigrateStep::RemoveView(view.key()),
AutoMigrateStep::AddView(view.key()),
]);
}
}

if !steps.is_empty() {
ensure_disconnect_all_users(&mut steps);
steps.sort();
}

Ok(steps)
})
}

fn view_backing_table_needs_recreate(
stdb: &RelationalDB,
tx: &mut TxId,
module_def: &ModuleDef,
view: &ViewDef,
) -> anyhow::Result<bool> {
let Some(table_id) = view_backing_table_id(tx, view)? else {
return Ok(false);
};

let actual = stdb.schema_for_table(tx, table_id)?;
let expected = TableSchema::from_view_def_for_datastore(module_def, view);

Ok(view_backing_row_layout_changed(&actual, &expected))
}

fn view_backing_table_id(tx: &mut TxId, view: &ViewDef) -> anyhow::Result<Option<TableId>> {
let view_name = AlgebraicValue::from(<Box<str>>::from(&*view.name));
let Some(row) = tx
.iter_by_col_eq(ST_VIEW_ID, StViewFields::ViewName, &view_name)?
.next()
else {
return Ok(None);
};

Ok(StViewRow::try_from(row)?.table_id)
}

fn view_backing_row_layout_changed(actual: &TableSchema, expected: &TableSchema) -> bool {
actual.row_type != expected.row_type
}

fn ensure_disconnect_all_users(steps: &mut Vec<AutoMigrateStep<'_>>) {
if !steps
.iter()
.any(|step| matches!(step, AutoMigrateStep::DisconnectAllUsers))
{
steps.push(AutoMigrateStep::DisconnectAllUsers);
}
}

/// Update the database according to the migration plan.
///
/// The update is performed within the transactional context `tx`.
Expand Down Expand Up @@ -575,6 +663,86 @@ mod test {
.expect("should be a valid module definition")
}

fn view_module() -> ModuleDef {
let mut builder = RawModuleDefV10Builder::new();
let return_type_ref = builder.add_algebraic_type(
[],
"my_view_return_type",
AlgebraicType::product([("a", AlgebraicType::U64)]),
true,
);
builder.add_view(
"my_view",
0,
true,
true,
ProductType::unit(),
AlgebraicType::array(AlgebraicType::Ref(return_type_ref)),
);
builder.add_view_primary_key("my_view", ["a"]);
builder
.finish()
.try_into()
.expect("should be a valid module definition")
}

fn old_view_backing_schema(module_def: &ModuleDef) -> TableSchema {
let view = module_def.view("my_view").unwrap();
let mut schema = TableSchema::from_view_def_for_datastore(module_def, view);

schema.columns.remove(0);
for (pos, col) in schema.columns.iter_mut().enumerate() {
col.col_pos = pos.into();
}
schema.indexes.clear();
schema.constraints.clear();
schema.reset();

schema
}

fn create_backing_table(stdb: &TestDB, schema: TableSchema) -> anyhow::Result<()> {
let mut tx = begin_mut_tx(stdb);
stdb.create_table(&mut tx, schema)?;
stdb.commit_tx(tx)?;
Ok(())
}

#[test]
fn stale_view_backing_schema_generates_startup_repair_plan() -> anyhow::Result<()> {
let stdb = TestDB::durable()?;
let module_def = view_module();
create_backing_table(&stdb, old_view_backing_schema(&module_def))?;

let plan = stale_view_backing_table_recreate_plan(&stdb, &module_def)?.expect("expected repair plan");

let MigratePlan::Auto(plan) = &plan else {
panic!("expected auto migration");
};
let my_view = module_def.view("my_view").unwrap().key();
assert!(plan.steps.contains(&AutoMigrateStep::RemoveView(my_view)));
assert!(plan.steps.contains(&AutoMigrateStep::AddView(my_view)));
assert!(plan.steps.contains(&AutoMigrateStep::DisconnectAllUsers));
assert!(!plan.steps.contains(&AutoMigrateStep::UpdateView(my_view)));

Ok(())
}

#[test]
fn current_view_backing_schema_skips_startup_repair_plan() -> anyhow::Result<()> {
let stdb = TestDB::durable()?;
let module_def = view_module();
let view = module_def.view("my_view").unwrap();
let backing_schema = TableSchema::from_view_def_for_datastore(&module_def, view);
create_backing_table(&stdb, backing_schema)?;

let plan = stale_view_backing_table_recreate_plan(&stdb, &module_def)?;

assert!(plan.is_none(), "{plan:#?}");

Ok(())
}

enum TakeSnapshot {
None,
BeforeAutomigration,
Expand Down
15 changes: 13 additions & 2 deletions crates/schema/src/auto_migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,25 @@ impl MigrationPolicy {
new_module_def: &'def ModuleDef,
) -> anyhow::Result<MigratePlan<'def>, MigrationPolicyError> {
let plan = ponder_migrate(old_module_def, new_module_def).map_err(MigrationPolicyError::AutoMigrateFailure)?;
self.permits_migrate_plan(database_identity, old_module_hash, new_module_hash, &plan)?;
Ok(plan)
}

/// Validate an already-generated migration plan under this policy.
pub fn permits_migrate_plan(
&self,
database_identity: Identity,
old_module_hash: spacetimedb_lib::Hash,
new_module_hash: spacetimedb_lib::Hash,
plan: &MigratePlan<'_>,
) -> anyhow::Result<(), MigrationPolicyError> {
let token = MigrationToken {
database_identity,
old_module_hash,
new_module_hash,
};
self.permits_plan(&plan, &token)?;
Ok(plan)
self.permits_plan(plan, &token)?;
Ok(())
}
}

Expand Down
Loading