13 changes: 12 additions & 1 deletion crates/core/src/migrations.rs
@@ -13,7 +13,7 @@ use crate::fix_data::apply_v035_fix;
use crate::schema::inspection::ExistingView;
use crate::sync::BucketPriority;

pub const LATEST_VERSION: i32 = 11;
pub const LATEST_VERSION: i32 = 12;

pub fn powersync_migrate(
ctx: *mut sqlite::context,
@@ -413,5 +413,16 @@ json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11')
local_db.exec_safe(stmt).into_db_result(local_db)?;
}

if current_version < 12 && target_version >= 12 {
let stmt = "\
ALTER TABLE ps_buckets ADD COLUMN downloaded_size INTEGER NOT NULL DEFAULT 0;
INSERT INTO ps_migration(id, down_migrations) VALUES(12, json_array(
json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN downloaded_size'),
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 12')
));
";
local_db.exec_safe(stmt).into_db_result(local_db)?;
}

Ok(())
}
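
Note on the down migration: like migrations 2–11, migration 12 registers its inverse as a JSON array of {"sql": ...} objects in ps_migration.down_migrations. A rough sketch of decoding one of those stored entries (the DownMigrationStep type and parse_down_migrations helper are hypothetical, for illustration only; the extension's real downgrade path lives elsewhere in migrations.rs):

use serde::Deserialize;

/// One step of a stored down migration, e.g.
/// {"sql": "ALTER TABLE ps_buckets DROP COLUMN downloaded_size"}.
#[derive(Deserialize)]
struct DownMigrationStep {
    sql: String,
    /// Older entries also carry a "params" array; default to empty when absent.
    #[serde(default)]
    params: Vec<serde_json::Value>,
}

/// Decodes the JSON text stored in ps_migration.down_migrations.
fn parse_down_migrations(raw: &str) -> serde_json::Result<Vec<DownMigrationStep>> {
    serde_json::from_str(raw)
}
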
2 changes: 1 addition & 1 deletion crates/core/src/operations.rs
@@ -22,7 +22,7 @@ pub fn insert_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), Powe
let adapter = StorageAdapter::new(db)?;

for line in &batch.buckets {
insert_bucket_operations(&adapter, &line)?;
insert_bucket_operations(&adapter, &line, 0)?;
}

Ok(())
46 changes: 46 additions & 0 deletions crates/core/src/sync/line.rs
@@ -6,6 +6,9 @@ use serde::Deserialize
use serde::de::{Error, IgnoredAny, VariantAccess, Visitor};
use serde_with::{DisplayFromStr, serde_as};

use crate::bson;
use crate::error::PowerSyncError;

use super::Checksum;
use super::bucket_priority::BucketPriority;

@@ -15,6 +18,49 @@ use super::bucket_priority::BucketPriority;
/// internal copy).
type SyncLineStr<'a> = Cow<'a, str>;

#[derive(Clone, Copy)]
pub enum SyncLineSource<'a> {
/// Sync lines that have been decoded from JSON.
Text(&'a str),

/// Sync lines that have been decoded from BSON.
Binary(&'a [u8]),
}

impl<'a> SyncLineSource<'a> {
pub fn len(&self) -> usize {
match self {
SyncLineSource::Text(text) => text.len(),
SyncLineSource::Binary(binary) => binary.len(),
}
}
}

pub struct SyncLineWithSource<'a> {
pub source: SyncLineSource<'a>,
pub line: SyncLine<'a>,
}

impl<'a> SyncLineWithSource<'a> {
pub fn from_text(source: &'a str) -> Result<Self, PowerSyncError> {
let line = serde_json::from_str(source)
.map_err(|e| PowerSyncError::sync_protocol_error("invalid text line", e))?;
Ok(SyncLineWithSource {
source: SyncLineSource::Text(source),
line,
})
}

pub fn from_binary(source: &'a [u8]) -> Result<Self, PowerSyncError> {
let line = bson::from_bytes(source)
.map_err(|e| PowerSyncError::sync_protocol_error("invalid binary line", e))?;
Ok(SyncLineWithSource {
source: SyncLineSource::Binary(source),
line,
})
}
}

#[derive(Debug)]

pub enum SyncLine<'a> {
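
SyncLineWithSource keeps the decoded SyncLine together with the raw buffer it was parsed from, so the byte size of each sync line stays available after decoding. A minimal usage sketch built only on the constructors added above (the raw JSON input is illustrative):

/// Returns the wire size, in bytes, of a sync line received as text.
fn text_line_size(raw: &str) -> Result<usize, PowerSyncError> {
    let with_source = SyncLineWithSource::from_text(raw)?;
    // For text lines this is the JSON string's length; from_binary would
    // report the BSON buffer's length instead.
    Ok(with_source.source.len())
}
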
6 changes: 5 additions & 1 deletion crates/core/src/sync/operations.rs
@@ -16,9 +16,11 @@ use super::{
storage_adapter::{BucketInfo, StorageAdapter},
};

/// If known, `size` should be the size of the buffer from which data has been decoded in bytes.
pub fn insert_bucket_operations(
adapter: &StorageAdapter,
data: &DataLine,
size: usize,
) -> Result<(), PowerSyncError> {
let db = adapter.db;
let BucketInfo {
@@ -194,14 +196,16 @@ WHERE bucket = ?1",
SET last_op = ?2,
add_checksum = (add_checksum + ?3) & 0xffffffff,
op_checksum = (op_checksum + ?4) & 0xffffffff,
count_since_last = count_since_last + ?5
count_since_last = count_since_last + ?5,
downloaded_size = downloaded_size + ?6
WHERE id = ?1",
)?;
statement.bind_int64(1, bucket_id)?;
statement.bind_int64(2, *last_op)?;
statement.bind_int(3, add_checksum.bitcast_i32())?;
statement.bind_int(4, op_checksum.bitcast_i32())?;
statement.bind_int(5, added_ops)?;
statement.bind_int64(6, size as i64)?;

statement.exec()?;
}
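
With the extra ?6 parameter, every processed data line adds the byte length of the buffer it was decoded from to ps_buckets.downloaded_size, while the checksum columns keep wrapping at 32 bits. A plain-Rust model of the UPDATE's arithmetic, for illustration only (field names mirror the ps_buckets columns; this struct is not part of the extension):

struct BucketCounters {
    add_checksum: i64,
    op_checksum: i64,
    count_since_last: i64,
    downloaded_size: i64,
}

impl BucketCounters {
    fn apply_data_line(&mut self, add: i32, op: i32, added_ops: i32, line_size: usize) {
        // Checksums accumulate modulo 2^32, matching `& 0xffffffff` in the SQL.
        self.add_checksum = (self.add_checksum + add as i64) & 0xffffffff;
        self.op_checksum = (self.op_checksum + op as i64) & 0xffffffff;
        self.count_since_last += added_ops as i64;
        // New in this change: cumulative bytes downloaded for the bucket.
        self.downloaded_size += line_size as i64;
    }
}
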
19 changes: 9 additions & 10 deletions crates/core/src/sync/streaming_sync.rs
@@ -18,7 +18,6 @@ use alloc::{
use futures_lite::FutureExt;

use crate::{
bson,
error::{PowerSyncError, PowerSyncErrorCause},
kv::client_id,
state::DatabaseState,
@@ -28,7 +27,7 @@ use crate::{
interface::{CloseSyncStream, StartSyncStream, StreamSubscriptionRequest},
line::{
BucketSubscriptionReason, DataLine, StreamDescription, StreamSubscriptionError,
StreamSubscriptionErrorCause,
StreamSubscriptionErrorCause, SyncLineWithSource,
},
subscriptions::LocallyTrackedSubscription,
sync_status::{ActiveStreamSubscription, Timestamp},
@@ -268,8 +267,10 @@ impl StreamingSyncIteration {
&self,
target: &SyncTarget,
event: &mut ActiveEvent,
line: &'a SyncLine<'a>,
line: &'a SyncLineWithSource<'a>,
) -> Result<SyncStateMachineTransition<'a>, PowerSyncError> {
let SyncLineWithSource { source, line } = line;

Ok(match line {
SyncLine::Checkpoint(checkpoint) => {
let (to_delete, updated_target) = target.track_checkpoint(&checkpoint);
@@ -386,7 +387,7 @@ impl StreamingSyncIteration {
}
}
SyncLine::Data(data_line) => {
insert_bucket_operations(&self.adapter, &data_line)?;
insert_bucket_operations(&self.adapter, &data_line, source.len())?;
SyncStateMachineTransition::DataLineSaved { line: data_line }
}
SyncLine::KeepAlive(token) => {
@@ -492,7 +493,7 @@ impl StreamingSyncIteration {
&mut self,
target: &mut SyncTarget,
event: &mut ActiveEvent,
line: &SyncLine,
line: &SyncLineWithSource,
) -> Result<Option<CloseSyncStream>, PowerSyncError> {
let transition = self.prepare_handling_sync_line(target, event, line)?;
Ok(self.apply_transition(target, event, transition))
@@ -506,7 +507,7 @@
let hide_disconnect = loop {
let event = Self::receive_event().await;

let line: SyncLine = match event.event {
let line: SyncLineWithSource = match event.event {
SyncEvent::Initialize { .. } => {
panic!("Initialize should only be emited once")
}
Expand All @@ -515,10 +516,8 @@ impl StreamingSyncIteration {
.update(|s| s.disconnect(), &mut event.instructions);
break false;
}
SyncEvent::TextLine { data } => serde_json::from_str(data)
.map_err(|e| PowerSyncError::sync_protocol_error("invalid text line", e))?,
SyncEvent::BinaryLine { data } => bson::from_bytes(data)
.map_err(|e| PowerSyncError::sync_protocol_error("invalid binary line", e))?,
SyncEvent::TextLine { data } => SyncLineWithSource::from_text(data)?,
SyncEvent::BinaryLine { data } => SyncLineWithSource::from_binary(data)?,
SyncEvent::UploadFinished => {
self.try_applying_write_after_completed_upload(event)?;

16 changes: 16 additions & 0 deletions dart/test/sync_test.dart
@@ -1391,6 +1391,22 @@ CREATE TRIGGER users_ref_delete
db.dispose();
expect(vfs.openFiles, isZero);
});

test('tracks download size', () {
invokeControl('start', null);
pushCheckpoint(buckets: [bucketDescription('a', count: 2)]);

void expectDownloadSize(int size) {
final [row] = db.select('SELECT downloaded_size FROM ps_buckets');
expect(row[0], size);
}

pushSyncData('a', '1', 'row-0', 'PUT', {'col': 'hi'});
expectDownloadSize(isBson ? 185 : 186);

pushSyncData('a', '1', 'row-1', 'PUT', {'col': 'hi again'});
expectDownloadSize(isBson ? 376 : 378);
});
}

final priorityBuckets = [
12 changes: 6 additions & 6 deletions dart/test/utils/fix_035_fixtures.dart
@@ -18,9 +18,9 @@ const dataBroken = '''

/// Data after applying the migration fix, but before sync_local
const dataMigrated = '''
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES
(1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0),
(2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0)
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, downloaded_size) VALUES
(1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0, 0),
(2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0, 0)
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
(1, 1, 'todos', 't1', '', '{}', 100),
(1, 2, 'todos', 't2', '', '{}', 20),
@@ -39,9 +39,9 @@

/// Data after applying the migration fix and sync_local
const dataFixed = '''
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES
(1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0),
(2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0)
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, downloaded_size) VALUES
(1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0, 0),
(2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0, 0)
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
(1, 1, 'todos', 't1', '', '{}', 100),
(1, 2, 'todos', 't2', '', '{}', 20),
75 changes: 74 additions & 1 deletion dart/test/utils/migration_fixtures.dart
@@ -1,5 +1,5 @@
/// The current database version
const databaseVersion = 11;
const databaseVersion = 12;

/// This is the base database state that we expect at various schema versions.
/// Generated by loading the specific library version, and exporting the schema.
@@ -414,6 +414,67 @@ const expectedState = <int, String>{
;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(11, '[{"sql":"DROP TABLE ps_stream_subscriptions"},{"sql":"DELETE FROM ps_migration WHERE id >= 11"}]')
''',
12: r'''
;CREATE TABLE ps_buckets(
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
last_applied_op INTEGER NOT NULL DEFAULT 0,
last_op INTEGER NOT NULL DEFAULT 0,
target_op INTEGER NOT NULL DEFAULT 0,
add_checksum INTEGER NOT NULL DEFAULT 0,
op_checksum INTEGER NOT NULL DEFAULT 0,
pending_delete INTEGER NOT NULL DEFAULT 0
, count_at_last INTEGER NOT NULL DEFAULT 0, count_since_last INTEGER NOT NULL DEFAULT 0, downloaded_size INTEGER NOT NULL DEFAULT 0) STRICT
;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER)
;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB)
;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT)
;CREATE TABLE ps_oplog(
bucket INTEGER NOT NULL,
op_id INTEGER NOT NULL,
row_type TEXT,
row_id TEXT,
key TEXT,
data TEXT,
hash INTEGER NOT NULL) STRICT
;CREATE TABLE ps_stream_subscriptions (
id INTEGER NOT NULL PRIMARY KEY,
stream_name TEXT NOT NULL,
active INTEGER NOT NULL DEFAULT FALSE,
is_default INTEGER NOT NULL DEFAULT FALSE,
local_priority INTEGER,
local_params TEXT NOT NULL DEFAULT 'null',
ttl INTEGER,
expires_at INTEGER,
last_synced_at INTEGER,
UNIQUE (stream_name, local_params)
) STRICT
;CREATE TABLE ps_sync_state (
priority INTEGER NOT NULL PRIMARY KEY,
last_synced_at TEXT NOT NULL
) STRICT
;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER)
;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id))
;CREATE TABLE ps_updated_rows(
row_type TEXT,
row_id TEXT,
PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID
;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name)
;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key)
;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id)
;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id)
;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null)
;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(11, '[{"sql":"DROP TABLE ps_stream_subscriptions"},{"sql":"DELETE FROM ps_migration WHERE id >= 11"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(12, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN downloaded_size"},{"sql":"DELETE FROM ps_migration WHERE id >= 12"}]')
''',
};

@@ -527,6 +588,17 @@ const data1 = <int, String>{
(2, 3, 'lists', 'l1', '', '{}', 3)
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
('lists', 'l2')
''',
12: r'''
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, downloaded_size) VALUES
(1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0, 0),
(2, 'b2', 0, 0, 0, 1005, 3, 0, 0, 0, 0)
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
(1, 1, 'todos', 't1', '', '{}', 100),
(1, 2, 'todos', 't2', '', '{}', 20),
(2, 3, 'lists', 'l1', '', '{}', 3)
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
('lists', 'l2')
'''
};

@@ -573,6 +645,7 @@ final dataDown1 = <int, String>{
8: data1[5]!,
9: data1[9]!,
10: data1[9]!,
11: data1[10]!,
};

final finalData1 = data1[databaseVersion]!;