diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index c2f813e0bee..9cfa13babd7 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -1,4 +1,4 @@ -use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule}; +use super::module_host::{DurableOffset, EventStatus, InitDatabaseResult, ModuleHost, ModuleInfo, NoSuchModule}; use super::scheduler::SchedulerStarter; use super::v8::V8HeapMetrics; use super::wasmtime::{WasmMemoryBytesMetric, WasmtimeRuntime}; @@ -89,6 +89,83 @@ where pub type ProgramStorage = Arc; +/// A launched module host plus any pending controldb program-bootstrap completion work. +pub struct ModuleHostWithBootstrap { + pub module: ModuleHost, + pub bootstrap_completion: Option, +} + +/// A handle describing when program bootstrap can be marked complete. +/// +/// The handle owns any wait state needed to prove that the database has a +/// durable `st_module` row. +pub struct BootstrapCompletion { + program_hash: Hash, + status: ProgramBootstrap, +} + +/// Has the module been bootstrapped from the controldb? +/// +/// Once we have inserted into `st_module`, bootstrapping is no longer necessary, +/// and the initial program bytes can be dropped from the controldb. 
+enum ProgramBootstrap { + /// The module's program bytes have already been written to `st_module` + Durable, + /// The module was bootstrapped from the controldb and the `st_module` write is not yet durable + Pending { + tx_offset: TransactionOffset, + durable_offset: Option, + }, +} + +impl BootstrapCompletion { + fn durable(program_hash: Hash) -> Self { + Self { + program_hash, + status: ProgramBootstrap::Durable, + } + } + + fn pending(program_hash: Hash, tx_offset: TransactionOffset, durable_offset: Option) -> Self { + Self { + program_hash, + status: ProgramBootstrap::Pending { + tx_offset, + durable_offset, + }, + } + } + + pub fn program_hash(&self) -> Hash { + self.program_hash + } + + /// Wait until it is safe to complete program bootstrap in controldb. + /// That is, wait until the initial `st_module` insert becomes durable. + pub async fn wait(self) -> anyhow::Result { + match self.status { + ProgramBootstrap::Durable => Ok(self.program_hash), + ProgramBootstrap::Pending { + tx_offset, + durable_offset, + } => { + let tx_offset = tx_offset + .await + .context("failed waiting for initialized program transaction offset")?; + + if let Some(mut durable_offset) = durable_offset { + durable_offset + .wait_for(tx_offset) + .await + .context("failed waiting for initialized program to become durable")?; + } + + Ok(self.program_hash) + } + } + } +} + /// A host controller manages the lifecycle of spacetime databases and their /// associated modules. 
/// @@ -155,6 +232,11 @@ pub struct ReducerCallResult { pub execution_duration: Duration, } +pub struct ReducerCallResultWithTxOffset { + pub result: ReducerCallResult, + pub tx_offset: TransactionOffset, +} + impl ReducerCallResult { pub fn is_err(&self) -> bool { self.outcome.is_err() @@ -275,11 +357,30 @@ impl HostController { /// See also: [`Self::get_module_host`] #[tracing::instrument(level = "trace", skip_all)] pub async fn get_or_launch_module_host(&self, database: Database, replica_id: u64) -> anyhow::Result { - let mut rx = self.watch_maybe_launch_module_host(database, replica_id).await?; + let (mut rx, _) = self + .watch_maybe_launch_module_host_with_bootstrap(database, replica_id) + .await?; let module = rx.borrow_and_update(); Ok(module.clone()) } + /// Like [`Self::get_or_launch_module_host`], but returns a bootstrap completion waiter. + #[tracing::instrument(level = "trace", skip_all)] + pub async fn get_or_launch_module_host_with_bootstrap( + &self, + database: Database, + replica_id: u64, + ) -> anyhow::Result { + let (mut rx, bootstrap_completion) = self + .watch_maybe_launch_module_host_with_bootstrap(database, replica_id) + .await?; + let module = rx.borrow_and_update(); + Ok(ModuleHostWithBootstrap { + module: module.clone(), + bootstrap_completion, + }) + } + /// Like [`Self::get_or_launch_module_host`], use a [`ModuleHost`] managed /// by this controller, or launch it if it is not running. /// @@ -293,13 +394,24 @@ impl HostController { database: Database, replica_id: u64, ) -> anyhow::Result> { + self.watch_maybe_launch_module_host_with_bootstrap(database, replica_id) + .await + .map(|(rx, _)| rx) + } + + /// Like [`Self::watch_maybe_launch_module_host`], but returns a bootstrap completion waiter. + async fn watch_maybe_launch_module_host_with_bootstrap( + &self, + database: Database, + replica_id: u64, + ) -> anyhow::Result<(watch::Receiver, Option)> { // Try a read lock first. 
{ if let Ok(guard) = self.acquire_read_lock(replica_id).await && let Some(host) = &*guard { trace!("cached host {}/{}", database.database_identity, replica_id); - return Ok(host.module.subscribe()); + return Ok((host.module.subscribe(), None)); } } @@ -318,7 +430,7 @@ impl HostController { database.database_identity, replica_id ); - return Ok(host.module.subscribe()); + return Ok((host.module.subscribe(), None)); } trace!("launch host {}/{}", database.database_identity, replica_id); @@ -339,12 +451,16 @@ impl HostController { // Note that `tokio::spawn` only cancels its tasks when the runtime shuts down, // at which point we won't be calling `try_init_host` again anyways. let rx = tokio::spawn(async move { - let host = this.try_init_host(database, replica_id).await?; + let initialized = this.try_init_host(database, replica_id).await?; + let HostInit { + host, + bootstrap_completion, + } = initialized; let rx = host.module.subscribe(); *guard = Some(host); - Ok::<_, anyhow::Error>(rx) + Ok::<_, anyhow::Error>((rx, bootstrap_completion)) }) .await??; @@ -377,8 +493,8 @@ impl HostController { ) .await?; - let call_result = launched.module_host.init_database(program).await?; - if let Some(call_result) = call_result { + let InitDatabaseResult { reducer, .. } = launched.module_host.init_database(program).await?; + if let Some(call_result) = reducer { Result::from(call_result)?; } @@ -437,7 +553,7 @@ impl HostController { let mut host = match guard.take() { None => { trace!("host not running, try_init"); - this.try_init_host(database, replica_id).await? 
+ this.try_init_host(database, replica_id).await?.host } Some(host) => { trace!("host found, updating"); @@ -640,7 +756,7 @@ impl HostController { timeout(Duration::from_secs(5), lock.read_owned()).await } - async fn try_init_host(&self, database: Database, replica_id: u64) -> anyhow::Result { + async fn try_init_host(&self, database: Database, replica_id: u64) -> anyhow::Result { let database_identity = database.database_identity; Host::try_init(self, database, replica_id) .await @@ -759,6 +875,11 @@ struct LaunchedModule { scheduler_starter: SchedulerStarter, } +struct HostInit { + host: Host, + bootstrap_completion: Option, +} + struct ModuleLauncher { database: Database, replica_id: u64, @@ -888,7 +1009,11 @@ impl Host { /// Note that this does **not** run module initialization routines, but may /// create on-disk artifacts if the host / database did not exist. #[tracing::instrument(level = "debug", skip_all)] - async fn try_init(host_controller: &HostController, database: Database, replica_id: u64) -> anyhow::Result { + async fn try_init( + host_controller: &HostController, + database: Database, + replica_id: u64, + ) -> anyhow::Result { let HostController { data_dir, default_config: config, @@ -982,6 +1107,7 @@ impl Host { (program, true) } }; + let mut bootstrap_completion = Some(BootstrapCompletion::durable(program.hash)); let relational_db = Arc::new(db); let (program, launched) = match HostType::from(program.kind) { @@ -1071,10 +1197,16 @@ impl Host { }; if program_needs_init { - let call_result = launched.module_host.init_database(program).await?; - if let Some(call_result) = call_result { + let program_hash = program.hash; + let InitDatabaseResult { reducer, tx_offset } = launched.module_host.init_database(program).await?; + if let Some(call_result) = reducer { Result::from(call_result)?; } + bootstrap_completion = Some(BootstrapCompletion::pending( + program_hash, + tx_offset, + launched.module_host.durable_tx_offset(), + )); } else { drop(program) } 
@@ -1113,13 +1245,16 @@ impl Host { let module = watch::Sender::new(module_host); - Ok(Host { - module, - replica_ctx, - scheduler, - disk_metrics_recorder_task, - tx_metrics_recorder_task, - view_cleanup_task, + Ok(HostInit { + host: Host { + module, + replica_ctx, + scheduler, + disk_metrics_recorder_task, + tx_metrics_recorder_task, + view_cleanup_task, + }, + bootstrap_completion, }) } diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index daa506a8cf5..68f6035df80 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -25,10 +25,13 @@ mod wasm_common; pub use disk_storage::DiskStorage; pub use host_controller::{ - extract_schema, CallProcedureReturn, CallResult, ExternalDurability, ExternalStorage, HostController, - HostRuntimeConfig, MigratePlanResult, ProcedureCallResult, ProgramStorage, ReducerCallResult, ReducerOutcome, + extract_schema, BootstrapCompletion, CallProcedureReturn, CallResult, ExternalDurability, ExternalStorage, + HostController, HostRuntimeConfig, MigratePlanResult, ModuleHostWithBootstrap, ProcedureCallResult, ProgramStorage, + ReducerCallResult, ReducerCallResultWithTxOffset, ReducerOutcome, +}; +pub use module_host::{ + InitDatabaseResult, ModuleHost, NoSuchModule, ProcedureCallError, ReducerCallError, UpdateDatabaseResult, }; -pub use module_host::{ModuleHost, NoSuchModule, ProcedureCallError, ReducerCallError, UpdateDatabaseResult}; pub use scheduler::Scheduler; /// Encoded arguments to a database function. 
diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 331aff0ca06..ec65b7e9dd4 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1,6 +1,6 @@ use super::{ - ArgsTuple, FunctionArgs, InvalidProcedureArguments, InvalidReducerArguments, ReducerCallResult, ReducerId, - ReducerOutcome, Scheduler, + ArgsTuple, FunctionArgs, InvalidProcedureArguments, InvalidReducerArguments, ReducerCallResult, + ReducerCallResultWithTxOffset, ReducerId, ReducerOutcome, Scheduler, }; use crate::client::messages::{OneOffQueryResponseMessage, ProcedureResultMessage, SerializableMessage}; use crate::client::{ClientActorId, ClientConnectionSender, WsVersion}; @@ -21,8 +21,8 @@ use crate::messages::control_db::{Database, HostType}; use crate::replica_context::ReplicaContext; use crate::sql::execute::SqlResult; use crate::subscription::module_subscription_actor::ModuleSubscriptions; -use crate::subscription::module_subscription_manager::BroadcastError; pub use crate::subscription::module_subscription_manager::TransactionOffset; +use crate::subscription::module_subscription_manager::{from_tx_offset, BroadcastError}; use crate::subscription::row_list_builder_pool::{BsatnRowListBuilderPool, JsonRowListBuilderFakePool}; use crate::subscription::tx::DeltaTx; use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource}; @@ -574,8 +574,8 @@ pub(crate) fn init_database( replica_ctx: &ReplicaContext, module_def: &ModuleDef, program: Program, - call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), -) -> (anyhow::Result>, bool) { + call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResultWithTxOffset, bool), +) -> (anyhow::Result, bool) { extract_trapped(init_database_inner(replica_ctx, module_def, program, call_reducer)) } @@ -583,8 +583,8 @@ fn init_database_inner( replica_ctx: &ReplicaContext, module_def: &ModuleDef, program: Program, - 
call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), -) -> anyhow::Result<(Option, bool)> { + call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResultWithTxOffset, bool), +) -> anyhow::Result<(InitDatabaseResult, bool)> { log::debug!("init database"); let timestamp = Timestamp::now(); let stdb = replica_ctx.relational_db(); @@ -632,17 +632,30 @@ fn init_database_inner( let rcr = match module_def.lifecycle_reducer(Lifecycle::Init) { None => { - if let Some((_tx_offset, tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? { - stdb.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data)); - } - (None, false) + let (tx_offset, tx_data, tx_metrics, reducer) = stdb + .commit_tx(tx)? + .context("database initialization did not commit a transaction")?; + stdb.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data)); + ( + InitDatabaseResult { + reducer: None, + tx_offset: from_tx_offset(tx_offset), + }, + false, + ) } Some((reducer_id, _)) => { logger.info("Invoking `init` reducer"); let params = CallReducerParams::from_system(timestamp, owner_identity, reducer_id, ArgsTuple::nullary()); let (res, trapped) = call_reducer(Some(tx), params); - (Some(res), trapped) + ( + InitDatabaseResult { + reducer: Some(res.result), + tx_offset: res.tx_offset, + }, + trapped, + ) } }; @@ -650,6 +663,11 @@ fn init_database_inner( Ok(rcr) } +pub struct InitDatabaseResult { + pub reducer: Option, + pub tx_offset: TransactionOffset, +} + pub fn call_identity_connected( caller_auth: ConnectionAuthCtx, caller_connection_id: ConnectionId, @@ -3063,7 +3081,7 @@ impl ModuleHost { instance.common.call_view_with_tx(tx, params, instance.instance) } - pub async fn init_database(&self, program: Program) -> Result, InitDatabaseError> { + pub async fn init_database(&self, program: Program) -> Result { call_instance!( self, "", diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 5004ccd29c7..52abf373e48 100644 --- 
a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -85,7 +85,7 @@ use crate::host::wasm_common::module_host_actor::{ ReducerExecuteResult, ReducerOp, ViewExecuteResult, ViewOp, WasmInstance, }; use crate::host::wasm_common::{RowIters, TimingSpanSet}; -use crate::host::{ModuleHost, ReducerCallError, ReducerCallResult, Scheduler}; +use crate::host::{InitDatabaseResult, ModuleHost, ReducerCallError, ReducerCallResult, Scheduler}; use crate::messages::control_db::HostType; use crate::module_host_context::ModuleCreationContext; use crate::replica_context::ReplicaContext; @@ -535,7 +535,7 @@ impl JsMainInstance { self.request(DisconnectClientRequest { client_id }).await } - pub async fn init_database(&self, program: Program) -> anyhow::Result> { + pub async fn init_database(&self, program: Program) -> anyhow::Result { self.request(InitDatabaseRequest { program }).await } @@ -662,7 +662,7 @@ js_main_request! { js_main_request! { InitDatabaseRequest { program: Program, - } => "init_database", anyhow::Result>, InitDatabase + } => "init_database", anyhow::Result, InitDatabase } js_main_request! { @@ -862,7 +862,7 @@ enum JsMainWorkerRequest { }, /// See [`JsMainInstance::init_database`]. 
InitDatabase { - reply_tx: JsReplyTx>>, + reply_tx: JsReplyTx>, program: Program, }, } @@ -1489,8 +1489,8 @@ fn handle_main_worker_request( } JsMainWorkerRequest::InitDatabase { reply_tx, program } => { handle_worker_request("init_database", reply_tx, || { - let call_reducer = |tx, params| instance_common.call_reducer_with_tx(tx, params, inst); - let (res, trapped): (Result, anyhow::Error>, bool) = + let call_reducer = |tx, params| instance_common.call_reducer_with_tx_offset(tx, params, inst); + let (res, trapped): (Result, bool) = init_database(replica_ctx, &info.module_def, program, call_reducer); (res, trapped) }) diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index a800a3654b5..39aa30c3c78 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -16,8 +16,8 @@ use crate::host::module_host::{ }; use crate::host::scheduler::{CallScheduledFunctionResult, ScheduledFunctionParams}; use crate::host::{ - ArgsTuple, ModuleHost, ProcedureCallError, ProcedureCallResult, ReducerCallError, ReducerCallResult, ReducerId, - ReducerOutcome, Scheduler, UpdateDatabaseResult, + ArgsTuple, InitDatabaseResult, ModuleHost, ProcedureCallError, ProcedureCallResult, ReducerCallError, + ReducerCallResult, ReducerCallResultWithTxOffset, ReducerId, ReducerOutcome, Scheduler, UpdateDatabaseResult, }; use crate::identity::Identity; use crate::messages::control_db::HostType; @@ -543,10 +543,10 @@ impl WasmModuleInstance { res } - pub fn init_database(&mut self, program: Program) -> anyhow::Result> { + pub fn init_database(&mut self, program: Program) -> anyhow::Result { let module_def = &self.common.info.clone().module_def; let replica_ctx = &self.instance.replica_ctx().clone(); - let call_reducer = |tx, params| self.call_reducer_with_tx(tx, params); + let call_reducer = |tx, params| self.call_reducer_with_tx_offset(tx, params); let (res, 
trapped) = init_database(replica_ctx, module_def, program, call_reducer); self.trapped = trapped; res @@ -589,8 +589,18 @@ impl WasmModuleInstance { impl WasmModuleInstance { #[tracing::instrument(level = "trace", skip_all)] fn call_reducer_with_tx(&mut self, tx: Option, params: CallReducerParams) -> (ReducerCallResult, bool) { + let (res, trapped) = self.call_reducer_with_tx_offset(tx, params); + (res.result, trapped) + } + + #[tracing::instrument(level = "trace", skip_all)] + fn call_reducer_with_tx_offset( + &mut self, + tx: Option, + params: CallReducerParams, + ) -> (ReducerCallResultWithTxOffset, bool) { crate::callgrind_flag::invoke_allowing_callgrind(|| { - self.common.call_reducer_with_tx(tx, params, &mut self.instance) + self.common.call_reducer_with_tx_offset(tx, params, &mut self.instance) }) } @@ -946,6 +956,16 @@ impl InstanceCommon { params: CallReducerParams, inst: &mut I, ) -> (ReducerCallResult, bool) { + let (res, trapped) = self.call_reducer_with_tx_offset(tx, params, inst); + (res.result, trapped) + } + + pub(crate) fn call_reducer_with_tx_offset( + &mut self, + tx: Option, + params: CallReducerParams, + inst: &mut I, + ) -> (ReducerCallResultWithTxOffset, bool) { let CallReducerParams { timestamp, caller_identity, @@ -1083,7 +1103,8 @@ impl InstanceCommon { request_id, timer, }; - let event = commit_and_broadcast_event(&info.subscriptions, client, event, out.tx).event; + let CommitAndBroadcastEventSuccess { event, tx_offset, .. } = + commit_and_broadcast_event(&info.subscriptions, client, event, out.tx); let res = ReducerCallResult { outcome: ReducerOutcome::from(&event.status), @@ -1091,7 +1112,7 @@ impl InstanceCommon { execution_duration: total_duration, }; - (res, trapped) + (ReducerCallResultWithTxOffset { result: res, tx_offset }, trapped) } fn handle_outer_error(&mut self, energy: &EnergyStats, reducer_name: &str) -> EventStatus {