diff --git a/crates/core/src/sync/diagnostics.rs b/crates/core/src/sync/diagnostics.rs
new file mode 100644
index 0000000..bd8a638
--- /dev/null
+++ b/crates/core/src/sync/diagnostics.rs
@@ -0,0 +1,256 @@
+use alloc::vec;
+use alloc::{
+    borrow::Cow,
+    collections::btree_map::BTreeMap,
+    string::{String, ToString},
+    vec::Vec,
+};
+use serde::{
+    Deserialize, Deserializer, Serialize,
+    de::{IgnoredAny, Visitor},
+};
+
+use crate::sync::{
+    interface::{Instruction, StartSyncStream},
+    line::{DataLine, OplogData, SyncLineStr},
+    sync_status::{BucketProgress, DownloadSyncStatus},
+};
+
+#[derive(Deserialize)]
+pub struct DiagnosticOptions {
+    // Currently empty; we enable diagnostics whenever the `Option<DiagnosticOptions>` is `Some`.
+}
+
+#[derive(Serialize)]
+pub enum DiagnosticsEvent {
+    BucketStateChange {
+        changes: Vec<BucketDownloadState>,
+        incremental: bool,
+    },
+    SchemaChange(ObservedSchemaType),
+}
+
+#[derive(Serialize)]
+pub struct BucketDownloadState {
+    pub name: String,
+    pub progress: BucketProgress,
+}
+
+#[derive(Serialize)]
+pub struct ObservedSchemaType {
+    pub table: String,
+    pub column: String,
+    pub value_type: ValueType,
+}
+
+#[derive(Serialize, Clone, Copy, PartialEq)]
+pub enum ValueType {
+    Null,
+    String,
+    Integer,
+    Real,
+}
+
+#[derive(Default)]
+pub struct DiagnosticsCollector {
+    inferred_schema: BTreeMap<String, BTreeMap<String, ValueType>>,
+}
+
+impl DiagnosticsCollector {
+    pub fn for_options(options: &StartSyncStream) -> Option<Self> {
+        options.diagnostics.as_ref().map(|_| Self::default())
+    }
+
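+    /// Emits a non-incremental [DiagnosticsEvent::BucketStateChange] with the download progress
+    /// of every bucket in the checkpoint that is being tracked.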
+    pub fn handle_tracking_checkpoint(
+        &self,
+        status: &DownloadSyncStatus,
+        instructions: &mut Vec<Instruction>,
+    ) {
+        let mut buckets = vec![];
+        if let Some(downloading) = &status.downloading {
+            for (name, progress) in &downloading.buckets {
+                buckets.push(BucketDownloadState {
+                    name: name.clone(),
+                    progress: progress.clone(),
+                });
+            }
+        }
+
+        instructions.push(Instruction::HandleDiagnostics(
+            DiagnosticsEvent::BucketStateChange {
+                changes: buckets,
+                incremental: false,
+            },
+        ));
+    }
+
+    /// Updates the internal inferred schema with types from the handled data line.
+    ///
+    /// Emits a diagnostic line for each changed column.
+    pub fn handle_data_line<'a>(
+        &mut self,
+        line: &'a DataLine<'a>,
+        status: &DownloadSyncStatus,
+        instructions: &mut Vec<Instruction>,
+    ) {
+        if let Some(download_status) = &status.downloading {
+            if let Some(progress) = download_status.buckets.get(line.bucket.as_ref()) {
+                let mut changes = vec![];
+                changes.push(BucketDownloadState {
+                    name: line.bucket.to_string(),
+                    progress: progress.clone(),
+                });
+
+                instructions.push(Instruction::HandleDiagnostics(
+                    DiagnosticsEvent::BucketStateChange {
+                        changes,
+                        incremental: true,
+                    },
+                ));
+            }
+        }
+
+        for op in &line.data {
+            if let (Some(data), Some(object_type)) = (&op.data, &op.object_type) {
+                let OplogData::Json { data } = data;
+                let table = self
+                    .inferred_schema
+                    .entry(object_type.to_string())
+                    .or_default();
+
+                let mut de = serde_json::Deserializer::from_str(data);
+
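+                // A serde visitor that walks the top-level JSON object of the row and records
+                // the primitive type of each column, without allocating a serde_json::Value.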
+                struct TypeInferringVisitor<'a> {
+                    table_name: &'a str,
+                    table: &'a mut BTreeMap<String, ValueType>,
+                    instructions: &'a mut Vec<Instruction>,
+                }
+
+                impl TypeInferringVisitor<'_> {
+                    fn observe_type<'a>(&mut self, name: Cow<'a, str>, column_type: ValueType) {
+                        if column_type == ValueType::Null {
+                            // We don't track nullability in the inferred schema.
+                            return;
+                        }
+
+                        if let Some(existing) = self.table.get_mut(name.as_ref()) {
+                            if *existing != column_type && *existing != ValueType::String {
+                                *existing = column_type;
+
+                                self.instructions.push(Instruction::HandleDiagnostics(
+                                    DiagnosticsEvent::SchemaChange(ObservedSchemaType {
+                                        table: self.table_name.to_string(),
+                                        column: name.into_owned(),
+                                        value_type: column_type,
+                                    }),
+                                ));
+                            }
+                        } else {
+                            let name = name.into_owned();
+                            self.table.insert(name.clone(), column_type);
+
+                            self.instructions.push(Instruction::HandleDiagnostics(
+                                DiagnosticsEvent::SchemaChange(ObservedSchemaType {
+                                    table: self.table_name.to_string(),
+                                    column: name,
+                                    value_type: column_type,
+                                }),
+                            ));
+                        }
+                    }
+                }
+
+                impl<'de> Visitor<'de> for TypeInferringVisitor<'de> {
+                    type Value = ();
+
+                    fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
+                        write!(formatter, "a map")
+                    }
+
+                    fn visit_map<A>(mut self, mut map: A) -> Result<Self::Value, A::Error>
+                    where
+                        A: serde::de::MapAccess<'de>,
+                    {
+                        while let Some(k) = map.next_key::<Cow<str>>()? {
+                            if k == "id" {
+                                map.next_value::<IgnoredAny>()?;
+                            } else {
+                                let value_type = map.next_value::<ValueToValueType>()?.0;
+                                self.observe_type(k, value_type);
+                            }
+                        }
+
+                        Ok(())
+                    }
+                }
+
+                let _ = de.deserialize_map(TypeInferringVisitor {
+                    table_name: object_type,
+                    table,
+                    instructions,
+                });
+            }
+        }
+    }
+}
+
+/// Utility to deserialize the [ValueType] from a [serde_json::Value] without reading it into a
+/// structure that requires allocation.
+struct ValueToValueType(ValueType);
+
+impl<'de> Deserialize<'de> for ValueToValueType {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        struct ValueTypeVisitor;
+
+        impl<'de> Visitor<'de> for ValueTypeVisitor {
+            type Value = ValueType;
+
+            fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
+                write!(formatter, "a sync value")
+            }
+
+            fn visit_f64<E>(self, _v: f64) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                Ok(ValueType::Real)
+            }
+
+            fn visit_u64<E>(self, _v: u64) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                Ok(ValueType::Integer)
+            }
+
+            fn visit_i64<E>(self, _v: i64) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                Ok(ValueType::Integer)
+            }
+
+            fn visit_str<E>(self, _v: &str) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                Ok(ValueType::String)
+            }
+
+            fn visit_unit<E>(self) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                // Unit is used to represent nulls, see https://github.com/serde-rs/json/blob/4f6dbfac79647d032b0997b5ab73022340c6dab7/src/de.rs#L1404-L1409
+                Ok(ValueType::Null)
+            }
+        }
+
+        Ok(ValueToValueType(
+            deserializer.deserialize_any(ValueTypeVisitor)?,
+        ))
+    }
+}
diff --git a/crates/core/src/sync/interface.rs b/crates/core/src/sync/interface.rs
index 8fea4bf..7dd86b3 100644
--- a/crates/core/src/sync/interface.rs
+++ b/crates/core/src/sync/interface.rs
@@ -8,6 +8,7 @@ use crate::create_sqlite_text_fn;
 use crate::error::PowerSyncError;
 use crate::schema::Schema;
 use crate::state::DatabaseState;
+use crate::sync::diagnostics::{DiagnosticOptions, DiagnosticsEvent};
 use crate::sync::storage_adapter::StorageAdapter;
 use crate::sync::subscriptions::{StreamKey, apply_subscriptions};
 use alloc::borrow::Cow;
@@ -43,6 +44,10 @@ pub struct StartSyncStream {
     pub active_streams: Rc>,
     #[serde(default)]
     pub app_metadata: Option>,
+
+    /// Whether sync diagnostics with detailed download stats and inferred schema should be reported
+    /// by the sync client.
+    pub diagnostics: Option<DiagnosticOptions>,
 }
 
 impl StartSyncStream {
@@ -59,6 +64,7 @@ impl Default for StartSyncStream {
             include_defaults: Self::include_defaults_by_default(),
             active_streams: Default::default(),
             app_metadata: Default::default(),
+            diagnostics: Default::default(),
         }
     }
 }
@@ -138,6 +144,11 @@ pub enum Instruction {
     FlushFileSystem {},
     /// Notify that a sync has been completed, prompting client SDKs to clear earlier errors.
     DidCompleteSync {},
+
+    /// Handle a diagnostic event.
+    ///
+    /// This instruction is only emitted if diagnostics have been enabled on [StartSyncStream].
+    HandleDiagnostics(DiagnosticsEvent),
 }
 
 #[derive(Serialize, Default)]
diff --git a/crates/core/src/sync/line.rs b/crates/core/src/sync/line.rs
index e4853b3..957e3f1 100644
--- a/crates/core/src/sync/line.rs
+++ b/crates/core/src/sync/line.rs
@@ -13,7 +13,7 @@ use super::bucket_priority::BucketPriority;
 /// With the JSON decoder, borrowing from input data is only possible when the string contains no
 /// escape sequences (otherwise, the string is not a direct view of input data and we need an
 /// internal copy).
-type SyncLineStr<'a> = Cow<'a, str>;
+pub type SyncLineStr<'a> = Cow<'a, str>;
 
 #[derive(Debug)]
diff --git a/crates/core/src/sync/mod.rs b/crates/core/src/sync/mod.rs
index 47b137f..3b74674 100644
--- a/crates/core/src/sync/mod.rs
+++ b/crates/core/src/sync/mod.rs
@@ -4,6 +4,7 @@ use powersync_sqlite_nostd::{self as sqlite, ResultCode};
 mod bucket_priority;
 pub mod checkpoint;
 mod checksum;
+mod diagnostics;
 mod interface;
 pub mod line;
 pub mod operations;
diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs
index de5f27b..c32e1b1 100644
--- a/crates/core/src/sync/streaming_sync.rs
+++ b/crates/core/src/sync/streaming_sync.rs
@@ -25,6 +25,7 @@ use crate::{
     sync::{
         BucketPriority,
         checkpoint::OwnedBucketChecksum,
+        diagnostics::DiagnosticsCollector,
         interface::{CloseSyncStream, StartSyncStream, StreamSubscriptionRequest},
         line::{
             BucketSubscriptionReason, DataLine, StreamDescription, StreamSubscriptionError,
@@ -149,11 +150,12 @@ impl SyncIterationHandle {
     ) -> Result {
         let runner = StreamingSyncIteration {
             db,
+            validated_but_not_applied: None,
+            diagnostics: DiagnosticsCollector::for_options(&options),
             options,
             state,
             adapter: StorageAdapter::new(db)?,
             status: SyncStatusContainer::new(),
-            validated_but_not_applied: None,
         };
         let future = runner.run().boxed_local();
@@ -232,6 +234,7 @@ struct StreamingSyncIteration {
     // pending local data. We will retry applying this checkpoint when the client SDK informs us
     // that it has finished uploading changes.
     validated_but_not_applied: Option,
+    diagnostics: Option<DiagnosticsCollector>,
 }
 
 impl StreamingSyncIteration {
@@ -455,10 +458,20 @@
                 // something worth doing.
                 self.validated_but_not_applied = None;
                 *target = updated_target;
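+
+                // A newly-tracked checkpoint re-computes per-bucket progress, so report the
+                // full bucket state here. Saved data lines below only report incremental changes.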
+                if let Some(diagnostics) = &self.diagnostics {
+                    let status = self.status.inner().borrow();
+                    diagnostics.handle_tracking_checkpoint(&*status, &mut event.instructions);
+                }
             }
             SyncStateMachineTransition::DataLineSaved { line } => {
                 self.status
                     .update(|s| s.track_line(&line), &mut event.instructions);
+
+                if let Some(diagnostics) = &mut self.diagnostics {
+                    let status = self.status.inner().borrow();
+                    diagnostics.handle_data_line(line, &*status, &mut event.instructions);
+                }
             }
             SyncStateMachineTransition::CloseIteration(close) => return Some(close),
             SyncStateMachineTransition::SyncLocalFailedDueToPendingCrud {
diff --git a/crates/core/src/sync/sync_status.rs b/crates/core/src/sync/sync_status.rs
index 1f00614..42f2fdd 100644
--- a/crates/core/src/sync/sync_status.rs
+++ b/crates/core/src/sync/sync_status.rs
@@ -267,7 +267,7 @@ pub struct SyncPriorityStatus {
 }
 
 /// Per-bucket download progress information.
-#[derive(Serialize, Hash)]
+#[derive(Serialize, Hash, Clone)]
 pub struct BucketProgress {
     pub priority: BucketPriority,
     pub at_last: i64,
@@ -277,7 +277,7 @@
 
 #[derive(Hash)]
 pub struct SyncDownloadProgress {
-    buckets: BTreeMap<String, BucketProgress>,
+    pub buckets: BTreeMap<String, BucketProgress>,
 }
 
 impl Serialize for SyncDownloadProgress {
diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart
index 8d0847d..deb6a15 100644
--- a/dart/test/sync_test.dart
+++ b/dart/test/sync_test.dart
@@ -1388,9 +1388,111 @@ CREATE TRIGGER users_ref_delete
       ..select('select powersync_init();');
     invokeControl('start', null);
     expect(vfs.openFiles, isPositive);
-    db.dispose();
+    db.close();
     expect(vfs.openFiles, isZero);
   });
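+
+  // Diagnostics are opt-in: the HandleDiagnostics instructions asserted below are only
+  // emitted because 'start' is invoked with a (currently empty) 'diagnostics' object.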
+  group('diagnostics', () {
+    test('infers schema', () {
+      invokeControl('start', json.encode({'diagnostics': {}}));
+      pushCheckpoint(buckets: priorityBuckets);
+      var instructions = pushSyncData('prio1', '1', 'row-0', 'PUT', {
+        'text': 'text',
+        'int': 3,
+        'double': 3.14,
+        'null': null,
+      });
+      expect(
+        instructions,
+        containsAll(
+          [
+            schemaChange('items', 'text', 'String'),
+            schemaChange('items', 'int', 'Integer'),
+            schemaChange('items', 'double', 'Real'),
+          ],
+        ),
+      );
+
+      instructions = pushSyncData('prio1', '2', 'row-1', 'PUT', {
+        'text': null,
+        'int': 3.123,
+        'double': 1.23,
+      });
+
+      // Should only report the changed type
+      expect(
+        instructions.skip(2),
+        [
+          schemaChange('items', 'int', 'Real'),
+        ],
+      );
+    });
+
+    test('reports per-bucket progress', () {
+      invokeControl('start', json.encode({'diagnostics': {}}));
+
+      var instructions = pushCheckpoint(
+        buckets: [
+          bucketDescription('a', count: 1),
+          bucketDescription('b', count: 1),
+        ],
+        lastOpId: 10,
+      );
+      expect(
+        instructions[1],
+        {
+          'HandleDiagnostics': {
+            'BucketStateChange': {
+              'incremental': false,
+              'changes': [
+                {
+                  'name': 'a',
+                  'progress': {
+                    'priority': 3,
+                    'at_last': 0,
+                    'since_last': 0,
+                    'target_count': 1
+                  },
+                },
+                {
+                  'name': 'b',
+                  'progress': {
+                    'priority': 3,
+                    'at_last': 0,
+                    'since_last': 0,
+                    'target_count': 1
+                  },
+                }
+              ]
+            }
+          }
+        },
+      );
+
+      instructions = pushSyncData('a', '1', 'a', 'PUT', {'foo': 'bar'});
+      expect(
+        instructions[1],
+        {
+          'HandleDiagnostics': {
+            'BucketStateChange': {
+              'incremental': true,
+              'changes': [
+                {
+                  'name': 'a',
+                  'progress': {
+                    'priority': 3,
+                    'at_last': 0,
+                    'since_last': 1,
+                    'target_count': 1
+                  },
+                }
+              ]
+            }
+          }
+        },
+      );
+    });
+  });
 }
 
 final priorityBuckets = [
diff --git a/dart/test/utils/test_utils.dart b/dart/test/utils/test_utils.dart
index abd3e85..53ad01c 100644
--- a/dart/test/utils/test_utils.dart
+++ b/dart/test/utils/test_utils.dart
@@ -48,6 +48,18 @@ Object bucketDescription(
   };
 }
 
+Object schemaChange(String table, String column, String valueType) {
+  return {
+    'HandleDiagnostics': {
+      'SchemaChange': {
+        'table': table,
+        'column': column,
+        'value_type': valueType
+      }
+    }
+  };
+}
+
 Matcher isSqliteException(int code, dynamic message) {
   return isA<SqliteException>()
       .having((e) => e.extendedResultCode, 'extendedResultCode', code)