diff --git a/crates/core/src/schema/table_info.rs b/crates/core/src/schema/table_info.rs index 5a6a77b..010cf73 100644 --- a/crates/core/src/schema/table_info.rs +++ b/crates/core/src/schema/table_info.rs @@ -1,4 +1,6 @@ -use alloc::{format, string::String, vec, vec::Vec}; +use alloc::string::ToString; +use alloc::vec; +use alloc::{collections::btree_set::BTreeSet, format, string::String, vec::Vec}; use serde::{Deserialize, de::Visitor}; #[derive(Deserialize)] @@ -252,16 +254,77 @@ impl<'de> Deserialize<'de> for TableInfoFlags { } } -#[derive(Deserialize)] pub struct PendingStatement { pub sql: String, /// This vec should contain an entry for each parameter in [sql]. pub params: Vec, + + /// Present if this statement has a [PendingStatementValue::Rest] parameter. + pub named_parameters_index: Option, +} + +pub struct RestColumnIndex { + /// All column names referenced by this statement. + pub named_parameters: BTreeSet, + /// Parameter indices that should be bound to a JSON object containing those values from the + /// source row that haven't been referenced by [PendingStatementValue::Column]. + pub rest_parameter_positions: Vec, +} + +impl<'de> Deserialize<'de> for PendingStatement { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + #[derive(Deserialize)] + struct PendingStatementSource { + pub sql: String, + /// This vec should contain an entry for each parameter in [sql]. + pub params: Vec, + } + + let source = PendingStatementSource::deserialize(deserializer)?; + let mut named_parameters_index = None; + if source + .params + .iter() + .any(|s| matches!(s, PendingStatementValue::Rest)) + { + let mut set = BTreeSet::new(); + let mut rest_parameter_positions = vec![]; + for (i, column) in source.params.iter().enumerate() { + set.insert(match column { + PendingStatementValue::Id => "id".to_string(), + PendingStatementValue::Column(name) => name.clone(), + PendingStatementValue::Rest => { + rest_parameter_positions.push(i); + continue; + } + }); + } + + named_parameters_index = Some(RestColumnIndex { + named_parameters: set, + rest_parameter_positions, + }); + } + + return Ok(Self { + sql: source.sql, + params: source.params, + named_parameters_index, + }); + } } #[derive(Deserialize)] pub enum PendingStatementValue { + /// Bind to the PowerSync row id of the affected row. Id, + /// Bind to the value of column in the synced row. Column(String), + /// Bind to a JSON object containing all columns from the synced row that haven't been matched + /// by other statement values. + Rest, // TODO: Stuff like a raw object of put data? } diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index bbaabe9..6796d0b 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -2,7 +2,8 @@ use alloc::collections::btree_map::BTreeMap; use alloc::format; use alloc::string::String; use alloc::vec::Vec; -use serde::Deserialize; +use serde::ser::SerializeMap; +use serde::{Deserialize, Serialize}; use crate::error::{PSResult, PowerSyncError}; use crate::schema::inspection::ExistingTable; @@ -153,7 +154,14 @@ impl<'a> SyncOperation<'a> { let stmt = raw.put_statement(self.db)?; let parsed: serde_json::Value = serde_json::from_str(data) .map_err(PowerSyncError::json_local_error)?; - stmt.bind_for_put(id, &parsed)?; + let json_object = parsed.as_object().ok_or_else(|| { + PowerSyncError::argument_error( + "expected oplog data to be an object", + ) + })?; + + let rest = stmt.render_rest_object(json_object)?; + stmt.bind_for_put(id, &json_object, &rest)?; stmt.exec(self.db, type_name, id, Some(&parsed))?; } Err(_) => { @@ -510,7 +518,7 @@ impl<'a> ParsedSchemaTable<'a> { struct PreparedPendingStatement<'a> { stmt: ManagedStmt, - params: &'a [PendingStatementValue], + definition: &'a PendingStatement, } impl<'a> PreparedPendingStatement<'a> { @@ -532,17 +540,64 @@ impl<'a> PreparedPendingStatement<'a> { Ok(Self { stmt, - params: &pending.params, + definition: pending, + }) + } + + pub fn render_rest_object( + &self, + json_data: &serde_json::Map, + ) -> Result, PowerSyncError> { + use serde_json::Value; + + let Some(ref index) = self.definition.named_parameters_index else { + return Ok(None); + }; + + struct UnmatchedValues<'a>(BTreeMap<&'a String, &'a Value>); + + impl<'a> Serialize for UnmatchedValues<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut map = serializer.serialize_map(Some(self.0.len()))?; + + for (k, v) in &self.0 { + map.serialize_entry(k, v)?; + } + + map.end() + } + } + + let mut unmatched_values: Option = None; + for (key, value) in json_data { + if !index.named_parameters.contains(key) { + unmatched_values + .get_or_insert_with(|| UnmatchedValues(BTreeMap::new())) + .0 + .insert(key, value); + } + } + + Ok(match unmatched_values { + None => None, + Some(unmatched) => { + Some(serde_json::to_string(&unmatched).map_err(|e| PowerSyncError::internal(e))?) + } }) } pub fn bind_for_put( &self, id: &str, - json_data: &serde_json::Value, + json_data: &serde_json::Map, + rest: &Option, ) -> Result<(), PowerSyncError> { use serde_json::Value; - for (i, source) in self.params.iter().enumerate() { + + for (i, source) in self.definition.params.iter().enumerate() { let i = (i + 1) as i32; match source { @@ -550,11 +605,7 @@ impl<'a> PreparedPendingStatement<'a> { self.stmt.bind_text(i, id, Destructor::STATIC)?; } PendingStatementValue::Column(column) => { - let parsed = json_data.as_object().ok_or_else(|| { - PowerSyncError::argument_error("expected oplog data to be an object") - })?; - - match parsed.get(column) { + match json_data.get(column) { Some(Value::Bool(value)) => { self.stmt.bind_int(i, if *value { 1 } else { 0 }) } @@ -573,6 +624,20 @@ impl<'a> PreparedPendingStatement<'a> { _ => self.stmt.bind_null(i), }?; } + PendingStatementValue::Rest => { + // These are bound later. + debug_assert!(self.definition.named_parameters_index.is_some()); + } + } + } + + if let Some(index) = &self.definition.named_parameters_index { + for target in &index.rest_parameter_positions { + let index = (*target + 1) as i32; + match rest { + None => self.stmt.bind_null(index), + Some(value) => self.stmt.bind_text(index, &*value, Destructor::STATIC), + }?; } } @@ -580,7 +645,7 @@ impl<'a> PreparedPendingStatement<'a> { } pub fn bind_for_delete(&self, id: &str) -> Result<(), PowerSyncError> { - for (i, source) in self.params.iter().enumerate() { + for (i, source) in self.definition.params.iter().enumerate() { if let PendingStatementValue::Id = source { self.stmt .bind_text((i + 1) as i32, id, Destructor::STATIC)?; diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index 8d0847d..c359d86 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -1197,6 +1197,67 @@ CREATE TRIGGER users_delete ); }); + test('rest column', () { + db.execute( + 'CREATE TABLE users (id TEXT NOT NULL, name TEXT, _rest TEXT)'); + invokeControl( + 'start', + json.encode({ + 'schema': { + 'tables': [], + 'raw_tables': [ + { + 'name': 'users', + 'put': { + 'sql': + 'INSERT OR REPLACE INTO users (id, name, _rest) VALUES (?, ?, ?);', + 'params': [ + 'Id', + {'Column': 'name'}, + 'Rest' + ], + }, + 'delete': { + 'sql': 'DELETE FROM users WHERE id = ?', + 'params': ['Id'], + }, + 'clear': 'DELETE FROM users;', + } + ] + } + }), + ); + + pushCheckpoint(buckets: [bucketDescription('a')]); + pushSyncData( + 'a', + '1', + 'user1', + 'PUT', + {'name': 'First user'}, + objectType: 'users', + ); + pushSyncData( + 'a', + '2', + 'user2', + 'PUT', + {'name': 'Second user', 'foo': 'bar', 'another': 3}, + objectType: 'users', + ); + pushCheckpointComplete(); + + final users = db.select('SELECT * FROM users;'); + expect(users, [ + {'id': 'user1', 'name': 'First user', '_rest': null}, + { + 'id': 'user2', + 'name': 'Second user', + '_rest': json.encode({'another': 3, 'foo': 'bar'}) + }, + ]); + }); + test('crud vtab', () { // This is mostly a test for the triggers, validating the suggestions we // give on https://docs.powersync.com/usage/use-case-examples/raw-tables#capture-local-writes-with-triggers