Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 155 additions & 20 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule};
use super::module_host::{DurableOffset, EventStatus, InitDatabaseResult, ModuleHost, ModuleInfo, NoSuchModule};
use super::scheduler::SchedulerStarter;
use super::v8::V8HeapMetrics;
use super::wasmtime::{WasmMemoryBytesMetric, WasmtimeRuntime};
Expand Down Expand Up @@ -89,6 +89,83 @@ where

pub type ProgramStorage = Arc<dyn ExternalStorage>;

/// A launched module host plus any pending controldb program-bootstrap completion work.
pub struct ModuleHostWithBootstrap {
/// The module host obtained from the controller.
pub module: ModuleHost,
/// Handle for completing program bootstrap, if there is one.
///
/// `None` when an already-running (cached) host was reused instead of
/// being freshly initialized.
pub bootstrap_completion: Option<BootstrapCompletion>,
}

/// A handle describing when program bootstrap can be marked complete.
///
/// The handle owns any wait state needed to prove that the database has a
/// durable `st_module` row.
pub struct BootstrapCompletion {
/// Hash of the program this handle refers to; this is the value `wait` resolves to.
program_hash: Hash,
/// Whether the `st_module` write is already durable or must still be waited for.
status: ProgramBootstrap,
}

/// Has the module been bootstrapped from the controldb?
///
/// Once we have inserted into `st_module`, bootstrapping is no longer necessary,
/// and the initial program bytes can be dropped from the controldb.
enum ProgramBootstrap {
/// The module's program bytes have already been written to `st_module`
Durable,
/// The module was bootstrapped from the controldb and the `st_module` write is not yet durable
Pending {
/// Awaited first by `BootstrapCompletion::wait`; its result is the
/// offset that `durable_offset` is then asked to wait for.
tx_offset: TransactionOffset,
/// Durability watcher used to wait for `tx_offset`.
///
/// When `None`, `BootstrapCompletion::wait` only awaits `tx_offset`.
/// NOTE(review): presumably `None` means the database has no durability
/// layer to wait on — confirm.
durable_offset: Option<DurableOffset>,
},
}

impl BootstrapCompletion {
    /// Build a completion for a program whose `st_module` row is already durable.
    ///
    /// Waiting on the returned handle resolves immediately.
    fn durable(program_hash: Hash) -> Self {
        let status = ProgramBootstrap::Durable;
        Self { program_hash, status }
    }

    /// Build a completion for a program whose `st_module` insert has been
    /// issued but is not yet known to be durable.
    ///
    /// `tx_offset` is awaited first; if `durable_offset` is `Some`, it is then
    /// used to wait for that offset.
    fn pending(program_hash: Hash, tx_offset: TransactionOffset, durable_offset: Option<DurableOffset>) -> Self {
        let status = ProgramBootstrap::Pending {
            tx_offset,
            durable_offset,
        };
        Self { program_hash, status }
    }

    /// The hash of the program this completion refers to.
    pub fn program_hash(&self) -> Hash {
        self.program_hash
    }

    /// Wait until it is safe to complete program bootstrap in controldb.
    /// That is, wait until the initial `st_module` insert becomes durable.
    ///
    /// # Errors
    ///
    /// Returns an error if awaiting the transaction offset fails, or if
    /// waiting for that offset on the durability handle fails.
    pub async fn wait(self) -> anyhow::Result<Hash> {
        let Self { program_hash, status } = self;

        // Keep this match exhaustive so that a new status variant forces a
        // decision about what "complete" means for it.
        match status {
            // Nothing to wait for: the row is already durable.
            ProgramBootstrap::Durable => {}
            ProgramBootstrap::Pending {
                tx_offset,
                durable_offset,
            } => {
                let committed_offset = tx_offset
                    .await
                    .context("failed waiting for initialized program transaction offset")?;

                // Without a durability handle there is nothing further to wait on.
                if let Some(mut durability) = durable_offset {
                    durability
                        .wait_for(committed_offset)
                        .await
                        .context("failed waiting for initialized program to become durable")?;
                }
            }
        }

        Ok(program_hash)
    }
}

/// A host controller manages the lifecycle of spacetime databases and their
/// associated modules.
///
Expand Down Expand Up @@ -155,6 +232,11 @@ pub struct ReducerCallResult {
pub execution_duration: Duration,
}

/// A [`ReducerCallResult`] bundled with a [`TransactionOffset`].
pub struct ReducerCallResultWithTxOffset {
/// The result of the reducer call.
pub result: ReducerCallResult,
/// NOTE(review): presumably the offset of the transaction produced by the
/// reducer call — confirm against the code that constructs this value.
pub tx_offset: TransactionOffset,
}

impl ReducerCallResult {
pub fn is_err(&self) -> bool {
self.outcome.is_err()
Expand Down Expand Up @@ -275,11 +357,30 @@ impl HostController {
/// See also: [`Self::get_module_host`]
#[tracing::instrument(level = "trace", skip_all)]
pub async fn get_or_launch_module_host(&self, database: Database, replica_id: u64) -> anyhow::Result<ModuleHost> {
let mut rx = self.watch_maybe_launch_module_host(database, replica_id).await?;
let (mut rx, _) = self
.watch_maybe_launch_module_host_with_bootstrap(database, replica_id)
.await?;
let module = rx.borrow_and_update();
Ok(module.clone())
}

/// Like [`Self::get_or_launch_module_host`], but also hands back the
/// bootstrap completion waiter (if any) alongside the module host.
///
/// The waiter is whatever the underlying watch/launch call produced; it is
/// passed through unchanged in [`ModuleHostWithBootstrap::bootstrap_completion`].
#[tracing::instrument(level = "trace", skip_all)]
pub async fn get_or_launch_module_host_with_bootstrap(
    &self,
    database: Database,
    replica_id: u64,
) -> anyhow::Result<ModuleHostWithBootstrap> {
    let (mut receiver, bootstrap_completion) = self
        .watch_maybe_launch_module_host_with_bootstrap(database, replica_id)
        .await?;

    // Mark the current value as seen and take an owned copy of the host.
    let module = receiver.borrow_and_update().clone();

    Ok(ModuleHostWithBootstrap {
        module,
        bootstrap_completion,
    })
}

/// Like [`Self::get_or_launch_module_host`], use a [`ModuleHost`] managed
/// by this controller, or launch it if it is not running.
///
Expand All @@ -293,13 +394,24 @@ impl HostController {
database: Database,
replica_id: u64,
) -> anyhow::Result<watch::Receiver<ModuleHost>> {
self.watch_maybe_launch_module_host_with_bootstrap(database, replica_id)
.await
.map(|(rx, _)| rx)
}

/// Like [`Self::watch_maybe_launch_module_host`], but returns a bootstrap completion waiter.
async fn watch_maybe_launch_module_host_with_bootstrap(
&self,
database: Database,
replica_id: u64,
) -> anyhow::Result<(watch::Receiver<ModuleHost>, Option<BootstrapCompletion>)> {
// Try a read lock first.
{
if let Ok(guard) = self.acquire_read_lock(replica_id).await
&& let Some(host) = &*guard
{
trace!("cached host {}/{}", database.database_identity, replica_id);
return Ok(host.module.subscribe());
return Ok((host.module.subscribe(), None));
}
}

Expand All @@ -318,7 +430,7 @@ impl HostController {
database.database_identity,
replica_id
);
return Ok(host.module.subscribe());
return Ok((host.module.subscribe(), None));
}

trace!("launch host {}/{}", database.database_identity, replica_id);
Expand All @@ -339,12 +451,16 @@ impl HostController {
// Note that `tokio::spawn` only cancels its tasks when the runtime shuts down,
// at which point we won't be calling `try_init_host` again anyways.
let rx = tokio::spawn(async move {
let host = this.try_init_host(database, replica_id).await?;
let initialized = this.try_init_host(database, replica_id).await?;
let HostInit {
host,
bootstrap_completion,
} = initialized;

let rx = host.module.subscribe();
*guard = Some(host);

Ok::<_, anyhow::Error>(rx)
Ok::<_, anyhow::Error>((rx, bootstrap_completion))
})
.await??;

Expand Down Expand Up @@ -377,8 +493,8 @@ impl HostController {
)
.await?;

let call_result = launched.module_host.init_database(program).await?;
if let Some(call_result) = call_result {
let InitDatabaseResult { reducer, .. } = launched.module_host.init_database(program).await?;
if let Some(call_result) = reducer {
Result::from(call_result)?;
}

Expand Down Expand Up @@ -437,7 +553,7 @@ impl HostController {
let mut host = match guard.take() {
None => {
trace!("host not running, try_init");
this.try_init_host(database, replica_id).await?
this.try_init_host(database, replica_id).await?.host
}
Some(host) => {
trace!("host found, updating");
Expand Down Expand Up @@ -640,7 +756,7 @@ impl HostController {
timeout(Duration::from_secs(5), lock.read_owned()).await
}

async fn try_init_host(&self, database: Database, replica_id: u64) -> anyhow::Result<Host> {
async fn try_init_host(&self, database: Database, replica_id: u64) -> anyhow::Result<HostInit> {
let database_identity = database.database_identity;
Host::try_init(self, database, replica_id)
.await
Expand Down Expand Up @@ -759,6 +875,11 @@ struct LaunchedModule {
scheduler_starter: SchedulerStarter,
}

/// The outcome of `Host::try_init`: the initialized host together with the
/// handle describing when its program bootstrap can be marked complete.
struct HostInit {
/// The freshly initialized host.
host: Host,
/// Bootstrap completion handle produced during initialization.
bootstrap_completion: Option<BootstrapCompletion>,
}

struct ModuleLauncher<F> {
database: Database,
replica_id: u64,
Expand Down Expand Up @@ -888,7 +1009,11 @@ impl Host {
/// Note that this does **not** run module initialization routines, but may
/// create on-disk artifacts if the host / database did not exist.
#[tracing::instrument(level = "debug", skip_all)]
async fn try_init(host_controller: &HostController, database: Database, replica_id: u64) -> anyhow::Result<Self> {
async fn try_init(
host_controller: &HostController,
database: Database,
replica_id: u64,
) -> anyhow::Result<HostInit> {
let HostController {
data_dir,
default_config: config,
Expand Down Expand Up @@ -982,6 +1107,7 @@ impl Host {
(program, true)
}
};
let mut bootstrap_completion = Some(BootstrapCompletion::durable(program.hash));

let relational_db = Arc::new(db);
let (program, launched) = match HostType::from(program.kind) {
Expand Down Expand Up @@ -1071,10 +1197,16 @@ impl Host {
};

if program_needs_init {
let call_result = launched.module_host.init_database(program).await?;
if let Some(call_result) = call_result {
let program_hash = program.hash;
let InitDatabaseResult { reducer, tx_offset } = launched.module_host.init_database(program).await?;
if let Some(call_result) = reducer {
Result::from(call_result)?;
}
bootstrap_completion = Some(BootstrapCompletion::pending(
program_hash,
tx_offset,
launched.module_host.durable_tx_offset(),
));
} else {
drop(program)
}
Expand Down Expand Up @@ -1113,13 +1245,16 @@ impl Host {

let module = watch::Sender::new(module_host);

Ok(Host {
module,
replica_ctx,
scheduler,
disk_metrics_recorder_task,
tx_metrics_recorder_task,
view_cleanup_task,
Ok(HostInit {
host: Host {
module,
replica_ctx,
scheduler,
disk_metrics_recorder_task,
tx_metrics_recorder_task,
view_cleanup_task,
},
bootstrap_completion,
})
}

Expand Down
9 changes: 6 additions & 3 deletions crates/core/src/host/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ mod wasm_common;

pub use disk_storage::DiskStorage;
pub use host_controller::{
extract_schema, CallProcedureReturn, CallResult, ExternalDurability, ExternalStorage, HostController,
HostRuntimeConfig, MigratePlanResult, ProcedureCallResult, ProgramStorage, ReducerCallResult, ReducerOutcome,
extract_schema, BootstrapCompletion, CallProcedureReturn, CallResult, ExternalDurability, ExternalStorage,
HostController, HostRuntimeConfig, MigratePlanResult, ModuleHostWithBootstrap, ProcedureCallResult, ProgramStorage,
ReducerCallResult, ReducerCallResultWithTxOffset, ReducerOutcome,
};
pub use module_host::{
InitDatabaseResult, ModuleHost, NoSuchModule, ProcedureCallError, ReducerCallError, UpdateDatabaseResult,
};
pub use module_host::{ModuleHost, NoSuchModule, ProcedureCallError, ReducerCallError, UpdateDatabaseResult};
pub use scheduler::Scheduler;

/// Encoded arguments to a database function.
Expand Down
Loading
Loading