Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
# Created by https://www.toptal.com/developers/gitignore/api/rust,node,visualstudiocode
# Edit at https://www.toptal.com/developers/gitignore?templates=rust,node,visualstudiocode

perf.data
perf.data.old
flamegraph.html
flamegraphs/*.svg
flamegraphs/flamegraph.folded

Expand Down
80 changes: 66 additions & 14 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use spacetimedb_primitives::{ProcedureId, TableId, ViewFnPtr, ViewId};
use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type;
use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, Deserialize, ProductValue, Typespace, WithTypespace};
use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError};
use spacetimedb_schema::def::deserialize::FunctionDef;
use spacetimedb_schema::def::{ModuleDef, ViewDef};
use spacetimedb_subscription::SubscriptionPlan;
use std::sync::Arc;
Expand Down Expand Up @@ -492,12 +493,17 @@ pub struct InstanceCommon {
energy_monitor: Arc<dyn EnergyMonitor>,
allocated_memory: usize,
metric_wasm_memory_bytes: IntGauge,
vm_metrics: AllVmMetrics,
}

impl InstanceCommon {
pub(crate) fn new(module: &ModuleCommon) -> Self {
let info = module.info();
let vm_metrics = AllVmMetrics::new(&info);

Self {
info: module.info(),
vm_metrics,
energy_monitor: module.energy_monitor(),
// Will be updated on the first reducer call.
allocated_memory: 0,
Expand Down Expand Up @@ -790,7 +796,6 @@ impl InstanceCommon {

let replica_ctx = inst.replica_ctx();
let stdb = &*replica_ctx.relational_db.clone();
let database_identity = replica_ctx.database_identity;
let info = self.info.clone();
let reducer_def = info.module_def.reducer_by_id(reducer_id);
let reducer_name = &*reducer_def.name;
Expand All @@ -812,17 +817,15 @@ impl InstanceCommon {
let tx = tx.unwrap_or_else(|| stdb.begin_mut_tx(IsolationLevel::Serializable, workload));
let mut tx_slot = inst.tx_slot();

let vm_metrics = VmMetrics::new(&database_identity, reducer_name);
let vm_metrics = self.vm_metrics.get_for_reducer_id(reducer_id);
let _guard = vm_metrics.timer_guard_for_reducer_plus_query(tx.timer);

let (mut tx, result) = tx_slot.set(tx, || {
self.call_function(caller_identity, reducer_name, |budget| inst.call_reducer(op, budget))
});

// Report the reducer execution metrics
vm_metrics.report_energy_used(result.stats.energy_used());
vm_metrics.report_total_duration(result.stats.total_duration());
vm_metrics.report_abi_duration(result.stats.abi_duration());
// Report execution metrics on each reducer call.
vm_metrics.report(&result.stats);

// An outer error occurred.
// This signifies a logic error in the module rather than a properly
Expand Down Expand Up @@ -855,7 +858,7 @@ impl InstanceCommon {
// We handle OnConnect events before running the reducer.
let res = match reducer_def.lifecycle {
Some(Lifecycle::OnDisconnect) => {
tx.delete_st_client(caller_identity, caller_connection_id, database_identity)
tx.delete_st_client(caller_identity, caller_connection_id, info.database_identity)
}
_ => Ok(()),
};
Expand Down Expand Up @@ -1117,13 +1120,8 @@ impl InstanceCommon {
})
});

let replica_ctx = inst.replica_ctx();
let stdb = &*replica_ctx.relational_db.clone();
let database_identity = replica_ctx.database_identity;
let vm_metrics = VmMetrics::new(&database_identity, &view_name);

// Report execution metrics on each view call
vm_metrics.report(&result.stats);
// Report execution metrics on each view call.
self.vm_metrics.get_for_view_id(view_id).report(&result.stats);

let trapped = matches!(result.call_result, Err(ExecutionError::Trap(_)));

Expand Down Expand Up @@ -1166,6 +1164,8 @@ impl InstanceCommon {
.context("Error executing raw SQL returned by view".to_string())?,
};

let replica_ctx = inst.replica_ctx();
let stdb = &*replica_ctx.relational_db.clone();
let res = match sender {
Some(sender) => stdb.materialize_view(&mut tx, table_id, sender, rows),
None => stdb.materialize_anonymous_view(&mut tx, table_id, rows),
Expand Down Expand Up @@ -1338,7 +1338,59 @@ impl InstanceCommon {
crate::host::scheduler::call_scheduled_function(&self.info.clone(), params, self, inst).await
}
}

/// Pre-fetched VM metrics counters for all reducers and views in a module.
struct AllVmMetrics {
// We use a `Vec` here as the number of reducers + views
// will likely be lower than e.g., 128, which would take up a page (4096 / 32).
// TODO(perf, centril): Define a `VecMapWithFallback<N>`
// that falls back to `HashMap` when exceeding `N` entries.
// This could be useful elsewhere for e.g., TableId => X maps and similar.
counters: Vec<VmMetrics>,
num_reducers: u32,
}

impl AllVmMetrics {
/// Pre-fetch all vm metrics counters for the module in `info`.
fn new(info: &ModuleInfo) -> Self {
// These are the reducers:
let def = &info.module_def;
let reducers = def.reducer_ids_and_defs();
let num_reducers = reducers.len() as u32;
let reducers = reducers.map(|(_, def)| def.name());

// These are the views:
let views = def.views().map(|def| def.name());

// Pre-fetch the metrics for both:
let counters = reducers
.chain(views)
.map(|name| VmMetrics::new(&info.database_identity, name))
.collect();

Self { counters, num_reducers }
}

fn get_for_index(&self, index: u32) -> VmMetrics {
self.counters[index as usize].clone()
}

/// Returns the vm metrics counters for `id`,
/// or panics if `id` was not pre-fetched in [`AllVmMetrics::new`].
fn get_for_reducer_id(&self, id: ReducerId) -> VmMetrics {
self.get_for_index(id.0)
}

/// Returns the vm metrics counters for `id`,
/// or panics if `id` was not pre-fetched in [`AllVmMetrics::new`].
fn get_for_view_id(&self, id: ViewId) -> VmMetrics {
// Counters for the first view starts after counters for the last reducer.
self.get_for_index(self.num_reducers + id.0)
}
}

/// VM-related metrics for reducer execution.
#[derive(Clone)]
struct VmMetrics {
/// The time spent executing a reducer + plus evaluating its subscription queries.
reducer_plus_query_duration: Histogram,
Expand Down
5 changes: 5 additions & 0 deletions crates/schema/src/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ impl ModuleDef {
self.reducers.values()
}

/// Returns an iterator over all reducer ids and definitions.
pub fn reducer_ids_and_defs(&self) -> impl ExactSizeIterator<Item = (ReducerId, &ReducerDef)> {
self.reducers.values().enumerate().map(|(idx, def)| (idx.into(), def))
}

/// The procedures of the module definition.
pub fn procedures(&self) -> impl Iterator<Item = &ProcedureDef> {
self.procedures.values()
Expand Down
30 changes: 30 additions & 0 deletions d3-flamegraph-base.html

Large diffs are not rendered by default.

Loading