From 256fb53facd18c71edebf06cb4805b85cad4ecc1 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 26 Jan 2026 18:02:25 +0100 Subject: [PATCH 1/3] Track download size --- crates/core/src/migrations.rs | 13 ++++- crates/core/src/operations.rs | 6 +- crates/core/src/sync/line.rs | 46 +++++++++++++++ crates/core/src/sync/operations.rs | 6 +- crates/core/src/sync/streaming_sync.rs | 19 +++---- dart/test/sync_test.dart | 16 ++++++ dart/test/utils/fix_035_fixtures.dart | 12 ++-- dart/test/utils/migration_fixtures.dart | 75 ++++++++++++++++++++++++- 8 files changed, 173 insertions(+), 20 deletions(-) diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index 56a05745..7a2c13ee 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -13,7 +13,7 @@ use crate::fix_data::apply_v035_fix; use crate::schema::inspection::ExistingView; use crate::sync::BucketPriority; -pub const LATEST_VERSION: i32 = 11; +pub const LATEST_VERSION: i32 = 12; pub fn powersync_migrate( ctx: *mut sqlite::context, @@ -413,5 +413,16 @@ json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11') local_db.exec_safe(stmt).into_db_result(local_db)?; } + if current_version < 12 && target_version >= 12 { + let stmt = "\ +ALTER TABLE ps_buckets ADD COLUMN download_size INTEGER NOT NULL DEFAULT 0; +INSERT INTO ps_migration(id, down_migrations) VALUES(12, json_array( +json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN download_size'), +json_object('sql', 'DELETE FROM ps_migration WHERE id >= 12') +)); +"; + local_db.exec_safe(stmt).into_db_result(local_db)?; + } + Ok(()) } diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index 961c1205..01b02c4d 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -20,9 +20,13 @@ pub fn insert_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), Powe let batch: BucketBatch = serde_json::from_str(data).map_err(PowerSyncError::as_argument_error)?; let adapter = StorageAdapter::new(db)?; + let assumed_bucket_length = data + .len() + .checked_div(batch.buckets.len()) + .unwrap_or_default(); for line in &batch.buckets { - insert_bucket_operations(&adapter, &line)?; + insert_bucket_operations(&adapter, &line, assumed_bucket_length)?; } Ok(()) diff --git a/crates/core/src/sync/line.rs b/crates/core/src/sync/line.rs index e4853b33..cd258406 100644 --- a/crates/core/src/sync/line.rs +++ b/crates/core/src/sync/line.rs @@ -6,6 +6,9 @@ use serde::Deserialize; use serde::de::{Error, IgnoredAny, VariantAccess, Visitor}; use serde_with::{DisplayFromStr, serde_as}; +use crate::bson; +use crate::error::PowerSyncError; + use super::Checksum; use super::bucket_priority::BucketPriority; @@ -15,6 +18,49 @@ use super::bucket_priority::BucketPriority; /// internal copy). type SyncLineStr<'a> = Cow<'a, str>; +#[derive(Clone, Copy)] +pub enum SyncLineSource<'a> { + /// Sync lines that have been decoded from JSON. + Text(&'a str), + + /// Sync lines that have been decoded from BSON. 
+ Binary(&'a [u8]), +} + +impl<'a> SyncLineSource<'a> { + pub fn len(&self) -> usize { + match self { + SyncLineSource::Text(text) => text.len(), + SyncLineSource::Binary(binary) => binary.len(), + } + } +} + +pub struct SyncLineWithSource<'a> { + pub source: SyncLineSource<'a>, + pub line: SyncLine<'a>, +} + +impl<'a> SyncLineWithSource<'a> { + pub fn from_text(source: &'a str) -> Result { + let line = serde_json::from_str(source) + .map_err(|e| PowerSyncError::sync_protocol_error("invalid text line", e))?; + Ok(SyncLineWithSource { + source: SyncLineSource::Text(source), + line, + }) + } + + pub fn from_binary(source: &'a [u8]) -> Result { + let line = bson::from_bytes(source) + .map_err(|e| PowerSyncError::sync_protocol_error("invalid binary line", e))?; + Ok(SyncLineWithSource { + source: SyncLineSource::Binary(source), + line, + }) + } +} + #[derive(Debug)] pub enum SyncLine<'a> { diff --git a/crates/core/src/sync/operations.rs b/crates/core/src/sync/operations.rs index 3af102d5..f113991f 100644 --- a/crates/core/src/sync/operations.rs +++ b/crates/core/src/sync/operations.rs @@ -16,9 +16,11 @@ use super::{ storage_adapter::{BucketInfo, StorageAdapter}, }; +/// If known, `size` should be the size of the buffer from which data has been decoded in bytes. pub fn insert_bucket_operations( adapter: &StorageAdapter, data: &DataLine, + size: usize, ) -> Result<(), PowerSyncError> { let db = adapter.db; let BucketInfo { @@ -194,7 +196,8 @@ WHERE bucket = ?1", SET last_op = ?2, add_checksum = (add_checksum + ?3) & 0xffffffff, op_checksum = (op_checksum + ?4) & 0xffffffff, - count_since_last = count_since_last + ?5 + count_since_last = count_since_last + ?5, + download_size = download_size + ?6 WHERE id = ?1", )?; statement.bind_int64(1, bucket_id)?; @@ -202,6 +205,7 @@ WHERE bucket = ?1", statement.bind_int(3, add_checksum.bitcast_i32())?; statement.bind_int(4, op_checksum.bitcast_i32())?; statement.bind_int(5, added_ops)?; + statement.bind_int64(6, size as i64)?; statement.exec()?; } diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index de5f27b7..7eafe05a 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -18,7 +18,6 @@ use alloc::{ use futures_lite::FutureExt; use crate::{ - bson, error::{PowerSyncError, PowerSyncErrorCause}, kv::client_id, state::DatabaseState, @@ -28,7 +27,7 @@ use crate::{ interface::{CloseSyncStream, StartSyncStream, StreamSubscriptionRequest}, line::{ BucketSubscriptionReason, DataLine, StreamDescription, StreamSubscriptionError, - StreamSubscriptionErrorCause, + StreamSubscriptionErrorCause, SyncLineWithSource, }, subscriptions::LocallyTrackedSubscription, sync_status::{ActiveStreamSubscription, Timestamp}, @@ -268,8 +267,10 @@ impl StreamingSyncIteration { &self, target: &SyncTarget, event: &mut ActiveEvent, - line: &'a SyncLine<'a>, + line: &'a SyncLineWithSource<'a>, ) -> Result, PowerSyncError> { + let SyncLineWithSource { source, line } = line; + Ok(match line { SyncLine::Checkpoint(checkpoint) => { let (to_delete, updated_target) = target.track_checkpoint(&checkpoint); @@ -386,7 +387,7 @@ impl StreamingSyncIteration { } } SyncLine::Data(data_line) => { - insert_bucket_operations(&self.adapter, &data_line)?; + insert_bucket_operations(&self.adapter, &data_line, source.len())?; SyncStateMachineTransition::DataLineSaved { line: data_line } } SyncLine::KeepAlive(token) => { @@ -492,7 +493,7 @@ impl StreamingSyncIteration { &mut self, target: &mut SyncTarget, event: &mut 
ActiveEvent, - line: &SyncLine, + line: &SyncLineWithSource, ) -> Result, PowerSyncError> { let transition = self.prepare_handling_sync_line(target, event, line)?; Ok(self.apply_transition(target, event, transition)) @@ -506,7 +507,7 @@ impl StreamingSyncIteration { let hide_disconnect = loop { let event = Self::receive_event().await; - let line: SyncLine = match event.event { + let line: SyncLineWithSource = match event.event { SyncEvent::Initialize { .. } => { panic!("Initialize should only be emited once") } @@ -515,10 +516,8 @@ impl StreamingSyncIteration { .update(|s| s.disconnect(), &mut event.instructions); break false; } - SyncEvent::TextLine { data } => serde_json::from_str(data) - .map_err(|e| PowerSyncError::sync_protocol_error("invalid text line", e))?, - SyncEvent::BinaryLine { data } => bson::from_bytes(data) - .map_err(|e| PowerSyncError::sync_protocol_error("invalid binary line", e))?, + SyncEvent::TextLine { data } => SyncLineWithSource::from_text(data)?, + SyncEvent::BinaryLine { data } => SyncLineWithSource::from_binary(data)?, SyncEvent::UploadFinished => { self.try_applying_write_after_completed_upload(event)?; diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index 8d0847dd..cf327b38 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -1391,6 +1391,22 @@ CREATE TRIGGER users_ref_delete db.dispose(); expect(vfs.openFiles, isZero); }); + + test('tracks download size', () { + invokeControl('start', null); + pushCheckpoint(buckets: [bucketDescription('a', count: 2)]); + + void expectDownloadSize(int size) { + final [row] = db.select('SELECT download_size FROM ps_buckets'); + expect(row[0], size); + } + + pushSyncData('a', '1', 'row-0', 'PUT', {'col': 'hi'}); + expectDownloadSize(isBson ? 185 : 186); + + pushSyncData('a', '1', 'row-1', 'PUT', {'col': 'hi again'}); + expectDownloadSize(isBson ? 
376 : 378); + }); } final priorityBuckets = [ diff --git a/dart/test/utils/fix_035_fixtures.dart b/dart/test/utils/fix_035_fixtures.dart index c88461da..9e043589 100644 --- a/dart/test/utils/fix_035_fixtures.dart +++ b/dart/test/utils/fix_035_fixtures.dart @@ -18,9 +18,9 @@ const dataBroken = ''' /// Data after applying the migration fix, but before sync_local const dataMigrated = ''' -;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES - (1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0), - (2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0) +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, download_size) VALUES + (1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0, 0), + (2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0, 0) ;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES (1, 1, 'todos', 't1', '', '{}', 100), (1, 2, 'todos', 't2', '', '{}', 20), @@ -39,9 +39,9 @@ const dataMigrated = ''' /// Data after applying the migration fix and sync_local const dataFixed = ''' -;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES - (1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0), - (2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0) +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, download_size) VALUES + (1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0, 0), + (2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0, 0) ;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES (1, 1, 'todos', 't1', '', '{}', 100), (1, 2, 'todos', 't2', '', '{}', 20), diff --git a/dart/test/utils/migration_fixtures.dart b/dart/test/utils/migration_fixtures.dart index 41640ec6..fcf3e374 100644 --- a/dart/test/utils/migration_fixtures.dart +++ b/dart/test/utils/migration_fixtures.dart @@ -1,5 +1,5 @@ /// The current database version -const databaseVersion = 11; +const databaseVersion = 12; /// This is the base database state that we expect at various schema versions. /// Generated by loading the specific library version, and exporting the schema. 
@@ -414,6 +414,67 @@ const expectedState = { ;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(11, '[{"sql":"DROP TABLE ps_stream_subscriptions"},{"sql":"DELETE FROM ps_migration WHERE id >= 11"}]') +''', + 12: r''' +;CREATE TABLE ps_buckets( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + last_applied_op INTEGER NOT NULL DEFAULT 0, + last_op INTEGER NOT NULL DEFAULT 0, + target_op INTEGER NOT NULL DEFAULT 0, + add_checksum INTEGER NOT NULL DEFAULT 0, + op_checksum INTEGER NOT NULL DEFAULT 0, + pending_delete INTEGER NOT NULL DEFAULT 0 +, count_at_last INTEGER NOT NULL DEFAULT 0, count_since_last INTEGER NOT NULL DEFAULT 0, download_size INTEGER NOT NULL DEFAULT 0) STRICT +;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER) +;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB) +;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT) +;CREATE TABLE ps_oplog( + bucket INTEGER NOT NULL, + op_id INTEGER NOT NULL, + row_type TEXT, + row_id TEXT, + key TEXT, + data TEXT, + hash INTEGER NOT NULL) STRICT +;CREATE TABLE ps_stream_subscriptions ( + id INTEGER NOT NULL PRIMARY KEY, + stream_name TEXT NOT NULL, + active INTEGER NOT NULL DEFAULT FALSE, + is_default INTEGER NOT NULL DEFAULT FALSE, + local_priority INTEGER, + local_params TEXT NOT NULL DEFAULT 'null', + ttl INTEGER, + expires_at INTEGER, + last_synced_at INTEGER, + UNIQUE (stream_name, local_params) +) STRICT +;CREATE TABLE ps_sync_state ( + priority INTEGER NOT NULL PRIMARY KEY, + last_synced_at TEXT NOT NULL +) STRICT +;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER) +;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id)) +;CREATE TABLE ps_updated_rows( + row_type TEXT, + row_id TEXT, + PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID +;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name) +;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key) +;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id) +;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id) +;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null) +;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE 
ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(11, '[{"sql":"DROP TABLE ps_stream_subscriptions"},{"sql":"DELETE FROM ps_migration WHERE id >= 11"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(12, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN download_size"},{"sql":"DELETE FROM ps_migration WHERE id >= 12"}]') ''', }; @@ -527,6 +588,17 @@ const data1 = { (2, 3, 'lists', 'l1', '', '{}', 3) ;INSERT INTO 
ps_updated_rows(row_type, row_id) VALUES ('lists', 'l2') +''', + 12: r''' +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, download_size) VALUES + (1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0, 0), + (2, 'b2', 0, 0, 0, 1005, 3, 0, 0, 0, 0) +;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES + (1, 1, 'todos', 't1', '', '{}', 100), + (1, 2, 'todos', 't2', '', '{}', 20), + (2, 3, 'lists', 'l1', '', '{}', 3) +;INSERT INTO ps_updated_rows(row_type, row_id) VALUES + ('lists', 'l2') ''' }; @@ -573,6 +645,7 @@ final dataDown1 = { 8: data1[5]!, 9: data1[9]!, 10: data1[9]!, + 11: data1[10]!, }; final finalData1 = data1[databaseVersion]!; From 555b46773e99c352306362bbba792b928091ab74 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 26 Jan 2026 20:44:42 +0100 Subject: [PATCH 2/3] Don't track for legacy oplog vtab --- crates/core/src/operations.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index 01b02c4d..adbeef03 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -20,13 +20,9 @@ pub fn insert_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), Powe let batch: BucketBatch = serde_json::from_str(data).map_err(PowerSyncError::as_argument_error)?; let adapter = StorageAdapter::new(db)?; - let assumed_bucket_length = data - .len() - .checked_div(batch.buckets.len()) - .unwrap_or_default(); for line in &batch.buckets { - insert_bucket_operations(&adapter, &line, assumed_bucket_length)?; + insert_bucket_operations(&adapter, &line, 0)?; } Ok(()) From 77e8f053380ebb9c14c6b3317c53c531e574d070 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 28 Jan 2026 11:03:02 +0100 Subject: [PATCH 3/3] download_size -> downloaded_size --- crates/core/src/migrations.rs | 4 ++-- crates/core/src/sync/operations.rs | 2 +- dart/test/sync_test.dart | 2 +- dart/test/utils/fix_035_fixtures.dart | 4 ++-- dart/test/utils/migration_fixtures.dart | 6 +++--- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index 7a2c13ee..95df2208 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -415,9 +415,9 @@ json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11') if current_version < 12 && target_version >= 12 { let stmt = "\ -ALTER TABLE ps_buckets ADD COLUMN download_size INTEGER NOT NULL DEFAULT 0; +ALTER TABLE ps_buckets ADD COLUMN downloaded_size INTEGER NOT NULL DEFAULT 0; INSERT INTO ps_migration(id, down_migrations) VALUES(12, json_array( -json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN download_size'), +json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN downloaded_size'), json_object('sql', 'DELETE FROM ps_migration WHERE id >= 12') )); "; diff --git a/crates/core/src/sync/operations.rs b/crates/core/src/sync/operations.rs index f113991f..e3fa0d7b 100644 --- a/crates/core/src/sync/operations.rs +++ b/crates/core/src/sync/operations.rs @@ -197,7 +197,7 @@ WHERE bucket = ?1", add_checksum = (add_checksum + ?3) & 0xffffffff, op_checksum = (op_checksum + ?4) & 0xffffffff, count_since_last = count_since_last + ?5, - download_size = download_size + ?6 + downloaded_size = downloaded_size + ?6 WHERE id = ?1", )?; statement.bind_int64(1, bucket_id)?; diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index cf327b38..83454696 100644 --- 
a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -1397,7 +1397,7 @@ CREATE TRIGGER users_ref_delete pushCheckpoint(buckets: [bucketDescription('a', count: 2)]); void expectDownloadSize(int size) { - final [row] = db.select('SELECT download_size FROM ps_buckets'); + final [row] = db.select('SELECT downloaded_size FROM ps_buckets'); expect(row[0], size); } diff --git a/dart/test/utils/fix_035_fixtures.dart b/dart/test/utils/fix_035_fixtures.dart index 9e043589..5f7b3978 100644 --- a/dart/test/utils/fix_035_fixtures.dart +++ b/dart/test/utils/fix_035_fixtures.dart @@ -18,7 +18,7 @@ const dataBroken = ''' /// Data after applying the migration fix, but before sync_local const dataMigrated = ''' -;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, download_size) VALUES +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, downloaded_size) VALUES (1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0, 0), (2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0, 0) ;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES @@ -39,7 +39,7 @@ const dataMigrated = ''' /// Data after applying the migration fix and sync_local const dataFixed = ''' -;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, download_size) VALUES +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, downloaded_size) VALUES (1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0, 0), (2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0, 0) ;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES diff --git a/dart/test/utils/migration_fixtures.dart b/dart/test/utils/migration_fixtures.dart index fcf3e374..601d2b17 100644 --- a/dart/test/utils/migration_fixtures.dart +++ b/dart/test/utils/migration_fixtures.dart @@ -425,7 +425,7 @@ const expectedState = { add_checksum INTEGER NOT NULL DEFAULT 0, op_checksum INTEGER NOT NULL DEFAULT 0, pending_delete INTEGER NOT NULL DEFAULT 0 -, count_at_last INTEGER NOT NULL DEFAULT 0, count_since_last INTEGER NOT NULL DEFAULT 0, download_size INTEGER NOT NULL DEFAULT 0) STRICT +, count_at_last INTEGER NOT NULL DEFAULT 0, count_since_last INTEGER NOT NULL DEFAULT 0, downloaded_size INTEGER NOT NULL DEFAULT 0) STRICT ;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER) ;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB) ;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT) @@ -474,7 +474,7 @@ const expectedState = { ;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(11, '[{"sql":"DROP TABLE ps_stream_subscriptions"},{"sql":"DELETE FROM ps_migration WHERE id >= 11"}]') -;INSERT INTO ps_migration(id, down_migrations) VALUES(12, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN 
download_size"},{"sql":"DELETE FROM ps_migration WHERE id >= 12"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(12, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN downloaded_size"},{"sql":"DELETE FROM ps_migration WHERE id >= 12"}]') ''', }; @@ -590,7 +590,7 @@ const data1 = { ('lists', 'l2') ''', 12: r''' -;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, download_size) VALUES +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, downloaded_size) VALUES (1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0, 0), (2, 'b2', 0, 0, 0, 1005, 3, 0, 0, 0, 0) ;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES