diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 5d3ff33b3..7cf867c04 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -89,6 +89,7 @@ set(ICEBERG_SOURCES update/fast_append.cc update/pending_update.cc update/set_snapshot.cc + update/snapshot_manager.cc update/snapshot_update.cc update/update_location.cc update/update_partition_spec.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index fd82a889b..7dd3409ad 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -107,6 +107,7 @@ iceberg_sources = files( 'update/fast_append.cc', 'update/pending_update.cc', 'update/set_snapshot.cc', + 'update/snapshot_manager.cc', 'update/snapshot_update.cc', 'update/update_location.cc', 'update/update_partition_spec.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index b6c26ea00..2a31eb3b1 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -32,6 +32,7 @@ #include "iceberg/table_scan.h" #include "iceberg/transaction.h" #include "iceberg/update/expire_snapshots.h" +#include "iceberg/update/snapshot_manager.h" #include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_partition_statistics.h" #include "iceberg/update/update_properties.h" @@ -222,6 +223,10 @@ Result> Table::NewUpdatePartitionStat return transaction->NewUpdatePartitionStatistics(); } +Result> Table::NewSnapshotManager() { + return SnapshotManager::Make(name().ToString(), shared_from_this()); +} + Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 1f3135dd7..423911c21 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -168,6 +168,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// \brief Create a new FastAppend to append data files and commit the changes. virtual Result> NewFastAppend(); + /// \brief Create a new SnapshotManager to manage snapshots and snapshot references. + virtual Result> NewSnapshotManager(); + protected: Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index ae61d819f..a8f119fa4 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -179,6 +179,7 @@ if(ICEBERG_BUILD_BUNDLE) expire_snapshots_test.cc fast_append_test.cc set_snapshot_test.cc + snapshot_manager_test.cc transaction_test.cc update_location_test.cc update_partition_spec_test.cc diff --git a/src/iceberg/test/snapshot_manager_test.cc b/src/iceberg/test/snapshot_manager_test.cc new file mode 100644 index 000000000..f9bd044c7 --- /dev/null +++ b/src/iceberg/test/snapshot_manager_test.cc @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/snapshot_manager.h" + +#include +#include + +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/transaction.h" +#include "iceberg/update/fast_append.h" + +namespace iceberg { + +class SnapshotManagerTest : public UpdateTestBase { + protected: + // These snapshot IDs correspond to the snapshots in the TableMetadataV2Valid.json + static constexpr int64_t kOldestSnapshotId = 3051729675574597004; + static constexpr int64_t kCurrentSnapshotId = 3055729675574597004; +}; + +TEST_F(SnapshotManagerTest, CreateBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("branch1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); + EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); +} + +TEST_F(SnapshotManagerTest, CreateBranchWithoutSnapshotId) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1"); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("branch1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); + EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); +} + +TEST_F(SnapshotManagerTest, CreateBranchOnEmptyTable) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, + SnapshotManager::Make("minimal_table", minimal_table_)); + manager->CreateBranch("branch1"); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(minimal_table_ident_)); + EXPECT_FALSE( + reloaded->metadata()->refs.contains(std::string(SnapshotRef::kMainBranch))); + auto it = reloaded->metadata()->refs.find("branch1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); +} + +TEST_F(SnapshotManagerTest, CreateBranchOnEmptyTableFailsWhenRefAlreadyExists) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, + SnapshotManager::Make("minimal_table", minimal_table_)); + manager->CreateBranch("branch1"); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manager2, + SnapshotManager::Make("minimal_table", minimal_table_)); + manager2->CreateBranch("branch1"); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_THAT(result, HasErrorMessage("branch 'branch1' was created concurrently")); +} + +TEST_F(SnapshotManagerTest, CreateBranchFailsWhenRefAlreadyExists) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + // Try to create a branch with an existing name + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", table_)); + manager2->CreateBranch("branch1", kCurrentSnapshotId); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_THAT(result, HasErrorMessage("branch 'branch1' was created concurrently")); +} + +TEST_F(SnapshotManagerTest, CreateTag) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateTag("tag1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("tag1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kTag); + EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); +} + +TEST_F(SnapshotManagerTest, CreateTagFailsWhenRefAlreadyExists) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateTag("tag1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + // Try to create a tag with an existing name + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", table_)); + manager2->CreateTag("tag1", kCurrentSnapshotId); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_THAT(result, HasErrorMessage("tag 'tag1' was created concurrently")); +} + +TEST_F(SnapshotManagerTest, RemoveBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->RemoveBranch("branch1"); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + EXPECT_FALSE(reloaded->metadata()->refs.contains("branch1")); + } +} + +TEST_F(SnapshotManagerTest, RemovingNonExistingBranchFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->RemoveBranch("non-existing"); + auto result = manager->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Branch does not exist: non-existing")); +} + +TEST_F(SnapshotManagerTest, RemovingMainBranchFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->RemoveBranch(std::string(SnapshotRef::kMainBranch)); + auto result = manager->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Cannot remove main branch")); +} + +TEST_F(SnapshotManagerTest, RemoveTag) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateTag("tag1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->RemoveTag("tag1"); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + EXPECT_FALSE(reloaded->metadata()->refs.contains("tag1")); + } +} + +TEST_F(SnapshotManagerTest, RemovingNonExistingTagFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->RemoveTag("non-existing"); + auto result = manager->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Tag does not exist: non-existing")); +} + +TEST_F(SnapshotManagerTest, ReplaceBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kOldestSnapshotId); + manager->CreateBranch("branch2", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->ReplaceBranch("branch1", "branch2"); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("branch1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_NE(ref, nullptr); + EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); + } +} + +TEST_F(SnapshotManagerTest, ReplaceBranchNonExistingToBranchFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->ReplaceBranch("branch1", "non-existing"); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Ref does not exist: non-existing")); +} + +TEST_F(SnapshotManagerTest, ReplaceBranchNonExistingFromBranchCreatesTheBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->ReplaceBranch("new-branch", "branch1"); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("new-branch"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); + EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); + } +} + +TEST_F(SnapshotManagerTest, FastForwardBranchNonExistingFromBranchCreatesTheBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->FastForwardBranch("new-branch", "branch1"); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("new-branch"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); + EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); + } +} + +TEST_F(SnapshotManagerTest, FastForwardBranchNonExistingToFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->FastForwardBranch("branch1", "non-existing"); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Ref does not exist: non-existing")); +} + +TEST_F(SnapshotManagerTest, ReplaceTag) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateTag("tag1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->ReplaceTag("tag1", kCurrentSnapshotId); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("tag1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_NE(ref, nullptr); + EXPECT_EQ(ref->snapshot_id, kCurrentSnapshotId); + } +} + +TEST_F(SnapshotManagerTest, UpdatingBranchRetention) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->SetMinSnapshotsToKeep("branch1", 10); + manager2->SetMaxSnapshotAgeMs("branch1", 20000); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("branch1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); + const auto& branch = std::get(ref->retention); + EXPECT_EQ(branch.max_snapshot_age_ms, 20000); + EXPECT_EQ(branch.min_snapshots_to_keep, 10); + } +} + +TEST_F(SnapshotManagerTest, SettingBranchRetentionOnTagFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateTag("tag1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->SetMinSnapshotsToKeep("tag1", 10); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Ref 'tag1' is a tag not a branch")); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->SetMaxSnapshotAgeMs("tag1", 10); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Ref 'tag1' is a tag not a branch")); + } +} + +TEST_F(SnapshotManagerTest, UpdatingBranchMaxRefAge) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->SetMaxRefAgeMs("branch1", 10000); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("branch1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->max_ref_age_ms(), 10000); + } +} + +TEST_F(SnapshotManagerTest, UpdatingTagMaxRefAge) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateTag("tag1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->SetMaxRefAgeMs("tag1", 10000); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("tag1"); + EXPECT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->max_ref_age_ms(), 10000); + } +} + +TEST_F(SnapshotManagerTest, RenameBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, SnapshotManager::Make("test_table", reloaded)); + manager2->RenameBranch("branch1", "branch2"); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it1 = reloaded->metadata()->refs.find("branch1"); + EXPECT_EQ(it1, reloaded->metadata()->refs.end()); + + auto it2 = reloaded->metadata()->refs.find("branch2"); + EXPECT_NE(it2, reloaded->metadata()->refs.end()); + auto ref2 = it2->second; + EXPECT_EQ(ref2->snapshot_id, kCurrentSnapshotId); + } +} + +TEST_F(SnapshotManagerTest, FailRenamingMainBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->RenameBranch(std::string(SnapshotRef::kMainBranch), "some-branch"); + auto result = manager->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Cannot rename main branch")); +} + +TEST_F(SnapshotManagerTest, RenamingNonExistingBranchFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->RenameBranch("some-missing-branch", "some-branch"); + auto result = manager->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Branch does not exist: some-missing-branch")); +} + +TEST_F(SnapshotManagerTest, RollbackTo) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->RollbackTo(kOldestSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); +} + +TEST_F(SnapshotManagerTest, SetCurrentSnapshot) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->SetCurrentSnapshot(kOldestSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); +} + +TEST_F(SnapshotManagerTest, CreateReferencesAndRollback) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make("test_table", table_)); + manager->CreateBranch("branch1", kCurrentSnapshotId); + manager->CreateTag("tag1", kCurrentSnapshotId); + manager->RollbackTo(kOldestSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); + + auto branch_it = reloaded->metadata()->refs.find("branch1"); + EXPECT_NE(branch_it, reloaded->metadata()->refs.end()); + EXPECT_EQ(branch_it->second->snapshot_id, kCurrentSnapshotId); + + auto tag_it = reloaded->metadata()->refs.find("tag1"); + EXPECT_NE(tag_it, reloaded->metadata()->refs.end()); + EXPECT_EQ(tag_it->second->snapshot_id, kCurrentSnapshotId); +} + +TEST_F(SnapshotManagerTest, SnapshotManagerThroughTransaction) { + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make(txn)); + + manager->RollbackTo(kOldestSnapshotId); + EXPECT_THAT(manager->Commit(), IsOk()); + EXPECT_THAT(txn->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); +} + +} // namespace iceberg diff --git a/src/iceberg/test/update_test_base.h b/src/iceberg/test/update_test_base.h index c14cb76b9..8f6e08c25 100644 --- a/src/iceberg/test/update_test_base.h +++ b/src/iceberg/test/update_test_base.h @@ -43,6 +43,7 @@ class UpdateTestBase : public ::testing::Test { void SetUp() override { InitializeFileIO(); RegisterTableFromResource("TableMetadataV2Valid.json"); + RegisterMinimalTableFromResource("TableMetadataV2ValidMinimal.json"); } /// \brief Initialize file IO and create necessary directories. @@ -56,6 +57,7 @@ class UpdateTestBase : public ::testing::Test { static_cast(*file_io_).fs()); ASSERT_TRUE(arrow_fs != nullptr); ASSERT_TRUE(arrow_fs->CreateDir(table_location_ + "/metadata").ok()); + ASSERT_TRUE(arrow_fs->CreateDir(minimal_table_location_ + "/metadata").ok()); } /// \brief Register a table from a metadata resource file. @@ -78,11 +80,35 @@ class UpdateTestBase : public ::testing::Test { catalog_->RegisterTable(table_ident_, metadata_location)); } + /// \brief Register a minimal table from a metadata resource file. + /// + /// \param resource_name The name of the metadata resource file + void RegisterMinimalTableFromResource(const std::string& resource_name) { + // Drop existing table if it exists + std::ignore = catalog_->DropTable(minimal_table_ident_, /*purge=*/false); + + // Write table metadata to the table location. + auto metadata_location = + std::format("{}/metadata/00001-{}.metadata.json", minimal_table_location_, + Uuid::GenerateV7().ToString()); + ICEBERG_UNWRAP_OR_FAIL(auto metadata, ReadTableMetadataFromResource(resource_name)); + metadata->location = minimal_table_location_; + ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, *metadata), + IsOk()); + + // Register the table in the catalog. + ICEBERG_UNWRAP_OR_FAIL( + minimal_table_, catalog_->RegisterTable(minimal_table_ident_, metadata_location)); + } + const TableIdentifier table_ident_{.name = "test_table"}; const std::string table_location_{"/warehouse/test_table"}; + const TableIdentifier minimal_table_ident_{.name = "minimal_table"}; + const std::string minimal_table_location_{"/warehouse/minimal_table"}; std::shared_ptr file_io_; std::shared_ptr catalog_; std::shared_ptr
table_; + std::shared_ptr
minimal_table_; }; } // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index b24aa0da3..981823d04 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -36,6 +36,7 @@ #include "iceberg/update/fast_append.h" #include "iceberg/update/pending_update.h" #include "iceberg/update/set_snapshot.h" +#include "iceberg/update/snapshot_manager.h" #include "iceberg/update/snapshot_update.h" #include "iceberg/update/update_location.h" #include "iceberg/update/update_partition_spec.h" @@ -428,4 +429,11 @@ Transaction::NewUpdateSnapshotReference() { return update_ref; } +Result> Transaction::NewSnapshotManager() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr snapshot_manager, + SnapshotManager::Make(shared_from_this())); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(snapshot_manager)); + return snapshot_manager; +} + } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index e975be7ff..e694a48c9 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -105,6 +105,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewUpdateSnapshotReference(); + /// \brief Create a new SnapshotManager to manage snapshots. + Result> NewSnapshotManager(); + private: Transaction(std::shared_ptr
table, Kind kind, bool auto_commit, std::unique_ptr metadata_builder); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index e97de0ac5..a712c2ede 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -184,6 +184,7 @@ class TableProperties; /// \brief Table update. class TableMetadataBuilder; class TableUpdate; +class SnapshotManager; class TableRequirement; class TableUpdateContext; class Transaction; diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 102471c04..6acb007a1 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -21,6 +21,7 @@ install_headers( 'fast_append.h', 'pending_update.h', 'set_snapshot.h', + 'snapshot_manager.h', 'snapshot_update.h', 'update_location.h', 'update_partition_spec.h', diff --git a/src/iceberg/update/snapshot_manager.cc b/src/iceberg/update/snapshot_manager.cc new file mode 100644 index 000000000..5afedb455 --- /dev/null +++ b/src/iceberg/update/snapshot_manager.cc @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/snapshot_manager.h" + +#include +#include + +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/update/fast_append.h" +#include "iceberg/update/set_snapshot.h" +#include "iceberg/update/update_snapshot_reference.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> SnapshotManager::Make( + const std::string& table_name, std::shared_ptr
table) { + if (table == nullptr) { + return InvalidArgument("Table cannot be null"); + } + if (table->metadata() == nullptr) { + return InvalidArgument("Cannot manage snapshots: table {} does not exist", + table_name); + } + // Create a transaction first + ICEBERG_ASSIGN_OR_RAISE(auto transaction, + Transaction::Make(table, Transaction::Kind::kUpdate, + /*auto_commit=*/false)); + auto manager = std::shared_ptr( + new SnapshotManager(std::move(transaction), /*is_external=*/false)); + return manager; +} + +Result> SnapshotManager::Make( + std::shared_ptr transaction) { + if (transaction == nullptr) { + return InvalidArgument("Invalid input transaction: null"); + } + return std::shared_ptr( + new SnapshotManager(std::move(transaction), /*is_external=*/true)); +} + +SnapshotManager::SnapshotManager(std::shared_ptr transaction, + bool is_external) + : PendingUpdate(transaction), is_external_transaction_(is_external) {} + +SnapshotManager::~SnapshotManager() = default; + +SnapshotManager& SnapshotManager::Cherrypick(int64_t snapshot_id) { + ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist()); + // TODO(anyone): Implement cherrypick operation + ICEBERG_BUILDER_CHECK(false, "Cherrypick operation not yet implemented"); + return *this; +} + +SnapshotManager& SnapshotManager::SetCurrentSnapshot(int64_t snapshot_id) { + ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist()); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, transaction_->NewSetSnapshot()); + set_snapshot->SetCurrentSnapshot(snapshot_id); + ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit()); + return *this; +} + +SnapshotManager& SnapshotManager::RollbackToTime(TimePointMs timestamp_ms) { + ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist()); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, transaction_->NewSetSnapshot()); + set_snapshot->RollbackToTime(UnixMsFromTimePointMs(timestamp_ms)); + ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit()); + return *this; +} + +SnapshotManager& SnapshotManager::RollbackTo(int64_t snapshot_id) { + ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist()); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, transaction_->NewSetSnapshot()); + set_snapshot->RollbackTo(snapshot_id); + ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit()); + return *this; +} + +SnapshotManager& SnapshotManager::CreateBranch(const std::string& name) { + if (base().current_snapshot_id != kInvalidSnapshotId) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_snapshot, base().Snapshot()); + if (current_snapshot != nullptr) { + return CreateBranch(name, current_snapshot->snapshot_id); + } + } + const auto& current_refs = base().refs; + ICEBERG_BUILDER_CHECK(!base().refs.contains(name), "Ref {} already exists", name); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto fast_append, transaction_->NewFastAppend()); + ICEBERG_BUILDER_RETURN_IF_ERROR(fast_append->ToBranch(name).Commit()); + return *this; +} + +SnapshotManager& SnapshotManager::CreateBranch(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->CreateBranch(name, snapshot_id); + return *this; +} + +SnapshotManager& SnapshotManager::CreateTag(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->CreateTag(name, snapshot_id); + return *this; +} + +SnapshotManager& SnapshotManager::RemoveBranch(const std::string& name) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->RemoveBranch(name); + return *this; +} + +SnapshotManager& SnapshotManager::RemoveTag(const std::string& name) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->RemoveTag(name); + return *this; +} + +SnapshotManager& SnapshotManager::ReplaceTag(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->ReplaceTag(name, snapshot_id); + return *this; +} + +SnapshotManager& SnapshotManager::ReplaceBranch(const std::string& name, + int64_t snapshot_id) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->ReplaceBranch(name, snapshot_id); + return *this; +} + +SnapshotManager& SnapshotManager::ReplaceBranch(const std::string& from, + const std::string& to) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->ReplaceBranch(from, to); + return *this; +} + +SnapshotManager& SnapshotManager::FastForwardBranch(const std::string& from, + const std::string& to) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->FastForward(from, to); + return *this; +} + +SnapshotManager& SnapshotManager::RenameBranch(const std::string& name, + const std::string& new_name) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->RenameBranch(name, new_name); + return *this; +} + +SnapshotManager& SnapshotManager::SetMinSnapshotsToKeep(const std::string& branch_name, + int32_t min_snapshots_to_keep) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->SetMinSnapshotsToKeep(branch_name, min_snapshots_to_keep); + return *this; +} + +SnapshotManager& SnapshotManager::SetMaxSnapshotAgeMs(const std::string& branch_name, + int64_t max_snapshot_age_ms) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->SetMaxSnapshotAgeMs(branch_name, max_snapshot_age_ms); + return *this; +} + +SnapshotManager& SnapshotManager::SetMaxRefAgeMs(const std::string& name, + int64_t max_ref_age_ms) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto update_ref, UpdateSnapshotReferencesOperation()); + update_ref->SetMaxRefAgeMs(name, max_ref_age_ms); + return *this; +} + +Result> SnapshotManager::Apply() { return base().Snapshot(); } + +Status SnapshotManager::Commit() { + ICEBERG_RETURN_UNEXPECTED(CommitIfRefUpdatesExist()); + if (!is_external_transaction_) { + ICEBERG_RETURN_UNEXPECTED(transaction_->Commit()); + } + return {}; +} + +Result> +SnapshotManager::UpdateSnapshotReferencesOperation() { + if (update_snapshot_references_operation_ == nullptr) { + ICEBERG_ASSIGN_OR_RAISE(update_snapshot_references_operation_, + transaction_->NewUpdateSnapshotReference()); + } + return update_snapshot_references_operation_; +} + +Status SnapshotManager::CommitIfRefUpdatesExist() { + if (update_snapshot_references_operation_ != nullptr) { + ICEBERG_RETURN_UNEXPECTED(update_snapshot_references_operation_->Commit()); + update_snapshot_references_operation_ = nullptr; + } + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/update/snapshot_manager.h b/src/iceberg/update/snapshot_manager.h new file mode 100644 index 000000000..6315ea701 --- /dev/null +++ b/src/iceberg/update/snapshot_manager.h @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" +#include "iceberg/util/timepoint.h" + +namespace iceberg { + +/// \brief API for managing snapshots and snapshot references. +/// +/// Allows rolling table data back to a state at an older snapshot, cherry-picking +/// snapshots, and managing branches and tags. +class ICEBERG_EXPORT SnapshotManager : public PendingUpdate { + public: + /// \brief Create a SnapshotManager for a table. + /// + /// \param table_name The name of the table + /// \param table The table to manage snapshots for + /// \return A new SnapshotManager instance, or an error if the table doesn't exist + static Result> Make(const std::string& table_name, + std::shared_ptr
table); + + /// \brief Create a SnapshotManager from an existing transaction. + /// + /// \param transaction The transaction to use + /// \return A new SnapshotManager instance + static Result> Make( + std::shared_ptr transaction); + + ~SnapshotManager() override; + + // TODO(xxx): is this correct? + Kind kind() const final { return Kind::kUpdateSnapshotReference; } + + /// \brief Apply supported changes in given snapshot and create a new snapshot which + /// will be set as the current snapshot on commit. + /// + /// \param snapshot_id A snapshot ID whose changes to apply + /// \return Reference to this for method chaining + SnapshotManager& Cherrypick(int64_t snapshot_id); + + /// \brief Roll this table's data back to a specific Snapshot identified by id. + /// + /// \param snapshot_id Long id of the snapshot to roll back table data to + /// \return Reference to this for method chaining + SnapshotManager& SetCurrentSnapshot(int64_t snapshot_id); + + /// \brief Roll this table's data back to the last Snapshot before the given timestamp. + /// + /// \param timestamp_ms A timestamp in milliseconds + /// \return Reference to this for method chaining + SnapshotManager& RollbackToTime(TimePointMs timestamp_ms); + + /// \brief Rollback table's state to a specific Snapshot identified by id. + /// + /// \param snapshot_id Long id of snapshot id to roll back table to. Must be an ancestor + /// of the current snapshot + /// \return Reference to this for method chaining + SnapshotManager& RollbackTo(int64_t snapshot_id); + + /// \brief Create a new branch. The branch will point to current snapshot if the current + /// snapshot is not NULL. Otherwise, the branch will point to a newly created empty + /// snapshot. + /// + /// \param name Branch name + /// \return Reference to this for method chaining + SnapshotManager& CreateBranch(const std::string& name); + + /// \brief Create a new branch pointing to the given snapshot id. + /// + /// \param name Branch name + /// \param snapshot_id ID of the snapshot which will be the head of the branch + /// \return Reference to this for method chaining + SnapshotManager& CreateBranch(const std::string& name, int64_t snapshot_id); + + /// \brief Create a new tag pointing to the given snapshot id. + /// + /// \param name Tag name + /// \param snapshot_id Snapshot ID for the head of the new tag + /// \return Reference to this for method chaining + SnapshotManager& CreateTag(const std::string& name, int64_t snapshot_id); + + /// \brief Remove a branch by name. + /// + /// \param name Branch name + /// \return Reference to this for method chaining + SnapshotManager& RemoveBranch(const std::string& name); + + /// \brief Remove the tag with the given name. + /// + /// \param name Tag name + /// \return Reference to this for method chaining + SnapshotManager& RemoveTag(const std::string& name); + + /// \brief Replaces the tag with the given name to point to the specified snapshot. + /// + /// \param name Tag to replace + /// \param snapshot_id New snapshot id for the given tag + /// \return Reference to this for method chaining + SnapshotManager& ReplaceTag(const std::string& name, int64_t snapshot_id); + + /// \brief Replaces the branch with the given name to point to the specified snapshot. + /// + /// \param name Branch to replace + /// \param snapshot_id New snapshot id for the given branch + /// \return Reference to this for method chaining + SnapshotManager& ReplaceBranch(const std::string& name, int64_t snapshot_id); + + /// \brief Replaces the from branch to point to the to snapshot. The to will remain + /// unchanged, and from branch will retain its retention properties. If the from branch + /// does not exist, it will be created with default retention properties. + /// + /// \param from Branch to replace + /// \param to The branch from should be replaced with + /// \return Reference to this for method chaining + SnapshotManager& ReplaceBranch(const std::string& from, const std::string& to); + + /// \brief Performs a fast-forward of from up to the to snapshot if from is an ancestor + /// of to. The to will remain unchanged, and from will retain its retention properties. + /// If the from branch does not exist, it will be created with default retention + /// properties. + /// + /// \param from Branch to fast-forward + /// \param to Ref for the from branch to be fast forwarded to + /// \return Reference to this for method chaining + SnapshotManager& FastForwardBranch(const std::string& from, const std::string& to); + + /// \brief Rename a branch. + /// + /// \param name Name of branch to rename + /// \param new_name The desired new name of the branch + /// \return Reference to this for method chaining + SnapshotManager& RenameBranch(const std::string& name, const std::string& new_name); + + /// \brief Updates the minimum number of snapshots to keep for a branch. + /// + /// \param branch_name Branch name + /// \param min_snapshots_to_keep Minimum number of snapshots to retain on the branch + /// \return Reference to this for method chaining + SnapshotManager& SetMinSnapshotsToKeep(const std::string& branch_name, + int32_t min_snapshots_to_keep); + + /// \brief Updates the max snapshot age for a branch. + /// + /// \param branch_name Branch name + /// \param max_snapshot_age_ms Maximum snapshot age in milliseconds to retain on branch + /// \return Reference to this for method chaining + SnapshotManager& SetMaxSnapshotAgeMs(const std::string& branch_name, + int64_t max_snapshot_age_ms); + + /// \brief Updates the retention policy for a reference. + /// + /// \param name Reference name + /// \param max_ref_age_ms Retention age in milliseconds of the tag reference itself + /// \return Reference to this for method chaining + SnapshotManager& SetMaxRefAgeMs(const std::string& name, int64_t max_ref_age_ms); + + /// \brief Apply the pending changes and return the current snapshot. + /// + /// \return The current snapshot after applying changes, or an error + Result> Apply(); + + /// \brief Commit all pending changes. + /// + /// \return Status indicating success or failure + Status Commit() override; + + private: + /// \brief Constructor for creating a SnapshotManager with a transaction. + /// + /// \param transaction The transaction to use + /// \param is_external Whether this is an external transaction (true) or created + /// internally (false) + SnapshotManager(std::shared_ptr transaction, bool is_external); + + /// \brief Get or create the UpdateSnapshotReference operation. + Result> UpdateSnapshotReferencesOperation(); + + /// \brief Commit any pending reference updates if they exist. + Status CommitIfRefUpdatesExist(); + + bool is_external_transaction_; + std::shared_ptr update_snapshot_references_operation_; +}; + +} // namespace iceberg diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index 12c3b19dc..b26bc4c08 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -76,6 +76,18 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { return self; } + /// \brief Perform operations on a particular branch + /// + /// \param branch Which is name of SnapshotRef of type branch + /// \return Reference to this for method chaining + auto& ToBranch(this auto& self, const std::string& branch) { + auto status = self.SetTargetBranch(branch); + if (!status.has_value()) { + return self.AddError(status.error()); + } + return self; + } + /// \brief Set a summary property. /// /// \param property The property name