diff --git a/crates/core/src/operations_vtab.rs b/crates/core/src/operations_vtab.rs index 137d9366..9c0e42f6 100644 --- a/crates/core/src/operations_vtab.rs +++ b/crates/core/src/operations_vtab.rs @@ -57,9 +57,14 @@ extern "C" fn connect( } extern "C" fn disconnect(vtab: *mut sqlite::vtab) -> c_int { - unsafe { - drop(Box::from_raw(vtab as *mut VirtualTable)); - } + // Assume ownership of vtab since xDisconnect is supposed to destroy the connection. + let vtab = unsafe { Box::from_raw(vtab as *mut VirtualTable) }; + + // This is an eponymous virtual table. It will only be disconnected when the database is closed. + // So we can use this as a "pre-close" hook and ensure we clear prepared statements the core + // extension might hold. + vtab.state.release_resources(); + ResultCode::OK as c_int } @@ -103,6 +108,11 @@ extern "C" fn update( } else if op == "delete_bucket" { let result = delete_bucket(db, args[3].text()); vtab_result(vtab, result) + } else if op == "noop" { + // We call this to ensure the table is added to an active database, ensuring that + // the disconnect callback runs before the database is closed (allowing us to free + // resources from the client state). + ResultCode::OK as c_int } else { ResultCode::MISUSE as c_int } diff --git a/crates/core/src/state.rs b/crates/core/src/state.rs index f9f55f3a..1239215c 100644 --- a/crates/core/src/state.rs +++ b/crates/core/src/state.rs @@ -11,7 +11,7 @@ use alloc::{ use powersync_sqlite_nostd::{self as sqlite, Context}; use sqlite::{Connection, ResultCode}; -use crate::schema::Schema; +use crate::{schema::Schema, sync::SyncClient}; /// State that is shared for a SQLite database connection after the core extension has been /// registered on it. @@ -24,6 +24,7 @@ pub struct DatabaseState { schema: RefCell>, pending_updates: RefCell>, commited_updates: RefCell>, + pub sync_client: RefCell>, } impl DatabaseState { @@ -90,6 +91,12 @@ impl DatabaseState { core::mem::replace(&mut *committed, Default::default()) } + /// Releases global resources (like prepared statements for the sync client) referenced from + /// this state. + pub fn release_resources(&self) { + self.sync_client.replace(None); + } + /// ## Safety /// /// This is only safe to call when an `Rc` has been installed as the `user_data` diff --git a/crates/core/src/sync/interface.rs b/crates/core/src/sync/interface.rs index ac2883c5..8fea4bf6 100644 --- a/crates/core/src/sync/interface.rs +++ b/crates/core/src/sync/interface.rs @@ -188,14 +188,6 @@ pub struct BucketRequest { pub after: String, } -/// Wrapper around a [SyncClient]. -/// -/// We allocate one instance of this per database (in [register]) - the [SyncClient] has an initial -/// empty state that doesn't consume any resources. -struct SqlController { - client: SyncClient, -} - pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<(), ResultCode> { extern "C" fn control( ctx: *mut sqlite::context, @@ -203,11 +195,10 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<() argv: *mut *mut sqlite::value, ) -> () { let result = (|| -> Result<(), PowerSyncError> { - debug_assert!(!ctx.db_handle().get_autocommit()); - - let controller = unsafe { ctx.user_data().cast::().as_mut() } - .ok_or_else(|| PowerSyncError::unknown_internal())?; + let db = ctx.db_handle(); + debug_assert!(!db.get_autocommit()); + let state = unsafe { DatabaseState::from_context(&ctx) }; let args = sqlite::args!(argc, argv); let [op, payload] = args else { // This should be unreachable, we register the function with two arguments. @@ -222,14 +213,24 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<() let op = op.text(); let event = match op { - "start" => SyncControlRequest::StartSyncStream({ - if payload.value_type() == ColumnType::Text { - serde_json::from_str(payload.text()) - .map_err(PowerSyncError::as_argument_error)? - } else { - StartSyncStream::default() - } - }), + "start" => { + // Ensure the operations vtab exists. It's not actually used by the sync client, + // but we rely on that vtab being destroyed as a pre-close hook for the database + // connection to free statements preserved across multiple powersync_control + // invocations. + db.exec_safe( + "insert into powersync_operations (op, data) VALUES ('noop', null);", + )?; + + SyncControlRequest::StartSyncStream({ + if payload.value_type() == ColumnType::Text { + serde_json::from_str(payload.text()) + .map_err(PowerSyncError::as_argument_error)? + } else { + StartSyncStream::default() + } + }) + } "stop" => SyncControlRequest::StopSyncStream, "line_text" => SyncControlRequest::SyncEvent(SyncEvent::TextLine { data: if payload.value_type() == ColumnType::Text { @@ -267,14 +268,25 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<() "subscriptions" => { let request = serde_json::from_str(payload.text()) .map_err(PowerSyncError::as_argument_error)?; - return apply_subscriptions(ctx.db_handle(), request); + return apply_subscriptions(db, request); } _ => { return Err(PowerSyncError::argument_error("Unknown operation")); } }; - let instructions = controller.client.push_event(event)?; + let instructions = { + let mut client = state.sync_client.borrow_mut(); + + client + .get_or_insert_with(|| { + let state = unsafe { DatabaseState::clone_from(ctx.user_data()) }; + + SyncClient::new(db, &state) + }) + .push_event(event) + }?; + let formatted = serde_json::to_string(&instructions).map_err(PowerSyncError::internal)?; ctx.result_text_transient(&formatted); @@ -288,23 +300,15 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<() } } - unsafe extern "C" fn destroy(ptr: *mut c_void) { - drop(unsafe { Box::from_raw(ptr.cast::()) }); - } - - let controller = Box::new(SqlController { - client: SyncClient::new(db, state), - }); - db.create_function_v2( "powersync_control", 2, sqlite::UTF8 | sqlite::DIRECTONLY | SQLITE_RESULT_SUBTYPE, - Some(Box::into_raw(controller).cast()), + Some(Rc::into_raw(state.clone()) as *mut c_void), Some(control), None, None, - Some(destroy), + Some(DatabaseState::destroy_rc), )?; db.create_function_v2( diff --git a/crates/core/src/sync/mod.rs b/crates/core/src/sync/mod.rs index d4354c1c..47b137f0 100644 --- a/crates/core/src/sync/mod.rs +++ b/crates/core/src/sync/mod.rs @@ -16,6 +16,7 @@ pub use bucket_priority::BucketPriority; pub use checksum::Checksum; use crate::state::DatabaseState; +pub use streaming_sync::SyncClient; pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<(), ResultCode> { interface::register(db, state) diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index c31c3397..de5f27b7 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -11,7 +11,7 @@ use alloc::{ boxed::Box, collections::{btree_map::BTreeMap, btree_set::BTreeSet}, format, - rc::Rc, + rc::{Rc, Weak}, string::{String, ToString}, vec::Vec, }; @@ -51,16 +51,16 @@ use super::{ /// initialized. pub struct SyncClient { db: *mut sqlite::sqlite3, - db_state: Rc, + db_state: Weak, /// The current [ClientState] (essentially an optional [StreamingSyncIteration]). state: ClientState, } impl SyncClient { - pub fn new(db: *mut sqlite::sqlite3, state: Rc) -> Self { + pub fn new(db: *mut sqlite::sqlite3, state: &Rc) -> Self { Self { db, - db_state: state, + db_state: Rc::downgrade(state), state: ClientState::Idle, } } @@ -145,7 +145,7 @@ impl SyncIterationHandle { fn new( db: *mut sqlite::sqlite3, options: StartSyncStream, - state: Rc, + state: Weak, ) -> Result { let runner = StreamingSyncIteration { db, @@ -224,7 +224,7 @@ impl<'a> ActiveEvent<'a> { struct StreamingSyncIteration { db: *mut sqlite::sqlite3, - state: Rc, + state: Weak, adapter: StorageAdapter, options: StartSyncStream, status: SyncStatusContainer, @@ -813,9 +813,14 @@ impl StreamingSyncIteration { target: &OwnedCheckpoint, priority: Option, ) -> Result { - let result = - self.adapter - .sync_local(&self.state, target, priority, &self.options.schema)?; + let state = match self.state.upgrade() { + Some(state) => state, + None => return Err(PowerSyncError::unknown_internal()), + }; + + let result = self + .adapter + .sync_local(&*state, target, priority, &self.options.schema)?; if matches!(&result, SyncLocalResult::ChangesApplied) { // Update affected stream subscriptions to mark them as synced. diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index f67fc8e1..32ec3dea 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -13,6 +13,7 @@ import 'package:path/path.dart'; import 'utils/native_test_utils.dart'; import 'utils/test_utils.dart'; +import 'utils/tracking_vfs.dart'; void main() { final vfs = @@ -1375,6 +1376,23 @@ CREATE TRIGGER users_ref_delete expect(db.select('select * from user_reference'), isEmpty); }); }); + + test('can close database while iteration is active', () { + // The sync client caches prepared statements, we need to ensure those are + // freed when we close the connection since SQLite would keep files open + // otherwise. + final vfs = TrackingFileSystem( + parent: InMemoryFileSystem(), name: 'sync-test-cleanup'); + sqlite3.registerVirtualFileSystem(vfs); + addTearDown(() => sqlite3.unregisterVirtualFileSystem(vfs)); + + db = openTestDatabase(vfs: vfs, fileName: '/test.db') + ..select('select powersync_init();'); + invokeControl('start', null); + expect(vfs.openFiles, isPositive); + db.dispose(); + expect(vfs.openFiles, isZero); + }); } final priorityBuckets = [ diff --git a/dart/test/utils/tracking_vfs.dart b/dart/test/utils/tracking_vfs.dart index 86c2707c..2c92b326 100644 --- a/dart/test/utils/tracking_vfs.dart +++ b/dart/test/utils/tracking_vfs.dart @@ -8,6 +8,7 @@ final class TrackingFileSystem extends BaseVirtualFileSystem { int tempWrites = 0; int dataReads = 0; int dataWrites = 0; + int openFiles = 0; TrackingFileSystem({super.name = 'tracking', required this.parent}); @@ -29,6 +30,7 @@ final class TrackingFileSystem extends BaseVirtualFileSystem { @override XOpenResult xOpen(Sqlite3Filename path, int flags) { final result = parent.xOpen(path, flags); + openFiles++; return ( outFlags: result.outFlags, file: TrackingFile( @@ -85,6 +87,7 @@ class TrackingFile implements VirtualFileSystemFile { @override void xClose() { + vfs.openFiles--; return parentFile.xClose(); }