diff --git a/crates/bench/benches/index.rs b/crates/bench/benches/index.rs index ded3aa47547..8c77622ba20 100644 --- a/crates/bench/benches/index.rs +++ b/crates/bench/benches/index.rs @@ -184,7 +184,8 @@ impl Index for IBTree { Self(<_>::default()) } fn insert(&mut self, key: K, val: RowPointer) -> Result<(), RowPointer> { - self.0.insert(key, val) + // SAFETY: we never insert `(key, val)` twice in benchmarks. + unsafe { self.0.insert(key, val) } } fn seek(&self, key: K) -> impl Iterator { self.0.seek_range(&(key..=key)) @@ -250,7 +251,8 @@ impl Index for IDirectIndex { Self(<_>::default()) } fn insert(&mut self, key: K, val: RowPointer) -> Result<(), RowPointer> { - self.0.insert(key, val) + // SAFETY: we never insert `(key, val)` twice in benchmarks. + unsafe { self.0.insert(key, val) } } fn seek(&self, key: K) -> impl Iterator { self.0.seek_point(&key) diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 183696fe940..fcaef335597 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -860,7 +860,9 @@ impl CommittedState { let is_unique = unique_constraints.contains(&(table_id, columns)); let index = table.new_index(&algo, is_unique)?; - // SAFETY: `index` was derived from `table`. + // SAFETY: + // 1. `index` was derived from `table`. + // 2. `index` was just created, so it is empty. unsafe { table.insert_index(blob_store, index_id, index) }; index_id_map.insert(index_id, table_id); } diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index e4b417ba497..fd37b65e39f 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -1213,8 +1213,12 @@ impl MutTxId { // due to the existing rows having the same value for some column(s). let build_from_rows = |index: &mut TableIndex, table: &Table, bs: &dyn BlobStore| -> Result<()> { let rows = table.scan_rows(bs); - // SAFETY: (1) `tx_index` / `commit_index` was derived from `table` / `commit_table` + // SAFETY: + // + // (1) `tx_index` / `commit_index` was derived from `table` / `commit_table` // which in turn was derived from `commit_table`. + // + // (2) Both `tx_index` and `commit_index` are empty before this call. let violation = unsafe { index.build_from_rows(rows) }; violation.map_err(|v| map_violation(v, index, table, bs)) }; @@ -2705,8 +2709,13 @@ pub(super) fn insert<'a, const GENERATE: bool>( let ok = |row_ref| Ok((gen_cols, row_ref, insert_flags)); // `CHECK_SAME_ROW = true`, as there might be an identical row already in the tx state. - // SAFETY: `tx_table.is_row_present(row)` holds as we still haven't deleted the row, + // SAFETY: + // + // - `tx_table.is_row_present(row)` holds as we still haven't deleted the row, // in particular, the `write_gen_val_to_col` call does not remove the row. + // + // - `tx_row_ptr` was freshly made + // so we couldn't have inserted it into any of the indices yet. let res = unsafe { tx_table.confirm_insertion::(tx_blob_store, tx_row_ptr, blob_bytes) }; match res { @@ -2931,9 +2940,14 @@ impl MutTxId { // but it cannot, as the committed state already has `X` for `C`. // So we don't need to check the tx state for a duplicate row. 
// - // SAFETY: `tx_table.is_row_present(row)` holds as we still haven't deleted the row, + // SAFETY: + // + // - `tx_table.is_row_present(row)` holds as we still haven't deleted the row, // in particular, the `write_gen_val_to_col` call does not remove the row. // On error, `tx_row_ptr` has already been removed, so don't do it again. + // + // - `tx_row_ptr` was freshly made + // so we couldn't have inserted it into any of the indices yet. let (_, tx_row_ptr) = unsafe { tx_table.confirm_insertion::(tx_blob_store, tx_row_ptr, blob_bytes) }?; @@ -2950,9 +2964,14 @@ impl MutTxId { // This ensures that the old row is removed from the indices // before attempting to insert the new row into the indices. // - // SAFETY: `tx_table.is_row_present(tx_row_ptr)` and `tx_table.is_row_present(old_ptr)` both hold + // SAFETY: + // + // - `tx_table.is_row_present(tx_row_ptr)` and `tx_table.is_row_present(old_ptr)` both hold // as we've deleted neither. // In particular, the `write_gen_val_to_col` call does not remove the row. + // + // - `tx_row_ptr` was freshly made + // so we couldn't have inserted it into any of the indices yet. let tx_row_ptr = unsafe { tx_table.confirm_update(tx_blob_store, tx_row_ptr, old_ptr, blob_bytes) }?; if let Some(old_commit_del_ptr) = old_commit_del_ptr { diff --git a/crates/sats/src/proptest.rs b/crates/sats/src/proptest.rs index 5cf498a6d42..0edb0daf328 100644 --- a/crates/sats/src/proptest.rs +++ b/crates/sats/src/proptest.rs @@ -211,7 +211,7 @@ fn generate_array_value(ty: AlgebraicType) -> BoxedStrategy { } } -fn gen_with( +pub fn gen_with( with: impl Strategy, then: impl Fn(T) -> US, ) -> impl Strategy { diff --git a/crates/table/benches/page_manager.rs b/crates/table/benches/page_manager.rs index 69241cf9a0a..06e679dece9 100644 --- a/crates/table/benches/page_manager.rs +++ b/crates/table/benches/page_manager.rs @@ -760,7 +760,9 @@ fn make_table_with_index(unique: bool) -> (Table, IndexId) { let index_id = IndexId::SENTINEL; let algo = BTreeAlgorithm { columns: cols }.into(); let idx = tbl.new_index(&algo, unique).unwrap(); - // SAFETY: index was derived from the table. + // SAFETY: + // 1. Index was derived from the table. + // 2. Index was empty before this call. unsafe { tbl.insert_index(&NullBlobStore, index_id, idx) }; (tbl, index_id) diff --git a/crates/table/proptest-regressions/table_index/mod.txt b/crates/table/proptest-regressions/table_index/mod.txt index 8f6d53474b3..87f67a9df2a 100644 --- a/crates/table/proptest-regressions/table_index/mod.txt +++ b/crates/table/proptest-regressions/table_index/mod.txt @@ -6,3 +6,4 @@ # everyone who runs the test benefits from these saved cases. 
cc 3276d3db4a1a70d78db9a6a01eaa3bba810a2317e9c67e4d5d8d93cbba472c99 # shrinks to ((ty, cols, pv), is_unique) = ((ProductType {None: Bool}, [ColId(0)], ProductValue { elements: [Bool(false)] }), false) cc bc80b80ac2390452c0a152d2c6e2abc29ce146642f3cd0fe136ffe6173cf4c8c # shrinks to (ty, cols, pv) = (ProductType {None: I64}, [ColId(0)], ProductValue { elements: [I64(0)] }), kind = Direct +cc b08557e7170d6255f0eb54bf8d213c5f20d33b104bf9dba106c3360a6c4db742 # shrinks to (ty, cols, pvs) = (ProductType {None: Array(ArrayType { elem_ty: Product(ProductType {}) }), None: Product(ProductType {}), None: Product(ProductType {"field_0": I32, "field_1": Product(ProductType {}), "field_2": F32, "field_3": U8, "field_4": Product(ProductType {})}), None: Product(ProductType {"field_0": U128, "field_1": String, "field_2": String, "field_3": Product(ProductType {}), "field_4": Product(ProductType {}), "field_5": U256, "field_6": String, "field_7": Product(ProductType {}), "field_8": Product(ProductType {}), "field_9": Product(ProductType {}), "field_10": Product(ProductType {}), "field_11": Product(ProductType {}), "field_12": Product(ProductType {}), "field_13": String, "field_14": I8}), None: Sum(SumType {"variant_0": Product(ProductType {}), "variant_1": Product(ProductType {}), "variant_2": Product(ProductType {}), "variant_3": String, "variant_4": I256, "variant_5": I16, "variant_6": U32, "variant_7": Product(ProductType {}), "variant_8": String, "variant_9": U128, "variant_10": Product(ProductType {}), "variant_11": String, "variant_12": U8, "variant_13": String, "variant_14": Product(ProductType {})})}, [ColId(0)], {ProductValue { elements: [Array([ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }]), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [I32(-1328563213), Product(ProductValue { elements: [] }), F32(Total(958158200000.0)), U8(203), Product(ProductValue { elements: [] })] }), Product(ProductValue { elements: [U128(Packed(251502612027954903598235106926969200112)), String("açѨ𑾰ಫ𞄅\u{1cf44}"), String("¥ⶥ𓅴/"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), U256(14146624945421842192487613124473728284456634249970206008103750852206939464228), String("ⷜΌ05<ਛ"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), String("?<:ꩥ𐌼3¥ᒐ🕴*"), I8(26)] }), Sum(SumValue { tag: 9, value: U128(Packed(125257749904311445818453176702095543350)) })] }, ProductValue { elements: [Array([ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }]), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [I32(-751750749), Product(ProductValue { elements: [] }), F32(Total(3.336987e-35)), U8(42), Product(ProductValue { elements: [] })] }), Product(ProductValue { elements: [U128(Packed(339104954473373805302093813396056989268)), String("D¡fে"), String("t,$᪖~?\\¤<\\8{%:ট𞹉𐨜/|\u{113d2}q"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: 
[] }), U256(41773988496513295035720168499770300429513056844463058281982480850904823330850), String("Ⱥ\u{d62}\u{a0}u$u/ଳ🯖𞹛סּ𐠼ͼ\u{ccd}/*k&=$(ᆴ%:ꡨ𖭝𞢘d:"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), String("꯲ஐ𞹇Ⱥﺘ𐄂𐐠ⷃ&ã𞹡4Ⴭ𞟮\u{1bc9d}፱5`$𐵻ῩH"), I8(-70)] }), Sum(SumValue { tag: 1, value: Product(ProductValue { elements: [] }) })] }, ProductValue { elements: [Array([ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }]), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [I32(-1118630758), Product(ProductValue { elements: [] }), F32(Total(-0.0)), U8(239), Product(ProductValue { elements: [] })] }), Product(ProductValue { elements: [U128(Packed(27536991987242777193405425217462174222)), String("𝅀𝔛`>ଅ$?𐖐ᛡࡩ𑛗𐊛𑛃Y$𖫬🕴\u{180b}b$𝋎𖵘𞹛𐂥.'ਬȺ\"$"), String("𑴈சT'V)ø𑌞𝍨%$hÆN🕴\"¥;o!ଂ @{/k`"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), U256(29647333290262267566949551720292103306397329265983271305104466210817874163429), String("4p¢𪦝R$𞺉Ῠ+𝒞a\\`ᅩ𐞗\u{11368}:𑶧\u{9d7}U{?}w¼\"?𐄚M𝌤"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), String("ቝဓk𐵁ȺcNໆגּ`"), I8(-48)] }), Sum(SumValue { tag: 2, value: Product(ProductValue { elements: [] }) })] }, ProductValue { elements: [Array([ProductValue { elements: [] }, ProductValue { elements: [] }]), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [I32(698027962), Product(ProductValue { elements: [] }), F32(Total(-39284745000000.0)), U8(112), Product(ProductValue { elements: [] })] }), Product(ProductValue { elements: [U128(Packed(174967764461688626745346351186100520227)), String("\u{1932}'\\*ᝧ¥'\u{1c31}P"), String("%vȺ𖩧Ⱥ𖼁=🕴𐆠:𐮙ȺA0𝼖𐄀M𑶌"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), U256(37717031760385427292736334126399394818959330048323119881691271279214480342799), String("JﬖF𫦴Ѩ[/🜸𞸻1Yᝧ�9\u{1e029}(𝼪"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), String("꣕vﶩG\u{cc8}/e=Ѩ2ೝ𐔓𐭟*"), I8(-75)] }), Sum(SumValue { tag: 8, value: String("ⴚq//Gmâଋ$,>=") })] }, ProductValue { elements: [Array([ProductValue { elements: [] }]), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [I32(808239480), Product(ProductValue { elements: [] }), F32(Total(-3.414441e37)), U8(177), Product(ProductValue { elements: [] })] }), Product(ProductValue { elements: [U128(Packed(10204641291867593007668743197885064201)), String("3G𝔩+\"�\"ໆMꬠ𐁑'{{𑱁=H&P𝒻:É�\u{10a05}¥Ⱥ\u{a0}'"), String("?=ì#ተ+𖩗¥ஜ"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), U256(14046885727081498117947153136180091694669074271105084926963954623447898517582), String("ໃ$🬯pꔣ<:�tȺ𑚯[𐕡\"/�🕴pi-%\u{11d3a}"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] 
}), Product(ProductValue { elements: [] }), String("'"), I8(105)] }), Sum(SumValue { tag: 11, value: String("æ\"") })] }, ProductValue { elements: [Array([ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }]), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [I32(-70980545), Product(ProductValue { elements: [] }), F32(Total(1.171588e-29)), U8(223), Product(ProductValue { elements: [] })] }), Product(ProductValue { elements: [U128(Packed(88777670818810963552904284719155639123)), String("𐼑𐶁ῖ🫖t𞺦🟡iꬓ\u{11d90}(<$É\\G?᥀ᅵ¶𓄌9`&னȸ"), String("\"\":*ï"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), U256(54010679623959203229181053928725898254083356964342913551887149504923935650601), String("𞹗&`B𝍴Sೳ𞠰_F𑂚ꧣ𝒷⭽/\\Ⱥ\\⭷੦&O.𪹟8"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), String("🫤.`𑴄\u{1d17e}🝥x"), I8(53)] }), Sum(SumValue { tag: 11, value: String("¬Z") })] }, ProductValue { elements: [Array([ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }, ProductValue { elements: [] }]), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [I32(108646348), Product(ProductValue { elements: [] }), F32(Total(7.8576715e33)), U8(222), Product(ProductValue { elements: [] })] }), Product(ProductValue { elements: [U128(Packed(40574578188674017083176300176629015122)), String(".🕴.{`ધ:쁏Ѩඒ𝧅J¥𐍇]'*<𮰼ᾦ\"¥/yT\\𐭹ᥱ{=}"), String("\u{dd6}`H+𛅒Ѩ<ῐr*ࡷ𞓔�`=ಐᳳ??Ѩ𐿢?"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), U256(60630591893869796294376955453587017416654442715111668314202502849166405246984), String("Y*V\u{11366}\u{8cd}מּ\"`H𝆹𝅥%"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), String("ⶱÅቔ%�=𑇳Ꮱ"), I8(31)] }), Sum(SumValue { tag: 1, value: Product(ProductValue { elements: [] }) })] }, ProductValue { elements: [Array([ProductValue { elements: [] }, ProductValue { elements: [] }]), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [I32(830118612), Product(ProductValue { elements: [] }), F32(Total(1.101187e-33)), U8(65), Product(ProductValue { elements: [] })] }), Product(ProductValue { elements: [U128(Packed(137337744496642533251742504155558136608)), String("\u{dd3}&`/[`6࠳*xtäנּ.KE"), String("tP�V:`?A/𑃳"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), U256(80787311696497238070344266539849847782086419751086673488319811239072145291490), String("&�𑶔ਸr𞸧\"\\&%�\u{1182f}𝔄𑊨Gx𱇨J${\"ₙ$🂼*"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: 
[] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), String("?🕴�'ᏽ\u{89a}*C�¥¥0\u{cd6}𑋰𑎐\\q᧪\u{bcd}ൽ/=🕴"), I8(26)] }), Sum(SumValue { tag: 6, value: U32(2024581402) })] }, ProductValue { elements: [Array([]), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [I32(24951015), Product(ProductValue { elements: [] }), F32(Total(-12695150000000.0)), U8(173), Product(ProductValue { elements: [] })] }), Product(ProductValue { elements: [U128(Packed(258919236975175920825620631536077804022)), String("=<|/\\s'צw{¬'.¥"), String("⫓\\*ஸ\u{ac7}Y: ?t:$2Ѩ5୧%'𐿱ਗ਼𞥟$>"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), U256(22057633765777686815195263370569165748936401038219764560190933715062466133713), String("i*�ໜ𑿜𒍽X'Èꟙ?$/x/'𑴈ງ{\u{bd7}"), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), Product(ProductValue { elements: [] }), String("G\"\u{11357}XਵὠzѨ@+Þ'V/"), I8(-125)] }), Sum(SumValue { tag: 7, value: Product(ProductValue { elements: [] }) })] }}), kind = BTree, is_unique = false diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index 5054ed10d62..daf13e1708b 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -597,11 +597,16 @@ impl Table { let row_ptr = row_ref.pointer(); // Confirm the insertion, checking any constraints, removing the physical row on error. - // SAFETY: We just inserted `ptr`, so it must be present. + // // Re. `CHECK_SAME_ROW = true`, // where `insert` is called, we are not dealing with transactions, // and we already know there cannot be a duplicate row error, // but we check just in case it isn't. + // + // SAFETY: We just inserted `row_ptr`, + // so it must be present + // and we haven't done any other mutation + // so `row_ptr` couldn't be in any of `table`'s indexes yet. let (hash, row_ptr) = unsafe { self.confirm_insertion::(blob_store, row_ptr, blob_bytes) }?; // SAFETY: Per post-condition of `confirm_insertion`, `row_ptr` refers to a valid row. let row_ref = unsafe { self.get_row_ref_unchecked(blob_store, row_ptr) }; @@ -796,7 +801,8 @@ impl Table { /// /// # Safety /// - /// `self.is_row_present(row)` must hold. + /// - `self.is_row_present(row)` must hold. + /// - `new_ptr` must not be present in any of the indices yet. pub unsafe fn confirm_insertion<'a, const CHECK_SAME_ROW: bool>( &'a mut self, blob_store: &'a mut dyn BlobStore, @@ -805,7 +811,9 @@ impl Table { ) -> Result<(Option, RowPointer), InsertError> { // SAFETY: Caller promised that `self.is_row_present(ptr)` holds. let hash = unsafe { self.insert_into_pointer_map(blob_store, ptr) }?; - // SAFETY: Caller promised that `self.is_row_present(ptr)` holds. + // SAFETY: + // - Caller promised that `self.is_row_present(ptr)` holds. + // - Caller promised that `ptr` is not present in any of the indices yet. unsafe { self.insert_into_indices::(blob_store, ptr) }?; self.update_statistics_added_row(blob_bytes); @@ -824,7 +832,8 @@ impl Table { /// /// # Safety /// - /// `self.is_row_present(new_row)` and `self.is_row_present(old_row)` must hold. + /// - `self.is_row_present(new_row)` and `self.is_row_present(old_row)` must hold. + /// - `new_ptr` must not be present in any of the indices yet. 
pub unsafe fn confirm_update<'a>( &'a mut self, blob_store: &'a mut dyn BlobStore, @@ -837,10 +846,14 @@ impl Table { unsafe { self.delete_from_indices(blob_store, old_ptr) }; // Insert new row into indices. - // SAFETY: Caller promised that `self.is_row_present(ptr)` holds. + // SAFETY: + // - Caller promised that `self.is_row_present(ptr)` holds. + // - Caller promised that `new_ptr` is not present in any of the indices yet. let res = unsafe { self.insert_into_indices::(blob_store, new_ptr) }; if let Err(e) = res { // Undo (1). + // SAFETY: In `(1)`, we removed `old_ptr` from the indices, + // so it cannot exist in any of them. unsafe { self.insert_into_indices::(blob_store, old_ptr) } .expect("re-inserting the old row into indices should always work"); return Err(e); @@ -880,6 +893,8 @@ impl Table { /// /// SAFETY: `self.is_row_present(new)` must hold. /// Post-condition: If this method returns `Ok(_)`, the row still exists. + /// + /// SAFETY: `new` must not be present in any of the indices yet. unsafe fn insert_into_indices<'a, const CHECK_SAME_ROW: bool>( &'a mut self, blob_store: &'a mut dyn BlobStore, @@ -1410,12 +1425,18 @@ impl Table { /// /// # Safety /// - /// Caller must promise that `index` was constructed with the same row type/layout as this table. + /// 1. Caller must promise that `index` was constructed with the same row type/layout as this table. + /// 2. Caller promises that `index.is_empty()`. pub unsafe fn insert_index(&mut self, blob_store: &dyn BlobStore, index_id: IndexId, mut index: TableIndex) { let rows = self.scan_rows(blob_store); - // SAFETY: Caller promised that table's row type/layout + // SAFETY: + // + // 1. Caller promised that table's row type/layout // matches that which `index` was constructed with. // It follows that this applies to any `rows`, as required. + // + // 2. Caller promised that the index was empty, + // so none of the `rows` could've already existed in `index`. let violation = unsafe { index.build_from_rows(rows) }; violation.unwrap_or_else(|ptr| { let index_schema = &self.schema.indexes.iter().find(|index_schema| index_schema.index_id == index_id).expect("Index should exist"); @@ -2412,7 +2433,9 @@ pub(crate) mod test { let algo = BTreeAlgorithm { columns: cols.clone() }.into(); let index = table.new_index(&algo, true).unwrap(); - // SAFETY: Index was derived from `table`. + // SAFETY: + // 1. Index was derived from `table`. + // 2. Index is empty before this call. unsafe { table.insert_index(&NullBlobStore, index_schema.index_id, index) }; // Reserve a page so that we can check the hash. @@ -2531,7 +2554,8 @@ pub(crate) mod test { let index = TableIndex::new(&ty, indexed_columns.clone(), IndexKind::BTree, false).unwrap(); // Add an index on column 0. // Safety: - // We're using `ty` as the row type for both `table` and the new index. + // 1. We're using `ty` as the row type for both `table` and the new index. + // 2. The index is empty before this call. unsafe { table.insert_index(&blob_store, index_id, index) }; // We have one index, which should be fully populated, @@ -2562,7 +2586,8 @@ pub(crate) mod test { let index = TableIndex::new(&ty, indexed_columns, IndexKind::BTree, false).unwrap(); // Add a duplicate of the same index, so we can check that all above quantities double. // Safety: - // As above, we're using `ty` as the row type for both `table` and the new index. + // 1. We're using `ty` as the row type for both `table` and the new index. + // 2. The index is empty before this call. 
unsafe { table.insert_index(&blob_store, IndexId(1), index) }; prop_assert_eq!(table.num_rows_in_indexes(), table.num_rows() * 2); @@ -2707,7 +2732,10 @@ pub(crate) mod test { let row_ptr = row_ref.pointer(); // Confirm the insertion, checking any constraints, removing the physical row on error. - // SAFETY: We just inserted `ptr`, so it must be present. + // SAFETY: We just inserted `row_ptr`, + // so it must be present + // and we haven't done any other mutation + // so `row_ptr` couldn't be in any of `table`'s indexes yet. let (hash, row_ptr) = unsafe { table.confirm_insertion::(blob_store, row_ptr, blob_bytes) }?; // SAFETY: Per post-condition of `confirm_insertion`, `row_ptr` refers to a valid row. let row_ref = unsafe { table.get_row_ref_unchecked(blob_store, row_ptr) }; diff --git a/crates/table/src/table_index/hash_index.rs b/crates/table/src/table_index/hash_index.rs index c1fd89cfd97..9712fd767a2 100644 --- a/crates/table/src/table_index/hash_index.rs +++ b/crates/table/src/table_index/hash_index.rs @@ -1,5 +1,5 @@ use super::{ - key_size::KeyBytesStorage, + multimap::MultiMapStatistics, same_key_entry::{same_key_iter, SameKeyEntry, SameKeyEntryIter}, Index, KeySize, }; @@ -22,55 +22,72 @@ pub struct HashIndex { /// as we allow a single element to be stored inline /// to improve performance for the common case of one element. map: HashMap, - /// The memoized number of rows indexed in `self.map`. - num_rows: usize, - /// Storage for [`Index::num_key_bytes`]. - num_key_bytes: u64, + /// Stats for `num_rows` and `num_key_bytes`. + stats: MultiMapStatistics, } impl Default for HashIndex { fn default() -> Self { Self { map: <_>::default(), - num_rows: <_>::default(), - num_key_bytes: <_>::default(), + stats: <_>::default(), } } } impl MemoryUsage for HashIndex { fn heap_usage(&self) -> usize { - let Self { - map, - num_rows, - num_key_bytes, - } = self; - map.heap_usage() + num_rows.heap_usage() + num_key_bytes.heap_usage() + let Self { map, stats } = self; + map.heap_usage() + stats.heap_usage() } } -impl Index for HashIndex { +// SAFETY: The implementations of all constructing +// and mutating methods uphold the invariant, +// assuming the caller requirements of the `unsafe` methods are upheld, +// that every `key -> ptr` pair only ever occurs once. +unsafe impl Index for HashIndex { type Key = K; + // ========================================================================= + // Construction + // ========================================================================= + fn clone_structure(&self) -> Self { <_>::default() } - /// Inserts the relation `key -> ptr` to this multimap. - /// - /// The map does not check whether `key -> ptr` was already in the map. - /// It's assumed that the same `ptr` is never added twice, - /// and multimaps do not bind one `key` to the same `ptr`. - fn insert(&mut self, key: Self::Key, ptr: RowPointer) -> Result<(), RowPointer> { - self.num_rows += 1; - self.num_key_bytes.add_to_key_bytes::(&key); - self.map.entry(key).or_default().push(ptr); + // ========================================================================= + // Mutation + // ========================================================================= + + unsafe fn insert(&mut self, key: Self::Key, ptr: RowPointer) -> Result<(), RowPointer> { + self.debug_ensure_key_ptr_not_included(&key, ptr); + + self.stats.add::(&key); + let entry = self.map.entry(key).or_default(); + // SAFETY: caller promised that `(key, ptr)` does not exist in the index + // so it also does not exist in `entry`. 
+ unsafe { entry.push(ptr) }; Ok(()) } - /// Deletes `key -> ptr` from this multimap. - /// - /// Returns whether `key -> ptr` was present. + unsafe fn merge_from(&mut self, src_index: Self, mut translate: impl FnMut(RowPointer) -> RowPointer) { + // Merge `stats`. + self.stats.merge_from(src_index.stats); + + // Move over the `key -> ptr` pairs + // and translate `ptr`s from `src_index` to fit `self`. + for (key, src) in src_index.map { + let dst = self.map.entry(key).or_default(); + + // SAFETY: Given `(key, ptr)` in `src_index`, + // `(key, translate(ptr))` does not exist in `self`. + // It follows that the `dst ∩ translate(src) = ∅`. + unsafe { dst.merge_from(src, &mut translate) }; + } + } + fn delete(&mut self, key: &K, ptr: RowPointer) -> bool { let EntryRef::Occupied(mut entry) = self.map.entry_ref(key) else { return false; @@ -79,8 +96,7 @@ impl Index for HashIndex { let (deleted, is_empty) = entry.get_mut().delete(ptr); if deleted { - self.num_rows -= 1; - self.num_key_bytes.sub_from_key_bytes::(key); + self.stats.delete::(key); } if is_empty { @@ -90,6 +106,16 @@ impl Index for HashIndex { deleted } + fn clear(&mut self) { + // This will not deallocate the outer map. + self.map.clear(); + self.stats.clear(); + } + + // ========================================================================= + // Querying + // ========================================================================= + type PointIter<'a> = SameKeyEntryIter<'a> where @@ -103,16 +129,12 @@ impl Index for HashIndex { self.map.len() } - fn num_rows(&self) -> usize { - self.num_rows + fn num_key_bytes(&self) -> u64 { + self.stats.num_key_bytes } - /// Deletes all entries from the multimap, leaving it empty. - /// This will not deallocate the outer map. - fn clear(&mut self) { - self.map.clear(); - self.num_rows = 0; - self.num_key_bytes.reset_to_zero(); + fn num_rows(&self) -> usize { + self.stats.num_rows } fn can_merge(&self, _: &Self, _: impl Fn(&RowPointer) -> bool) -> Result<(), RowPointer> { diff --git a/crates/table/src/table_index/index.rs b/crates/table/src/table_index/index.rs index 62152bbf430..7518c51a4f1 100644 --- a/crates/table/src/table_index/index.rs +++ b/crates/table/src/table_index/index.rs @@ -1,7 +1,22 @@ use crate::{indexes::RowPointer, table_index::KeySize}; use core::{mem, ops::RangeBounds}; -pub trait Index { +/// An index relating `key -> ptr` pairs for quick lookup from `key`s. +/// +/// Indices may relate `key`s to `ptr`s injectively. +/// In other words, uniquely / with a 1-1 mapping. +/// They may also not, relating a `key` to several `ptr`s. +/// +/// # Safety +/// +/// An invariant and safety requirement +/// of all indices is that `key -> ptr` only occurs once. +/// This can be exploited by implementations for better performance +/// and is satisfied by the caller requirements of +/// [`Index::insert`] and [`Index::merge_from`], +/// the implementation of those methods, +/// and by the remaining implementation of the index not doing anything weird. +pub unsafe trait Index { /// The type of keys indexed. type Key: KeySize; @@ -27,23 +42,62 @@ pub trait Index { /// if inserting `key` is not compatible with the index /// or if it would be profitable to replace the index with a B-Tree. /// The provided default implementation does not return `Despecialize`. + /// + /// # Safety + /// + /// The relation `key -> ptr` must not exist in the index. + /// Note that inserting `key -> ptr_other`, + /// where `ptr != ptr_other`, is legal. 
fn insert_maybe_despecialize( &mut self, key: Self::Key, ptr: RowPointer, ) -> Result, Despecialize> { - Ok(self.insert(key, ptr)) + // SAFETY: forward caller requirements. + Ok(unsafe { self.insert(key, ptr) }) } /// Inserts the relation `key -> ptr` to this map. /// /// If `key` was already present in the index, - /// does not add an association with val. - /// Returns the existing associated pointer instead. - fn insert(&mut self, key: Self::Key, ptr: RowPointer) -> Result<(), RowPointer> { - self.insert_maybe_despecialize(key, ptr).unwrap() + /// and the index is unique, + /// does not add the relation `key -> ptr` + /// and returns the existing associated pointer instead. + /// + /// # Safety + /// + /// The relation `key -> ptr` must not exist in the index. + /// Note that inserting `key -> ptr_other`, + /// where `ptr != ptr_other`, is legal. + unsafe fn insert(&mut self, key: Self::Key, ptr: RowPointer) -> Result<(), RowPointer> { + // SAFETY: forward caller requirements. + #[allow(unused_unsafe)] + unsafe { self.insert_maybe_despecialize(key, ptr) }.unwrap() } + #[cfg(test)] + fn insert_for_test(&mut self, key: Self::Key, ptr: RowPointer) -> Result<(), RowPointer> { + // SAFETY: proof obligation checked inside the method for debug builds. + unsafe { self.insert(key, ptr) } + } + + /// Merge `src` into `self`. + /// + /// The closure `translate` is called + /// to convert row pointers from ones that fit the tx state + /// to ones that fit the committed state, + /// typically by using a translation table. + /// + /// This method does not return a result, + /// as it's only used when merging, + /// when we already know that there will be no uniqueness constraint conflicts. + /// + /// # Safety + /// + /// For every `(key, ptr)` in `src`, + /// `(key, translate(ptr))` does not exist in `self`. + unsafe fn merge_from(&mut self, src: Self, translate: impl Fn(RowPointer) -> RowPointer); + /// Deletes `key -> ptr` from this index. /// /// Returns whether `key -> ptr` was present. @@ -123,6 +177,19 @@ pub trait Index { /// /// If the index is unique, this will at most return one element. fn seek_point(&self, point: &Self::Key) -> Self::PointIter<'_>; + + /// Verifies the type invariant that each `(key, ptr)` only occurs once. + /// + /// This is intended to be used inside implementations of [`Index::insert`]. + #[inline(always)] // We don't want a call to this in release builds! + fn debug_ensure_key_ptr_not_included(&self, key: &Self::Key, ptr: RowPointer) { + #[cfg(debug_assertions)] + { + for existing_ptr in self.seek_point(key) { + assert_ne!(existing_ptr, ptr); + } + } + } } pub trait RangedIndex: Index { @@ -141,7 +208,7 @@ pub trait RangedIndex: Index { } /// An error indicating that the index should be despecialized to a B-Tree. -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct Despecialize; /// An error indicating that the [`Index`] is not a [`RangedIndex`]. diff --git a/crates/table/src/table_index/key_size.rs b/crates/table/src/table_index/key_size.rs index 30873e6d6d8..0b5b69b3f01 100644 --- a/crates/table/src/table_index/key_size.rs +++ b/crates/table/src/table_index/key_size.rs @@ -11,6 +11,9 @@ pub trait KeyBytesStorage: Default + MemoryUsage { /// Add `key.key_size_in_bytes()` to the statistics. fn add_to_key_bytes(&mut self, key: &I::Key); + /// Merges `src` into `self`. + fn merge(&mut self, src: Self); + /// Subtract `key.key_size_in_bytes()` from the statistics. 
fn sub_from_key_bytes(&mut self, key: &I::Key); @@ -23,6 +26,7 @@ pub trait KeyBytesStorage: Default + MemoryUsage { impl KeyBytesStorage for () { fn add_to_key_bytes(&mut self, _: &I::Key) {} + fn merge(&mut self, _: Self) {} fn sub_from_key_bytes(&mut self, _: &I::Key) {} fn reset_to_zero(&mut self) {} fn get(&self, index: &I) -> u64 { @@ -34,6 +38,9 @@ impl KeyBytesStorage for u64 { fn add_to_key_bytes(&mut self, key: &I::Key) { *self += key.key_size_in_bytes() as u64; } + fn merge(&mut self, src: Self) { + *self += src; + } fn sub_from_key_bytes(&mut self, key: &I::Key) { *self -= key.key_size_in_bytes() as u64; } diff --git a/crates/table/src/table_index/mod.rs b/crates/table/src/table_index/mod.rs index a467f87efc8..e7632174b43 100644 --- a/crates/table/src/table_index/mod.rs +++ b/crates/table/src/table_index/mod.rs @@ -419,7 +419,7 @@ fn as_sum_tag(av: &AlgebraicValue) -> Option<&SumTag> { as_tag(av).map(|s| s.into()) } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Copy, Clone)] #[cfg_attr(test, derive(proptest_derive::Arbitrary))] pub enum IndexKind { BTree, @@ -668,6 +668,7 @@ impl TypedIndex { /// /// 1. Caller promises that `cols` matches what was given at construction (`Self::new`). /// 2. Caller promises that the projection of `row_ref`'s type's equals the index's key type. + /// 3. Caller promises that `row_ref` doesn't already exist in `self`. unsafe fn insert(&mut self, cols: &ColList, row_ref: RowRef<'_>) -> Result<(), RowPointer> { fn project_to_singleton_key(cols: &ColList, row_ref: RowRef<'_>) -> T { // Extract the column. @@ -703,7 +704,10 @@ impl TypedIndex { row_ref: RowRef<'_>, ) -> (Result<(), RowPointer>, Option) { let key = project_to_singleton_key(cols, row_ref); - let res = this.insert(key, row_ref.pointer()); + let ptr = row_ref.pointer(); + // SAFETY: Caller promised that `row_ref` doesn't already exist in `self`. + // This implies that `ptr` is not already in the index. + let res = unsafe { this.insert(key, ptr) }; (res, None) } @@ -728,7 +732,9 @@ impl TypedIndex { Ok(res) => (res, None), Err(Despecialize) => outlined_call(|| { let mut index = this.into_btree(); - let res = index.insert(key, ptr); + // SAFETY: Caller promised that `row_ref` doesn't already exist in `self`. + // This implies that `ptr` is not already in the index. + let res = unsafe { index.insert(key, ptr) }; (res, Some(index.into())) }), } @@ -741,7 +747,10 @@ impl TypedIndex { ) -> (Result<(), RowPointer>, Option) { // SAFETY: Caller promised that any `col` in `cols` is in-bounds of `row_ref`'s layout. let key = unsafe { row_ref.project_unchecked(cols) }; - let res = this.insert(key, row_ref.pointer()); + let ptr = row_ref.pointer(); + // SAFETY: Caller promised that `row_ref` doesn't already exist in `self`. + // This implies that `ptr` is not already in the index. + let res = unsafe { this.insert(key, ptr) }; (res, None) } @@ -833,6 +842,107 @@ impl TypedIndex { res } + /// Merge `src` into `self`. + /// + /// The closure `translate` is called + /// to convert row pointers from ones that fit the tx state + /// to ones that fit the committed state, + /// typically by using a translation table. + /// + /// This method does not return a result, + /// as it's only used when merging, + /// when we already know that there will be no uniqueness constraint conflicts. + /// + /// # Safety + /// + /// For every `(key, ptr)` in `src`, + /// `(key, translate(ptr))` does not exist in `self`. 
+ unsafe fn merge_from(&mut self, src: Self, translate: impl Fn(RowPointer) -> RowPointer) { + use TypedIndex::*; + + // SAFETY: forward caller requirements. + match (self, src) { + (BtreeBool(lhs), BtreeBool(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (BtreeU8(lhs), BtreeU8(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (BtreeSumTag(lhs), BtreeSumTag(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (BtreeI8(lhs), BtreeI8(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (BtreeU16(lhs), BtreeU16(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (BtreeI16(lhs), BtreeI16(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (BtreeU32(lhs), BtreeU32(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (BtreeI32(lhs), BtreeI32(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (BtreeU64(lhs), BtreeU64(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (BtreeI64(lhs), BtreeI64(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (BtreeU128(lhs), BtreeU128(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (BtreeI128(lhs), BtreeI128(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (BtreeU256(lhs), BtreeU256(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (BtreeI256(lhs), BtreeI256(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (BtreeF32(lhs), BtreeF32(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (BtreeF64(lhs), BtreeF64(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (BtreeString(lhs), BtreeString(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (BtreeAV(lhs), BtreeAV(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashBool(lhs), HashBool(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashU8(lhs), HashU8(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashSumTag(lhs), HashSumTag(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashI8(lhs), HashI8(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashU16(lhs), HashU16(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashI16(lhs), HashI16(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashU32(lhs), HashU32(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashI32(lhs), HashI32(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashU64(lhs), HashU64(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashI64(lhs), HashI64(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashU128(lhs), HashU128(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashI128(lhs), HashI128(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashU256(lhs), HashU256(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashI256(lhs), HashI256(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashF32(lhs), HashF32(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashF64(lhs), HashF64(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashString(lhs), HashString(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (HashAV(lhs), HashAV(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueBtreeBool(lhs), UniqueBtreeBool(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueBtreeU8(lhs), UniqueBtreeU8(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueBtreeSumTag(lhs), UniqueBtreeSumTag(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueBtreeI8(lhs), UniqueBtreeI8(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueBtreeU16(lhs), UniqueBtreeU16(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueBtreeI16(lhs), UniqueBtreeI16(rhs)) => unsafe { 
lhs.merge_from(rhs, translate) }, + (UniqueBtreeU32(lhs), UniqueBtreeU32(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueBtreeI32(lhs), UniqueBtreeI32(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueBtreeU64(lhs), UniqueBtreeU64(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueBtreeI64(lhs), UniqueBtreeI64(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueBtreeU128(lhs), UniqueBtreeU128(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueBtreeI128(lhs), UniqueBtreeI128(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueBtreeU256(lhs), UniqueBtreeU256(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueBtreeI256(lhs), UniqueBtreeI256(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueBtreeF32(lhs), UniqueBtreeF32(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueBtreeF64(lhs), UniqueBtreeF64(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueBtreeString(lhs), UniqueBtreeString(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueBtreeAV(lhs), UniqueBtreeAV(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashBool(lhs), UniqueHashBool(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashU8(lhs), UniqueHashU8(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashSumTag(lhs), UniqueHashSumTag(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashI8(lhs), UniqueHashI8(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashU16(lhs), UniqueHashU16(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashI16(lhs), UniqueHashI16(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashU32(lhs), UniqueHashU32(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashI32(lhs), UniqueHashI32(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashU64(lhs), UniqueHashU64(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashI64(lhs), UniqueHashI64(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashU128(lhs), UniqueHashU128(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashI128(lhs), UniqueHashI128(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashU256(lhs), UniqueHashU256(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashI256(lhs), UniqueHashI256(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashF32(lhs), UniqueHashF32(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashF64(lhs), UniqueHashF64(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashString(lhs), UniqueHashString(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueHashAV(lhs), UniqueHashAV(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueDirectU8(lhs), UniqueDirectU8(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueDirectSumTag(lhs), UniqueDirectSumTag(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueDirectU16(lhs), UniqueDirectU16(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueDirectU32(lhs), UniqueDirectU32(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + (UniqueDirectU64(lhs), UniqueDirectU64(rhs)) => unsafe { lhs.merge_from(rhs, translate) }, + _ => unreachable!("non-matching index kinds"), + } + } + /// Remove the row referred to by `row_ref` from the index `self`, /// which must be keyed at `cols`. 
/// @@ -1161,7 +1271,6 @@ impl TypedIndex { same_for_all_types!(self, this => this.clear()) } - #[allow(unused)] // used only by tests fn is_empty(&self) -> bool { self.num_rows() == 0 } @@ -1187,7 +1296,7 @@ impl TypedIndex { /// This method runs in constant time. /// /// See the [`KeySize`] trait for more details on how this method computes its result. - pub fn num_key_bytes(&self) -> u64 { + fn num_key_bytes(&self) -> u64 { same_for_all_types!(self, this => this.num_key_bytes()) } } @@ -1263,18 +1372,46 @@ impl TableIndex { /// /// # Safety /// - /// Caller promises that projecting the `row_ref`'s type - /// to the index's columns equals the index's key type. - /// This is entailed by an index belonging to the table's schema. - /// It also follows from `row_ref`'s type/layout - /// being the same as passed in on `self`'s construction. + /// 1. Caller promises that projecting the `row_ref`'s type + /// to the index's columns equals the index's key type. + /// This is entailed by an index belonging to the table's schema. + /// It also follows from `row_ref`'s type/layout + /// being the same as passed in on `self`'s construction. + /// + /// 2. Caller promises that `row_ref` doesn't already exist in `self`. pub unsafe fn check_and_insert(&mut self, row_ref: RowRef<'_>) -> Result<(), RowPointer> { // SAFETY: // 1. We're passing the same `ColList` that was provided during construction. // 2. Forward the caller's proof obligation. + // 3. Forward the caller's proof obligation. unsafe { self.idx.insert(&self.indexed_columns, row_ref) } } + /// Merge `src_index` into `self`. + /// + /// The closure `translate` is called + /// to convert row pointers from ones that fit the tx state + /// to ones that fit the committed state, + /// typically by using a translation table. + /// + /// This method does not return a result, + /// as it's only used when merging, + /// when we already know that there will be no uniqueness constraint conflicts. + /// + /// # Safety + /// + /// 1. Caller promises that `self.key_type == src.key_type`. + /// 2. Caller promises that `self.indexed_columns == src.indexed_columns`. + /// 3. For every `(key, ptr)` in `src`, + /// `(key, translate(ptr))` does not exist in `self`. + pub unsafe fn merge_from(&mut self, src: Self, translate: impl Fn(RowPointer) -> RowPointer) { + debug_assert_eq!(self.key_type, src.key_type); + debug_assert_eq!(self.indexed_columns, src.indexed_columns); + + // SAFETY: Forward caller proof obligation 3. + unsafe { self.idx.merge_from(src.idx, translate) }; + } + /// Deletes `row_ref` with its indexed value `row_ref.project(&self.indexed_columns)` from this index. /// /// Returns whether `ptr` was present. @@ -1323,17 +1460,19 @@ impl TableIndex { /// /// # Safety /// - /// Caller promises that projecting any of the `row_ref`'s type - /// to the index's columns equals the index's key type. - /// This is entailed by an index belonging to the table's schema. - /// It also follows from `row_ref`'s type/layout - /// being the same as passed in on `self`'s construction. + /// 1. Caller promises that projecting any of the `row_ref`'s type + /// to the index's columns equals the index's key type. + /// This is entailed by an index belonging to the table's schema. + /// It also follows from `row_ref`'s type/layout + /// being the same as passed in on `self`'s construction. + /// + /// 2. Caller promises that none of the `rows`s already exist in `self`. 
pub unsafe fn build_from_rows<'table>( &mut self, rows: impl IntoIterator>, ) -> Result<(), RowPointer> { rows.into_iter() - // SAFETY: Forward caller proof obligation. + // SAFETY: Forward caller proof obligations 1. and 2. .try_for_each(|row_ref| unsafe { self.check_and_insert(row_ref) }) } @@ -1437,6 +1576,12 @@ impl TableIndex { self.idx.clear(); } + /// Returns whether this index is empty. + #[allow(unused)] // used only by tests + fn is_empty(&self) -> bool { + self.idx.is_empty() + } + /// The number of unique keys in this index. pub fn num_keys(&self) -> usize { self.idx.num_keys() @@ -1470,6 +1615,7 @@ mod test { use crate::page_pool::PagePool; use crate::{blob_store::HashMapBlobStore, table::test::table}; use core::ops::Bound::*; + use core::panic::AssertUnwindSafe; use decorum::Total; use proptest::prelude::*; use proptest::{ @@ -1479,12 +1625,14 @@ mod test { use spacetimedb_data_structures::map::HashMap; use spacetimedb_lib::ProductTypeElement; use spacetimedb_primitives::ColId; - use spacetimedb_sats::proptest::{generate_algebraic_value, generate_primitive_algebraic_type}; + use spacetimedb_sats::proptest::{gen_with, generate_algebraic_value, generate_primitive_algebraic_type}; use spacetimedb_sats::{ product, proptest::{generate_product_value, generate_row_type}, AlgebraicType, ProductType, ProductValue, }; + use std::collections::HashSet; + use std::panic; fn gen_cols(ty_len: usize) -> impl Strategy { vec((0..ty_len as u16).prop_map_into::(), 1..=ty_len) @@ -1492,13 +1640,17 @@ mod test { } fn gen_row_and_cols() -> impl Strategy { - generate_row_type(1..16).prop_flat_map(|ty| { - ( - Just(ty.clone()), - gen_cols(ty.elements.len()), - generate_product_value(ty), - ) + gen_with(generate_row_type(1..16), |ty| { + (gen_cols(ty.elements.len()), generate_product_value(ty)) + }) + .prop_map(|(a, (b, c))| (a, b, c)) + } + + fn gen_distinct_rows_and_cols() -> impl Strategy)> { + gen_with(generate_row_type(1..16), |ty| { + (gen_cols(ty.elements.len()), hash_set(generate_product_value(ty), 0..16)) }) + .prop_map(|(a, (b, c))| (a, b, c)) } impl IndexKind { @@ -1579,7 +1731,7 @@ mod test { let mut blob_store = HashMapBlobStore::default(); let row_ref = table.insert(&pool, &mut blob_store, &pv).unwrap().1; prop_assert_eq!(index.delete(row_ref).unwrap(), false); - prop_assert!(index.idx.is_empty()); + prop_assert!(index.is_empty()); prop_assert_eq!(index.num_keys(), 0); prop_assert_eq!(index.num_key_bytes(), 0); prop_assert_eq!(index.num_rows(), 0); @@ -1810,5 +1962,94 @@ mod test { let rows = index.seek_range(&(Excluded(&val), Excluded(&val))).unwrap().collect::>(); assert_eq!(rows, []); } + + #[test] + fn insert_twice_will_panic((ty, cols, pv) in gen_row_and_cols(), is_unique: bool, kind: IndexKind) { + let mut index = new_index(&ty, &cols, is_unique, kind); + let mut table = table(ty); + let pool = PagePool::new_for_test(); + let mut blob_store = HashMapBlobStore::default(); + let row_ref = table.insert(&pool, &mut blob_store, &pv).unwrap().1; + + // First insertion of `row_ref` works. + unsafe { index.check_and_insert(row_ref) }.unwrap(); + + // Second insertion panics. + let hook = panic::take_hook(); + panic::set_hook(Box::new(|_| {})); + let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe { index.check_and_insert(row_ref) })); + panic::set_hook(hook); + prop_assert!(result.is_err()); + } + + #[test] + fn merge_from_is_union((ty, cols, pvs) in gen_distinct_rows_and_cols(), kind: IndexKind, is_unique: bool) { + // Make the table and stuff. 
+ let mut table = table(ty.clone()); + let pool = PagePool::new_for_test(); + let mut blob_store = HashMapBlobStore::default(); + + if is_unique { + // Ensure all rows are distinct on the indexed columns. + let mut seen = HashSet::new(); + for pv in &pvs { + let key = get_fields(&cols, pv); + prop_assume!(seen.insert(key)); + } + } + + // Make two indices. + let mut index_lhs = new_index(&ty, &cols, is_unique, kind); + let mut index_rhs = new_index(&ty, &cols, is_unique, kind); + + // Insert all rows into the table. + for pv in pvs { + let _ = table.insert(&pool, &mut blob_store, &pv).unwrap().1; + } + + // Insert half of the rows into each index. + let row_refs = table.scan_rows(&blob_store).collect::>(); + let (left, right) = row_refs.split_at(row_refs.len() / 2); + for row_ref in left { + unsafe { index_lhs.check_and_insert(*row_ref) }.unwrap(); + } + for row_ref in right { + unsafe { index_rhs.check_and_insert(*row_ref) }.unwrap(); + } + + // First ensure that `can_merge` says they can be merged. + prop_assert_eq!(index_lhs.can_merge(&index_rhs, |_| false), Ok(())); + // Ignoring all rows allows merging. + prop_assert_eq!(index_lhs.can_merge(&index_lhs, |_| true), Ok(())); + prop_assert_eq!(index_rhs.can_merge(&index_rhs, |_| true), Ok(())); + // For unique indices, + // either side cannot be merged with themselves, + // unless they are empty. + if is_unique { + if !index_lhs.is_empty() { + prop_assert!(index_lhs.can_merge(&index_lhs, |_| false).is_err()); + } + if !index_rhs.is_empty() { + prop_assert!(index_rhs.can_merge(&index_rhs, |_| false).is_err()); + } + } + + // Merge `index_rhs` into `index_lhs`. + let num_keys_lhs = index_lhs.num_keys(); + let num_keys_rhs = index_rhs.num_keys(); + let num_key_bytes_lhs = index_lhs.num_key_bytes(); + let num_key_bytes_rhs = index_rhs.num_key_bytes(); + let num_rows_lhs = index_lhs.num_rows(); + let num_rows_rhs = index_rhs.num_rows(); + unsafe { index_lhs.merge_from(index_rhs, |x| x) }; + + // Witness the union of `index_lhs` and `index_rhs` in `index_lhs`. + prop_assert_eq!(index_lhs.num_rows(), table.num_rows()); + prop_assert_eq!(index_lhs.num_rows(), num_rows_lhs + num_rows_rhs); + prop_assert_eq!(index_lhs.num_key_bytes(), num_key_bytes_lhs + num_key_bytes_rhs); + if is_unique { + prop_assert_eq!(index_lhs.num_keys(), num_keys_lhs + num_keys_rhs); + } + } } } diff --git a/crates/table/src/table_index/multimap.rs b/crates/table/src/table_index/multimap.rs index b1ab1361072..4e9416a4828 100644 --- a/crates/table/src/table_index/multimap.rs +++ b/crates/table/src/table_index/multimap.rs @@ -5,6 +5,44 @@ use core::ops::RangeBounds; use spacetimedb_sats::memory_usage::MemoryUsage; use std::collections::btree_map::{BTreeMap, Range}; +/// Statistics for a non-unique index. +#[derive(Default, Debug, PartialEq, Eq)] +pub(super) struct MultiMapStatistics { + /// The memoized number of rows indexed in `self.map`. + pub(super) num_rows: usize, + /// Storage for [`Index::num_key_bytes`]. + pub(super) num_key_bytes: u64, +} + +impl MemoryUsage for MultiMapStatistics {} + +impl MultiMapStatistics { + /// Adds statistics for a row with `key`. + pub(super) fn add(&mut self, key: &I::Key) { + self.num_rows += 1; + self.num_key_bytes.add_to_key_bytes::(key); + } + + /// Merges `src` into `self`. + pub(super) fn merge_from(&mut self, src: Self) { + self.num_rows += src.num_rows; + self.num_key_bytes += src.num_key_bytes; + } + + /// Deletes statistics for a row with `key`. 
+ pub(super) fn delete(&mut self, key: &I::Key) { + self.num_rows -= 1; + self.num_key_bytes.sub_from_key_bytes::(key); + } + + /// Clears the statistics to zero. + pub(super) fn clear(&mut self) { + self.num_rows = 0; + self.num_key_bytes.reset_to_zero(); + } +} + +// TODO(centril): rename this to `BTreeIndex`. /// A multi map that relates a `K` to a *set* of `RowPointer`s. #[derive(Debug, PartialEq, Eq)] pub struct MultiMap { @@ -15,55 +53,72 @@ pub struct MultiMap { /// as we allow a single element to be stored inline /// to improve performance for the common case of one element. map: BTreeMap, - /// The memoized number of rows indexed in `self.map`. - num_rows: usize, - /// Storage for [`Index::num_key_bytes`]. - num_key_bytes: u64, + /// Stats for `num_rows` and `num_key_bytes`. + stats: MultiMapStatistics, } impl Default for MultiMap { fn default() -> Self { Self { map: <_>::default(), - num_rows: <_>::default(), - num_key_bytes: <_>::default(), + stats: <_>::default(), } } } impl MemoryUsage for MultiMap { fn heap_usage(&self) -> usize { - let Self { - map, - num_rows, - num_key_bytes, - } = self; - map.heap_usage() + num_rows.heap_usage() + num_key_bytes.heap_usage() + let Self { map, stats } = self; + map.heap_usage() + stats.heap_usage() } } -impl Index for MultiMap { +// SAFETY: The implementations of all constructing +// and mutating methods uphold the invariant, +// assuming the caller requirements of the `unsafe` methods are upheld, +// that every `key -> ptr` pair only ever occurs once. +unsafe impl Index for MultiMap { type Key = K; + // ========================================================================= + // Construction + // ========================================================================= + fn clone_structure(&self) -> Self { <_>::default() } - /// Inserts the relation `key -> ptr` to this multimap. - /// - /// The map does not check whether `key -> ptr` was already in the map. - /// It's assumed that the same `ptr` is never added twice, - /// and multimaps do not bind one `key` to the same `ptr`. - fn insert(&mut self, key: Self::Key, ptr: RowPointer) -> Result<(), RowPointer> { - self.num_rows += 1; - self.num_key_bytes.add_to_key_bytes::(&key); - self.map.entry(key).or_default().push(ptr); + // ========================================================================= + // Mutation + // ========================================================================= + + unsafe fn insert(&mut self, key: Self::Key, ptr: RowPointer) -> Result<(), RowPointer> { + self.debug_ensure_key_ptr_not_included(&key, ptr); + + self.stats.add::(&key); + let entry = self.map.entry(key).or_default(); + // SAFETY: caller promised that `(key, ptr)` does not exist in the index + // so it also does not exist in `entry`. + unsafe { entry.push(ptr) }; Ok(()) } - /// Deletes `key -> ptr` from this multimap. - /// - /// Returns whether `key -> ptr` was present. + unsafe fn merge_from(&mut self, src_index: Self, mut translate: impl FnMut(RowPointer) -> RowPointer) { + // Merge `stats`. + self.stats.merge_from(src_index.stats); + + // Move over the `key -> ptr` pairs + // and translate `ptr`s from `src_index` to fit `self`. + for (key, src) in src_index.map { + let dst = self.map.entry(key).or_default(); + + // SAFETY: Given `(key, ptr)` in `src_index`, + // `(key, translate(ptr))` does not exist in `self`. + // It follows that the `dst ∩ translate(src) = ∅`. 
+ unsafe { dst.merge_from(src, &mut translate) }; + } + } + fn delete(&mut self, key: &K, ptr: RowPointer) -> bool { let Some(vset) = self.map.get_mut(key) else { return false; @@ -76,13 +131,22 @@ impl Index for MultiMap { } if deleted { - self.num_rows -= 1; - self.num_key_bytes.sub_from_key_bytes::(key); + self.stats.delete::(key); } deleted } + fn clear(&mut self) { + // Unfortunately, this will drop the existing allocation. + self.map.clear(); + self.stats.clear(); + } + + // ========================================================================= + // Querying + // ========================================================================= + type PointIter<'a> = SameKeyEntryIter<'a> where @@ -97,19 +161,11 @@ impl Index for MultiMap { } fn num_key_bytes(&self) -> u64 { - self.num_key_bytes + self.stats.num_key_bytes } fn num_rows(&self) -> usize { - self.num_rows - } - - /// Deletes all entries from the multimap, leaving it empty. - /// This will not deallocate the outer map. - fn clear(&mut self) { - self.map.clear(); - self.num_rows = 0; - self.num_key_bytes.reset_to_zero(); + self.stats.num_rows } fn can_merge(&self, _: &Self, _: impl Fn(&RowPointer) -> bool) -> Result<(), RowPointer> { diff --git a/crates/table/src/table_index/same_key_entry.rs b/crates/table/src/table_index/same_key_entry.rs index b17d7c38c1a..a00a3d8e2fc 100644 --- a/crates/table/src/table_index/same_key_entry.rs +++ b/crates/table/src/table_index/same_key_entry.rs @@ -1,9 +1,12 @@ use crate::{indexes::RowPointer, static_assert_size}; -use core::slice; +use core::{mem, slice}; use smallvec::SmallVec; use spacetimedb_data_structures::map::{hash_set, HashCollectionExt, HashSet}; use spacetimedb_memory_usage::MemoryUsage; +type Small = SmallVec<[RowPointer; 2]>; +type Large = HashSet; + /// A supporting type for multimap implementations /// that handles all the values for the same key, /// leaving the multimap to only have to care about the keys. @@ -28,7 +31,10 @@ pub(super) enum SameKeyEntry { /// Up to two values are represented inline here. /// It's not profitable to represent this as a separate variant /// as that would increase `size_of::()` by 8 bytes. - Small(SmallVec<[RowPointer; 2]>), + /// + /// Invariant: The callers of [`SameKeyEntry::push`] and [`SameKeyEntry::merge_from`] + /// ensure that this will never contain duplicates. + Small(Small), /// A large number of values. /// @@ -38,7 +44,7 @@ pub(super) enum SameKeyEntry { /// Note that using a `HashSet`, with `S = RandomState`, /// entails that the iteration order is not deterministic. /// This is observed when doing queries against the index. - Large(HashSet), + Large(Large), } static_assert_size!(SameKeyEntry, 32); @@ -63,28 +69,104 @@ impl SameKeyEntry { /// beyond which the strategy is changed from small to large storage. const LARGE_AFTER_LEN: usize = 4096 / size_of::(); + /// Extends `set` with `elems`. + /// + // SAFETY: `elems` must not be contained in `set`. + #[inline] + unsafe fn extend_unique(set: &mut Large, elems: impl IntoIterator) { + for val in elems.into_iter() { + // SAFETY: caller promised that `small` contains no duplicates. + unsafe { set.insert_unique_unchecked(val) }; + } + } + /// Pushes `val` as an entry for the key. /// /// This assumes that `val` was previously not recorded. /// The structure does not check whether it was previously resident. /// As a consequence, the time complexity is `O(k)` amortized. - pub(super) fn push(&mut self, val: RowPointer) { + /// + /// # Safety + /// + /// - `val` does not occur in `self`. 
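+    ///   Otherwise the set invariant is broken and the entry may come to hold duplicate pointers for this key.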
+    pub(super) unsafe fn push(&mut self, val: RowPointer) {
         match self {
-            Self::Small(list) if list.len() <= Self::LARGE_AFTER_LEN => {
-                list.push(val);
+            Self::Small(set) if set.len() <= Self::LARGE_AFTER_LEN => {
+                // SAFETY: The caller promised that `val` is not in `set`,
+                // so this preserves our invariant that `set` is a set.
+                set.push(val);
             }
             Self::Small(list) => {
-                // Reconstruct into a set.
+                // Reconstruct into a hash set.
                 let mut set = HashSet::with_capacity(list.len() + 1);
-                set.extend(list.drain(..));
+                // SAFETY: Before `.push`, `list` was a set and contained no duplicates.
+                unsafe { Self::extend_unique(&mut set, mem::take(list)) };
                 // Add `val`.
-                set.insert(val);
+                // SAFETY: Caller promised that `set` did not include `val`.
+                unsafe { set.insert_unique_unchecked(val) };
                 *self = Self::Large(set);
             }
             Self::Large(set) => {
-                set.insert(val);
+                // SAFETY: Caller promised that `set` did not include `val`.
+                unsafe { set.insert_unique_unchecked(val) };
+            }
+        }
+    }
+
+    /// Merges all values in `src` into `self`,
+    /// translating all the former via `by` first.
+    ///
+    /// # Safety
+    ///
+    /// - The intersection of `dst` and `by(src)` is empty.
+    ///   That is, `self ∩ by(src) = ∅` holds.
+    pub(super) unsafe fn merge_from(&mut self, src: Self, mut by: impl FnMut(RowPointer) -> RowPointer) {
+        match src {
+            Self::Small(mut src) => {
+                // Translate the source.
+                for ptr in &mut src {
+                    *ptr = by(*ptr);
+                }
+
+                // Insert into `self`.
+                match self {
+                    Self::Small(dst) => {
+                        // SAFETY: The intersection is empty, so `dst ++ src` is also a set.
+                        dst.append(&mut src);
+                    }
+                    Self::Large(dst) => {
+                        for val in src.into_iter() {
+                            // SAFETY: The intersection is empty, so the union is also a set.
+                            unsafe { dst.insert_unique_unchecked(val) };
+                        }
+                    }
+                }
+            }
+            Self::Large(src) => {
+                // Translate the source.
+                let src = src.into_iter().map(by);
+
+                match self {
+                    Self::Small(dst) => {
+                        // Reconstruct into a hash set with combined size.
+                        let mut set = HashSet::with_capacity(dst.len() + src.len());
+                        let dst = mem::take(dst).into_iter();
+                        // SAFETY: `dst` is a set by `Self`'s invariant and `set` is empty.
+                        unsafe { Self::extend_unique(&mut set, dst) };
+
+                        // Merge `src` into `set`.
+                        // SAFETY: The intersection is empty, so the union is also a set.
+                        unsafe { Self::extend_unique(&mut set, src) };
+
+                        // `set` now holds the union, so switch `self` to the large representation.
+                        *self = Self::Large(set);
+                    }
+                    Self::Large(dst) => {
+                        // Merge `src` into `dst`.
+                        dst.reserve(src.len());
+                        // SAFETY: The intersection is empty, so the union is also a set.
+                        unsafe { Self::extend_unique(dst, src) };
+                    }
+                }
+            }
+        }
+    }
             }
         }
     }
diff --git a/crates/table/src/table_index/unique_direct_fixed_cap_index.rs b/crates/table/src/table_index/unique_direct_fixed_cap_index.rs
index 805d70fc83d..2211c7ec5cf 100644
--- a/crates/table/src/table_index/unique_direct_fixed_cap_index.rs
+++ b/crates/table/src/table_index/unique_direct_fixed_cap_index.rs
@@ -40,27 +40,43 @@ impl UniqueDirectFixedCapIndex {
     }
 }
 
-impl Index for UniqueDirectFixedCapIndex {
+// SAFETY: The implementations of all constructing
+// and mutating methods uphold the invariant,
+// assuming the caller requirements of the `unsafe` methods are upheld,
+// that every `key -> ptr` pair only ever occurs once.
+// In fact, given that this is a unique index,
+// this is statically guaranteed as one `key` only has one slot for one `ptr`.
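+// (A slot holds `NONE_PTR` while unoccupied; occupied slots store their pointer via `injest` and are read back via `expose`.)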
+unsafe impl Index for UniqueDirectFixedCapIndex { type Key = K; + // ========================================================================= + // Construction + // ========================================================================= + /// Clones the structure of the index and returns one with the same capacity. fn clone_structure(&self) -> Self { Self::new(self.array.len()) } - /// Inserts the relation `key -> val` to this index. + // ========================================================================= + // Mutation + // ========================================================================= + + /// Inserts the relation `key -> ptr` to this map. /// - /// If `key` was already present in the index, does not add an association with `val`. - /// Returns the existing associated value instead. + /// If `key` was already present in the index, + /// and the index is unique, + /// does not add the relation `key -> ptr` + /// and returns the existing associated pointer instead. /// /// Panics if the key is beyond the fixed capacity of this index. - fn insert(&mut self, key: Self::Key, val: RowPointer) -> Result<(), RowPointer> { + unsafe fn insert(&mut self, key: Self::Key, ptr: RowPointer) -> Result<(), RowPointer> { // Fetch the slot. let slot = &mut self.array[key.to_usize()]; let in_slot = *slot; if in_slot == NONE_PTR { // We have `NONE_PTR`, so not set yet. - *slot = injest(val); + *slot = injest(ptr); self.len += 1; Ok(()) } else { @@ -68,6 +84,20 @@ impl Index for UniqueDirectFixedCapIndex { } } + /// Merge `src`, mapped through `translate`, into `self`. + unsafe fn merge_from(&mut self, src: Self, translate: impl Fn(RowPointer) -> RowPointer) { + assert_eq!(self.array.len(), src.array.len()); + self.len += src.len; + + src.array + .into_vec() + .into_iter() + .enumerate() + // Ignore unset slots. 
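+            // (`NONE_PTR` marks a key with no row; only occupied slots are copied into `self`.)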
+ .filter(|(_, ptr)| *ptr != NONE_PTR) + .for_each(|(key, ptr)| self.array[key] = translate(ptr)); + } + fn delete(&mut self, &key: &Self::Key, _: RowPointer) -> bool { let Some(slot) = self.array.get_mut(key.to_usize()) else { return false; @@ -78,6 +108,15 @@ impl Index for UniqueDirectFixedCapIndex { deleted } + fn clear(&mut self) { + self.array.fill(NONE_PTR); + self.len = 0; + } + + // ========================================================================= + // Querying + // ========================================================================= + type PointIter<'a> = UniqueDirectIndexPointIter where @@ -92,11 +131,6 @@ impl Index for UniqueDirectFixedCapIndex { self.len } - fn clear(&mut self) { - self.array.fill(NONE_PTR); - self.len = 0; - } - fn can_merge(&self, other: &Self, ignore: impl Fn(&RowPointer) -> bool) -> Result<(), RowPointer> { for (slot_s, slot_o) in self.array.iter().zip(other.array.iter()) { let ptr_s = expose(*slot_s); @@ -178,7 +212,7 @@ mod test { let mut index = UniqueDirectFixedCapIndex::new(u8::MAX as usize + 1); for (key, ptr) in keys.iter().zip(&ptrs) { - index.insert(*key, *ptr).unwrap(); + index.insert_for_test(*key, *ptr).unwrap(); } assert_eq!(index.num_rows(), (range.end - range.start) as usize); (index, range, ptrs) @@ -196,7 +230,7 @@ mod test { fn inserting_again_errors(start: u8, end: u8) { let (mut index, keys, ptrs) = setup(start, end); for (key, ptr) in keys.zip(&ptrs) { - assert_eq!(index.insert(key, *ptr).unwrap_err(), *ptr) + assert_eq!(index.insert_for_test(key, *ptr).unwrap_err(), *ptr) } } @@ -215,7 +249,7 @@ mod test { assert!(!index.delete(&key, ptr)); assert_eq!(index.num_rows(), (range.end - range.start - 1) as usize); - index.insert(key, ptr).unwrap(); + index.insert_for_test(key, ptr).unwrap(); assert_eq!(index.num_rows(), (range.end - range.start) as usize); } } diff --git a/crates/table/src/table_index/unique_direct_index.rs b/crates/table/src/table_index/unique_direct_index.rs index 6730e642270..4a1f75ec7ce 100644 --- a/crates/table/src/table_index/unique_direct_index.rs +++ b/crates/table/src/table_index/unique_direct_index.rs @@ -163,17 +163,31 @@ impl InnerIndex { } } -impl Index for UniqueDirectIndex { +// SAFETY: The implementations of all constructing +// and mutating methods uphold the invariant, +// assuming the caller requirements of the `unsafe` methods are upheld, +// that every `key -> ptr` pair only ever occurs once. +// In fact, given that this is a unique index, +// this is statically guaranteed as one `key` only has one slot for one `ptr`. +unsafe impl Index for UniqueDirectIndex { type Key = K; + // ========================================================================= + // Construction + // ========================================================================= + fn clone_structure(&self) -> Self { Self::default() } + // ========================================================================= + // Mutation + // ========================================================================= + fn insert_maybe_despecialize( &mut self, key: Self::Key, - val: RowPointer, + ptr: RowPointer, ) -> Result, Despecialize> { let key = key.to_usize(); if key > const { u32::MAX as usize } { @@ -203,7 +217,7 @@ impl Index for UniqueDirectIndex { let in_slot = *slot; Ok(if in_slot == NONE_PTR { // We have `NONE_PTR`, so not set yet. 
- *slot = injest(val); + *slot = injest(ptr); self.len += 1; Ok(()) } else { @@ -211,6 +225,48 @@ impl Index for UniqueDirectIndex { }) } + unsafe fn merge_from(&mut self, src: Self, translate: impl Fn(RowPointer) -> RowPointer) { + self.len += src.len; + + // Fetch the `self`'s outer index and ensure it can house at least `src.outer.len()`. + let dst_outer = &mut self.outer; + dst_outer.resize(dst_outer.len().max(src.outer.len()), None); + + for (key_outer, mut src_inner) in src + .outer + .into_iter() + .enumerate() + // Ignore unused outer slots. + .filter_map(|(pos, o)| o.map(|o| (pos, o))) + { + // Fetch `self`'s inner index. + // SAFETY: ensured in `.resize(_)` that `key_outer < dst_outer.len()`, making indexing to `key_outer` valid. + let dst_inner = unsafe { dst_outer.get_unchecked_mut(key_outer) }; + + match dst_inner { + // `self` doesn't have this inner index, so move it over but translate first. + None => { + // Translate all used slots in `src_inner`. + src_inner + .inner + .iter_mut() + .filter(|s| **s != NONE_PTR) + .for_each(|slot| *slot = injest(translate(*slot))); + *dst_inner = Some(src_inner); + } + // `self` does have the inner index, so write each ptr in `src_inner` to `dst_inner`. + Some(dst_inner) => { + dst_inner + .inner + .iter_mut() + .zip(src_inner.inner.iter().copied()) + .filter(|(_, src)| *src != NONE_PTR) + .for_each(|(dst, src)| *dst = injest(translate(src))); + } + } + } + } + fn delete(&mut self, key: &Self::Key, _: RowPointer) -> bool { let key = key.to_usize(); let (key_outer, key_inner) = split_key(key); @@ -225,6 +281,16 @@ impl Index for UniqueDirectIndex { false } + fn clear(&mut self) { + // This will not deallocate the outer index. + self.outer.clear(); + self.len = 0; + } + + // ========================================================================= + // Querying + // ========================================================================= + type PointIter<'a> = UniqueDirectIndexPointIter where @@ -246,13 +312,6 @@ impl Index for UniqueDirectIndex { self.len } - /// Deletes all entries from the index, leaving it empty. - /// This will not deallocate the outer index. - fn clear(&mut self) { - self.outer.clear(); - self.len = 0; - } - /// Returns whether `other` can be merged into `self` /// with an error containing the element in `self` that caused the violation. 
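+    /// (For a unique index, a violation means some key is occupied in both `self` and `other` and the clashing row is not `ignore`d.)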
/// @@ -395,9 +454,9 @@ impl UniqueDirectIndex { let key = key_outer * KEYS_PER_INNER + key_inner; let key = K::from_usize(key); - new_index - .insert(key, expose(slot)) - .expect("insertions from one unique index to another cannot fail") + let ptr = expose(slot); + // SAFETY: + unsafe { new_index.insert(key, ptr) }.expect("insertions from one unique index to another cannot fail") } } @@ -436,7 +495,7 @@ pub(super) mod test { let mut index = UniqueDirectIndex::default(); for (key, ptr) in keys.iter().zip(&ptrs) { - index.insert(*key, *ptr).unwrap(); + index.insert_for_test(*key, *ptr).unwrap(); } assert_eq!(index.num_rows(), 4); @@ -451,11 +510,11 @@ pub(super) mod test { let mut index = UniqueDirectIndex::default(); for (key, ptr) in keys.iter().zip(&ptrs) { - index.insert(*key, *ptr).unwrap(); + index.insert_for_test(*key, *ptr).unwrap(); } for (key, ptr) in keys.iter().zip(&ptrs) { - assert_eq!(index.insert(*key, *ptr).unwrap_err(), *ptr) + assert_eq!(index.insert_for_test(*key, *ptr).unwrap_err(), *ptr) } } @@ -466,7 +525,7 @@ pub(super) mod test { let mut index = UniqueDirectIndex::default(); for (key, ptr) in keys.iter().zip(&ptrs) { - index.insert(*key, *ptr).unwrap(); + index.insert_for_test(*key, *ptr).unwrap(); } assert_eq!(index.num_rows(), 4); @@ -476,7 +535,17 @@ pub(super) mod test { assert!(!index.delete(&key, ptr)); assert_eq!(index.num_rows(), 3); - index.insert(key, ptr).unwrap(); + index.insert_for_test(key, ptr).unwrap(); assert_eq!(index.num_rows(), 4); } + + #[test] + fn too_large_key() { + let mut index = UniqueDirectIndex::default(); + let whatever = RowPointer(0); + assert_eq!( + index.insert_maybe_despecialize(u32::MAX as u64 + 1, whatever), + Err(Despecialize) + ); + } } diff --git a/crates/table/src/table_index/unique_hash_index.rs b/crates/table/src/table_index/unique_hash_index.rs index ba572d03085..6728e0d031c 100644 --- a/crates/table/src/table_index/unique_hash_index.rs +++ b/crates/table/src/table_index/unique_hash_index.rs @@ -36,24 +36,58 @@ impl MemoryUsage for UniqueHashIndex { } } -impl Index for UniqueHashIndex { +// SAFETY: The implementations of all constructing +// and mutating methods uphold the invariant, +// assuming the caller requirements of the `unsafe` methods are upheld, +// that every `key -> ptr` pair only ever occurs once. +// In fact, given that this is a unique index, +// this is statically guaranteed as one `key` only has one slot for one `ptr`. +unsafe impl Index for UniqueHashIndex { type Key = K; + // ========================================================================= + // Construction + // ========================================================================= + fn clone_structure(&self) -> Self { <_>::default() } - fn insert(&mut self, key: Self::Key, ptr: RowPointer) -> Result<(), RowPointer> { + // ========================================================================= + // Mutation + // ========================================================================= + + unsafe fn insert(&mut self, key: Self::Key, ptr: RowPointer) -> Result<(), RowPointer> { match self.map.entry(key) { + // `key` not found in index, let's add it. Entry::Vacant(e) => { self.num_key_bytes.add_to_key_bytes::(e.key()); e.insert(ptr); Ok(()) } + // Unique constraint violation! Entry::Occupied(e) => Err(*e.into_mut()), } } + unsafe fn merge_from(&mut self, src_index: Self, mut translate: impl FnMut(RowPointer) -> RowPointer) { + // Merge `num_key_bytes`. 
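+        // (Only the byte count needs explicit bookkeeping here; row/key counts for this unique index are derived from `map` itself.)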
+ self.num_key_bytes.merge(src_index.num_key_bytes); + + // Move over the `key -> ptr` pairs + // and translate all row pointers in `src` to fit `self`. + let src = src_index.map; + self.map.reserve(src.len()); + for (key, src) in src { + // SAFETY: Given `(key, ptr)` in `src`, + // `(key, translate(ptr))` does not exist in `self`. + // As each index is a unique index, + // it follows that if `key` exist in `src` + // it does not also exist in `self`. + unsafe { self.map.insert_unique_unchecked(key, translate(src)) }; + } + } + fn delete(&mut self, key: &Self::Key, _: RowPointer) -> bool { let ret = self.map.remove(key).is_some(); if ret { @@ -67,6 +101,10 @@ impl Index for UniqueHashIndex { self.num_key_bytes.reset_to_zero(); } + // ========================================================================= + // Querying + // ========================================================================= + fn can_merge(&self, other: &Self, ignore: impl Fn(&RowPointer) -> bool) -> Result<(), RowPointer> { let Some(found) = other .map diff --git a/crates/table/src/table_index/uniquemap.rs b/crates/table/src/table_index/uniquemap.rs index fa77885d233..4fb1eba8917 100644 --- a/crates/table/src/table_index/uniquemap.rs +++ b/crates/table/src/table_index/uniquemap.rs @@ -4,6 +4,7 @@ use core::{ops::RangeBounds, option::IntoIter}; use spacetimedb_sats::memory_usage::MemoryUsage; use std::collections::btree_map::{BTreeMap, Entry, Range}; +// TODO(centril): rename this to `UniqueBTreeIndex`. /// A "unique map" that relates a `K` to a `RowPointer`. /// /// (This is just a `BTreeMap`) with a slightly modified interface. @@ -31,24 +32,51 @@ impl MemoryUsage for UniqueMap { } } -impl Index for UniqueMap { +// SAFETY: The implementations of all constructing +// and mutating methods uphold the invariant, +// assuming the caller requirements of the `unsafe` methods are upheld, +// that every `key -> ptr` pair only ever occurs once. +// In fact, given that this is a unique index, +// this is statically guaranteed as one `key` only has one slot for one `ptr`. +unsafe impl Index for UniqueMap { type Key = K; + // ========================================================================= + // Construction + // ========================================================================= + fn clone_structure(&self) -> Self { Self::default() } - fn insert(&mut self, key: K, val: RowPointer) -> Result<(), RowPointer> { + // ========================================================================= + // Mutation + // ========================================================================= + + unsafe fn insert(&mut self, key: K, ptr: RowPointer) -> Result<(), RowPointer> { match self.map.entry(key) { + // `key` not found in index, let's add it. Entry::Vacant(e) => { self.num_key_bytes.add_to_key_bytes::(e.key()); - e.insert(val); + e.insert(ptr); Ok(()) } + // Unique constraint violation! Entry::Occupied(e) => Err(*e.into_mut()), } } + unsafe fn merge_from(&mut self, mut src: Self, translate: impl Fn(RowPointer) -> RowPointer) { + // Merge `num_key_bytes`. + self.num_key_bytes.merge(src.num_key_bytes); + + // Translate all row pointers in `src` to fit `self`. + src.map.values_mut().for_each(|ptr| *ptr = translate(*ptr)); + + // Move over the `key -> ptr` pairs. 
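+        // (`BTreeMap::append` drains `src.map`; the key sets are disjoint by the safety contract, so nothing is overwritten.)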
+ self.map.append(&mut src.map); + } + fn delete(&mut self, key: &K, _: RowPointer) -> bool { let ret = self.map.remove(key).is_some(); if ret { @@ -57,6 +85,16 @@ impl Index for UniqueMap { ret } + fn clear(&mut self) { + // Unfortunately, this will drop the existing allocation. + self.map.clear(); + self.num_key_bytes.reset_to_zero(); + } + + // ========================================================================= + // Querying + // ========================================================================= + fn num_keys(&self) -> usize { self.map.len() } @@ -75,14 +113,6 @@ impl Index for UniqueMap { UniqueMapPointIter { iter } } - /// Deletes all entries from the map, leaving it empty. - /// - /// Unfortunately, this will drop the existing allocation. - fn clear(&mut self) { - self.map.clear(); - self.num_key_bytes.reset_to_zero(); - } - fn can_merge(&self, other: &Self, ignore: impl Fn(&RowPointer) -> bool) -> Result<(), RowPointer> { let Some(found) = other .map