Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 256 additions & 0 deletions crates/core/src/sync/diagnostics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
use alloc::vec;
use alloc::{
borrow::Cow,
collections::btree_map::BTreeMap,
string::{String, ToString},
vec::Vec,
};
use serde::{
Deserialize, Deserializer, Serialize,
de::{IgnoredAny, Visitor},
};

use crate::sync::{
interface::{Instruction, StartSyncStream},
line::{DataLine, OplogData, SyncLineStr},
sync_status::{BucketProgress, DownloadSyncStatus},
};

/// Options controlling sync diagnostics, decoded from the start-sync request.
#[derive(Deserialize)]
pub struct DiagnosticOptions {
    // currently empty, we enable diagnostics if an Option<Self> is Some()
}

/// An event produced by the [DiagnosticsCollector] and forwarded to client SDKs.
#[derive(Serialize)]
pub enum DiagnosticsEvent {
    /// Download progress has changed for the listed buckets.
    BucketStateChange {
        /// The per-bucket progress entries being reported.
        changes: Vec<BucketDownloadState>,
        /// When `true`, `changes` only covers the bucket affected by a single
        /// data line; when `false`, it is a full snapshot of all tracked buckets.
        incremental: bool,
    },
    /// A new or changed column type has been observed in the inferred schema.
    SchemaChange(ObservedSchemaType),
}

/// Download progress for a single bucket, reported as part of
/// [DiagnosticsEvent::BucketStateChange].
#[derive(Serialize)]
pub struct BucketDownloadState {
    /// The name of the bucket.
    pub name: String,
    /// The current download progress of this bucket.
    pub progress: BucketProgress,
}

/// A column type observed while inferring the schema from downloaded data.
#[derive(Serialize)]
pub struct ObservedSchemaType {
    /// The table (the oplog entry's object type) the column belongs to.
    pub table: String,
    /// The name of the column.
    pub column: String,
    /// The value type inferred for this column.
    pub value_type: ValueType,
}

/// The type of a column value, inferred from its JSON representation.
///
/// NOTE(review): there is no variant for booleans, blobs, arrays or objects —
/// confirm such values cannot appear in oplog data (they would fail type
/// inference in [ValueToValueType]).
#[derive(Serialize, Clone, Copy, PartialEq)]
pub enum ValueType {
    Null,
    String,
    Integer,
    Real,
}

/// Collects diagnostics (bucket download progress and an inferred schema) while
/// a sync iteration is running.
#[derive(Default)]
pub struct DiagnosticsCollector {
    // Maps table name -> column name -> inferred type for the data seen so far.
    inferred_schema: BTreeMap<String, BTreeMap<String, ValueType>>,
}

impl DiagnosticsCollector {
    /// Creates a collector if (and only if) the sync stream was started with
    /// diagnostics enabled.
    pub fn for_options(options: &StartSyncStream) -> Option<Self> {
        options.diagnostics.as_ref().map(|_| Self::default())
    }

    /// Emits a full (non-incremental) [DiagnosticsEvent::BucketStateChange]
    /// covering every bucket of the checkpoint currently being downloaded.
    ///
    /// An event with an empty `changes` list is emitted when no download is in
    /// progress.
    pub fn handle_tracking_checkpoint(
        &self,
        status: &DownloadSyncStatus,
        instructions: &mut Vec<Instruction>,
    ) {
        let mut buckets = vec![];
        if let Some(downloading) = &status.downloading {
            for (name, progress) in &downloading.buckets {
                buckets.push(BucketDownloadState {
                    name: name.clone(),
                    progress: progress.clone(),
                });
            }
        }

        instructions.push(Instruction::HandleDiagnostics(
            DiagnosticsEvent::BucketStateChange {
                changes: buckets,
                incremental: false,
            },
        ));
    }

    /// Updates the internal inferred schema with types from the handled data line.
    ///
    /// Emits a diagnostic line for each changed column.
    pub fn handle_data_line<'a>(
        &mut self,
        line: &'a DataLine<'a>,
        status: &DownloadSyncStatus,
        instructions: &mut Vec<Instruction>,
    ) {
        // Report incremental progress for the bucket this line belongs to, but
        // only if a download is in progress and tracks that bucket.
        if let Some(download_status) = &status.downloading {
            if let Some(progress) = download_status.buckets.get(line.bucket.as_ref()) {
                let mut changes = vec![];
                changes.push(BucketDownloadState {
                    name: line.bucket.to_string(),
                    progress: progress.clone(),
                });

                instructions.push(Instruction::HandleDiagnostics(
                    DiagnosticsEvent::BucketStateChange {
                        changes,
                        incremental: true,
                    },
                ));
            }
        }

        // Infer column types from the JSON payload of every operation that has
        // both data and an object type.
        for op in &line.data {
            if let (Some(data), Some(object_type)) = (&op.data, &op.object_type) {
                let OplogData::Json { data } = data;
                let table = self
                    .inferred_schema
                    .entry(object_type.to_string())
                    .or_default();

                let mut de = serde_json::Deserializer::from_str(data);

                // Visits the top-level JSON object of a row, recording the type
                // of each column as it is encountered.
                struct TypeInferringVisitor<'a> {
                    table_name: &'a str,
                    table: &'a mut BTreeMap<String, ValueType>,
                    instructions: &'a mut Vec<Instruction>,
                }

                impl TypeInferringVisitor<'_> {
                    // Records the observed type for a column, emitting a
                    // SchemaChange event when the column is new or its stored
                    // type changed.
                    fn observe_type<'a>(&mut self, name: Cow<'a, str>, column_type: ValueType) {
                        if column_type == ValueType::Null {
                            // We don't track nullability in the inferred schema.
                            return;
                        }

                        if let Some(existing) = self.table.get_mut(name.as_ref()) {
                            // A column observed as String keeps that type; any
                            // other mismatch overwrites the stored type.
                            // NOTE(review): a column alternating between, say,
                            // Integer and Real flips back and forth and emits an
                            // event each time — confirm this is intended.
                            if *existing != column_type && *existing != ValueType::String {
                                *existing = column_type;

                                self.instructions.push(Instruction::HandleDiagnostics(
                                    DiagnosticsEvent::SchemaChange(ObservedSchemaType {
                                        table: self.table_name.to_string(),
                                        column: name.into_owned(),
                                        value_type: column_type,
                                    }),
                                ));
                            }
                        } else {
                            // First observation of this column: store the type
                            // and report it.
                            let name = name.into_owned();
                            self.table.insert(name.clone(), column_type);

                            self.instructions.push(Instruction::HandleDiagnostics(
                                DiagnosticsEvent::SchemaChange(ObservedSchemaType {
                                    table: self.table_name.to_string(),
                                    column: name,
                                    value_type: column_type,
                                }),
                            ));
                        }
                    }
                }

                impl<'de> Visitor<'de> for TypeInferringVisitor<'de> {
                    type Value = ();

                    fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
                        write!(formatter, "a map")
                    }

                    fn visit_map<A>(mut self, mut map: A) -> Result<Self::Value, A::Error>
                    where
                        A: serde::de::MapAccess<'de>,
                    {
                        while let Some(k) = map.next_key::<SyncLineStr<'de>>()? {
                            if k == "id" {
                                // The value of the `id` column is skipped —
                                // presumably because it's the primary key and
                                // not part of the inferred column schema.
                                map.next_value::<IgnoredAny>()?;
                            } else {
                                let value_type = map.next_value::<ValueToValueType>()?.0;
                                self.observe_type(k, value_type);
                            }
                        }

                        Ok(())
                    }
                }

                // Errors are deliberately ignored: a value we can't interpret
                // aborts type inference for the remaining columns of this row
                // only, without failing the sync line.
                let _ = de.deserialize_map(TypeInferringVisitor {
                    table_name: object_type,
                    table,
                    instructions,
                });
            }
        }
    }
}

/// Utility to infer the [ValueType] of a JSON value directly during
/// deserialization, without reading it into a structure (such as a
/// `serde_json::Value`) that requires allocation.
struct ValueToValueType(ValueType);

impl<'de> Deserialize<'de> for ValueToValueType {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct ValueTypeVisitor;

impl<'de> Visitor<'de> for ValueTypeVisitor {
type Value = ValueType;

fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
write!(formatter, "a sync value")
}

fn visit_f64<E>(self, _v: f64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(ValueType::Real)
}

fn visit_u64<E>(self, _v: u64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(ValueType::Integer)
}

fn visit_i64<E>(self, _v: i64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(ValueType::Integer)
}

fn visit_str<E>(self, _v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(ValueType::String)
}

fn visit_unit<E>(self) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
// Unit is used to represent nulls, see https://github.com/serde-rs/json/blob/4f6dbfac79647d032b0997b5ab73022340c6dab7/src/de.rs#L1404-L1409
Ok(ValueType::Null)
}
}

Ok(ValueToValueType(
deserializer.deserialize_any(ValueTypeVisitor)?,
))
}
}
11 changes: 11 additions & 0 deletions crates/core/src/sync/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::create_sqlite_text_fn;
use crate::error::PowerSyncError;
use crate::schema::Schema;
use crate::state::DatabaseState;
use crate::sync::diagnostics::{DiagnosticOptions, DiagnosticsEvent};
use crate::sync::storage_adapter::StorageAdapter;
use crate::sync::subscriptions::{StreamKey, apply_subscriptions};
use alloc::borrow::Cow;
Expand Down Expand Up @@ -43,6 +44,10 @@ pub struct StartSyncStream {
pub active_streams: Rc<Vec<StreamKey>>,
#[serde(default)]
pub app_metadata: Option<Box<RawValue>>,

/// Whether sync diagnostics with detailed download stats and inferred schema should be reported
/// by the sync client.
pub diagnostics: Option<DiagnosticOptions>,
}

impl StartSyncStream {
Expand All @@ -59,6 +64,7 @@ impl Default for StartSyncStream {
include_defaults: Self::include_defaults_by_default(),
active_streams: Default::default(),
app_metadata: Default::default(),
diagnostics: Default::default(),
}
}
}
Expand Down Expand Up @@ -138,6 +144,11 @@ pub enum Instruction {
FlushFileSystem {},
/// Notify that a sync has been completed, prompting client SDKs to clear earlier errors.
DidCompleteSync {},

/// Handle a diagnostic event.
///
/// This instruction is only emitted if diagnostics have been enabled on [StartSyncStream].
HandleDiagnostics(DiagnosticsEvent),
}

#[derive(Serialize, Default)]
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/sync/line.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use super::bucket_priority::BucketPriority;
/// With the JSON decoder, borrowing from input data is only possible when the string contains no
/// escape sequences (otherwise, the string is not a direct view of input data and we need an
/// internal copy).
type SyncLineStr<'a> = Cow<'a, str>;
pub type SyncLineStr<'a> = Cow<'a, str>;

#[derive(Debug)]

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use powersync_sqlite_nostd::{self as sqlite, ResultCode};
mod bucket_priority;
pub mod checkpoint;
mod checksum;
mod diagnostics;
mod interface;
pub mod line;
pub mod operations;
Expand Down
15 changes: 14 additions & 1 deletion crates/core/src/sync/streaming_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{
sync::{
BucketPriority,
checkpoint::OwnedBucketChecksum,
diagnostics::DiagnosticsCollector,
interface::{CloseSyncStream, StartSyncStream, StreamSubscriptionRequest},
line::{
BucketSubscriptionReason, DataLine, StreamDescription, StreamSubscriptionError,
Expand Down Expand Up @@ -149,11 +150,12 @@ impl SyncIterationHandle {
) -> Result<Self, PowerSyncError> {
let runner = StreamingSyncIteration {
db,
validated_but_not_applied: None,
diagnostics: DiagnosticsCollector::for_options(&options),
options,
state,
adapter: StorageAdapter::new(db)?,
status: SyncStatusContainer::new(),
validated_but_not_applied: None,
};
let future = runner.run().boxed_local();

Expand Down Expand Up @@ -232,6 +234,7 @@ struct StreamingSyncIteration {
// pending local data. We will retry applying this checkpoint when the client SDK informs us
// that it has finished uploading changes.
validated_but_not_applied: Option<OwnedCheckpoint>,
diagnostics: Option<DiagnosticsCollector>,
}

impl StreamingSyncIteration {
Expand Down Expand Up @@ -455,10 +458,20 @@ impl StreamingSyncIteration {
// something worth doing.
self.validated_but_not_applied = None;
*target = updated_target;

if let Some(diagnostics) = &self.diagnostics {
let status = self.status.inner().borrow();
diagnostics.handle_tracking_checkpoint(&*status, &mut event.instructions);
}
}
SyncStateMachineTransition::DataLineSaved { line } => {
self.status
.update(|s| s.track_line(&line), &mut event.instructions);

if let Some(diagnostics) = &mut self.diagnostics {
let status = self.status.inner().borrow();
diagnostics.handle_data_line(line, &*status, &mut event.instructions);
}
}
SyncStateMachineTransition::CloseIteration(close) => return Some(close),
SyncStateMachineTransition::SyncLocalFailedDueToPendingCrud {
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/sync/sync_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ pub struct SyncPriorityStatus {
}

/// Per-bucket download progress information.
#[derive(Serialize, Hash)]
#[derive(Serialize, Hash, Clone)]
pub struct BucketProgress {
pub priority: BucketPriority,
pub at_last: i64,
Expand All @@ -277,7 +277,7 @@ pub struct BucketProgress {

#[derive(Hash)]
pub struct SyncDownloadProgress {
buckets: BTreeMap<String, BucketProgress>,
pub buckets: BTreeMap<String, BucketProgress>,
}

impl Serialize for SyncDownloadProgress {
Expand Down
Loading