From dcab2bf27c2db2834bb03b73d88a885ccccaf6c9 Mon Sep 17 00:00:00 2001 From: jx2lee Date: Wed, 25 Feb 2026 05:45:09 +0900 Subject: [PATCH 1/3] implement commit table --- pyiceberg/catalog/bigquery_metastore.py | 119 +++++++++++++++++++++++- 1 file changed, 115 insertions(+), 4 deletions(-) diff --git a/pyiceberg/catalog/bigquery_metastore.py b/pyiceberg/catalog/bigquery_metastore.py index b762c1047c..f2b35872ab 100644 --- a/pyiceberg/catalog/bigquery_metastore.py +++ b/pyiceberg/catalog/bigquery_metastore.py @@ -26,7 +26,14 @@ from google.oauth2 import service_account from pyiceberg.catalog import WAREHOUSE_LOCATION, MetastoreCatalog, PropertiesUpdateSummary -from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchNamespaceError, NoSuchTableError, TableAlreadyExistsError +from pyiceberg.exceptions import ( + CommitFailedException, + CommitStateUnknownException, + NamespaceAlreadyExistsError, + NoSuchNamespaceError, + NoSuchTableError, + TableAlreadyExistsError, +) from pyiceberg.io import load_file_io from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema @@ -229,7 +236,88 @@ def drop_table(self, identifier: str | Identifier) -> None: def commit_table( self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...] ) -> CommitTableResponse: - raise NotImplementedError + table_identifier = table.name() + dataset_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError) + table_ref = TableReference( + dataset_ref=DatasetReference(project=self.project_id, dataset_id=dataset_name), + table_id=table_name, + ) + + current_bq_table: BQTable | None + current_table: Table | None + try: + current_bq_table = self.client.get_table(table_ref) + except NotFound: + current_bq_table = None + current_table = None + else: + current_table = self._convert_bigquery_table_to_iceberg_table(table_identifier, current_bq_table) + + updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates) + if current_table and updated_staged_table.metadata == current_table.metadata: + return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) + + self._write_metadata( + metadata=updated_staged_table.metadata, + io=updated_staged_table.io, + metadata_path=updated_staged_table.metadata_location, + ) + + commit_error: Exception | None = None + try: + if current_bq_table and current_table: + current_bq_table.external_catalog_table_options = self._create_external_catalog_table_options( + updated_staged_table.metadata.location, + self._create_table_parameters( + metadata_file_location=updated_staged_table.metadata_location, + table_metadata=updated_staged_table.metadata, + previous_metadata_location=current_table.metadata_location, + ), + ) + self.client.update_table(current_bq_table, ["external_catalog_table_options"]) + else: + self.client.create_table( + self._make_new_table( + updated_staged_table.metadata, + updated_staged_table.metadata_location, + table_ref, + ) + ) + except NotFound as e: + commit_error = ( + CommitFailedException(f"Table does not exist: {dataset_name}.{table_name}") + if current_table + else NoSuchNamespaceError(f"Namespace does not exist: {dataset_name}") + ) + commit_error.__cause__ = e + except Conflict as e: + commit_error = ( + CommitFailedException(f"Table has been updated by another process: {dataset_name}.{table_name}") + if current_table + else TableAlreadyExistsError(f"Table {table_name} already exists") + ) + commit_error.__cause__ = e + except Exception as e: + commit_error = e + finally: + if commit_error: + commit_status = self._check_bigquery_commit_status(table_ref, updated_staged_table.metadata_location) + if commit_status == "SUCCESS": + commit_error = None + elif commit_status == "UNKNOWN": + raise CommitStateUnknownException( + f"Commit state unknown for table {dataset_name}.{table_name}" + ) from commit_error + + if commit_error: + raise commit_error + + if current_table: + self._delete_old_metadata(updated_staged_table.io, current_table.metadata, updated_staged_table.metadata) + + return CommitTableResponse( + metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location + ) def rename_table(self, from_identifier: str | Identifier, to_identifier: str | Identifier) -> Table: raise NotImplementedError @@ -381,11 +469,20 @@ def _convert_bigquery_table_to_iceberg_table(self, identifier: str | Identifier, catalog=self, ) - def _create_table_parameters(self, metadata_file_location: str, table_metadata: TableMetadata) -> dict[str, Any]: - parameters: dict[str, Any] = table_metadata.properties + def _create_table_parameters( + self, + metadata_file_location: str, + table_metadata: TableMetadata, + previous_metadata_location: str | None = None, + ) -> dict[str, Any]: + parameters: dict[str, Any] = dict(table_metadata.properties) if table_metadata.table_uuid: parameters["uuid"] = str(table_metadata.table_uuid) parameters[METADATA_LOCATION_PROP] = metadata_file_location + if previous_metadata_location: + parameters[PREVIOUS_METADATA_LOCATION_PROP] = previous_metadata_location + else: + parameters.pop(PREVIOUS_METADATA_LOCATION_PROP, None) parameters[TABLE_TYPE_PROP] = ICEBERG_TABLE_TYPE_VALUE parameters["EXTERNAL"] = True @@ -405,6 +502,20 @@ def _create_table_parameters(self, metadata_file_location: str, table_metadata: return parameters + def _check_bigquery_commit_status(self, table_ref: TableReference, new_metadata_location: str) -> str: + try: + bq_table = self.client.get_table(table_ref) + parameters = ( + bq_table.external_catalog_table_options.parameters + if bq_table.external_catalog_table_options and bq_table.external_catalog_table_options.parameters + else {} + ) + return "SUCCESS" if parameters.get(METADATA_LOCATION_PROP) == new_metadata_location else "FAILURE" + except NotFound: + return "FAILURE" + except Exception: + return "UNKNOWN" + def _default_storage_location(self, location: str | None, dataset_ref: DatasetReference) -> str | None: if location: return location From 9596da48b3b1aaec4dea54f4c94259d8c067a50a Mon Sep 17 00:00:00 2001 From: jx2lee Date: Wed, 25 Feb 2026 05:45:17 +0900 Subject: [PATCH 2/3] testcode --- tests/catalog/test_bigquery_metastore.py | 101 ++++++++++++++++++++++- 1 file changed, 100 insertions(+), 1 deletion(-) diff --git a/tests/catalog/test_bigquery_metastore.py b/tests/catalog/test_bigquery_metastore.py index c8c7584262..6e60ec800a 100644 --- a/tests/catalog/test_bigquery_metastore.py +++ b/tests/catalog/test_bigquery_metastore.py @@ -17,13 +17,14 @@ import os from unittest.mock import MagicMock +import pytest from google.api_core.exceptions import NotFound from google.cloud.bigquery import Dataset, DatasetReference, Table, TableReference from google.cloud.bigquery.external_config import ExternalCatalogDatasetOptions, ExternalCatalogTableOptions from pytest_mock import MockFixture from pyiceberg.catalog.bigquery_metastore import ICEBERG_TABLE_TYPE_VALUE, TABLE_TYPE_PROP, BigQueryMetastoreCatalog -from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.exceptions import CommitStateUnknownException, NoSuchTableError from pyiceberg.schema import Schema @@ -178,3 +179,101 @@ def test_list_namespaces(mocker: MockFixture) -> None: assert ("dataset1",) in namespaces assert ("dataset2",) in namespaces client_mock.list_datasets.assert_called_once() + + +def test_commit_table_create_path_uses_create_table(mocker: MockFixture) -> None: + client_mock = MagicMock() + client_mock.get_table.side_effect = NotFound("missing") + mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock) + mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"}) + + catalog = BigQueryMetastoreCatalog("test_catalog", **{"gcp.bigquery.project-id": "my-project"}) + table = MagicMock() + table.name.return_value = ("my-dataset", "my-table") + + staged = MagicMock() + staged.metadata = MagicMock() + staged.metadata_location = "gs://bucket/db/table/metadata/00001.metadata.json" + staged.io = MagicMock() + mocker.patch.object(catalog, "_update_and_stage_table", return_value=staged) + mocker.patch.object(catalog, "_write_metadata") + mocker.patch.object(catalog, "_make_new_table", return_value=MagicMock()) + commit_response = MagicMock() + commit_response.metadata_location = staged.metadata_location + mocker.patch("pyiceberg.catalog.bigquery_metastore.CommitTableResponse", return_value=commit_response) + + response = catalog.commit_table(table, requirements=(), updates=()) + + client_mock.create_table.assert_called_once() + client_mock.update_table.assert_not_called() + assert response.metadata_location == staged.metadata_location + + +def test_commit_table_update_path_uses_update_table(mocker: MockFixture) -> None: + client_mock = MagicMock() + current_bq_table = MagicMock() + client_mock.get_table.return_value = current_bq_table + mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock) + mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"}) + + catalog = BigQueryMetastoreCatalog("test_catalog", **{"gcp.bigquery.project-id": "my-project"}) + table = MagicMock() + table.name.return_value = ("my-dataset", "my-table") + + current_table = MagicMock() + current_table.metadata = MagicMock() + current_table.metadata_location = "gs://bucket/db/table/metadata/00000.metadata.json" + mocker.patch.object(catalog, "_convert_bigquery_table_to_iceberg_table", return_value=current_table) + + staged = MagicMock() + staged.metadata = MagicMock() + staged.metadata.location = "gs://bucket/db/table" + staged.metadata_location = "gs://bucket/db/table/metadata/00001.metadata.json" + staged.io = MagicMock() + mocker.patch.object(catalog, "_update_and_stage_table", return_value=staged) + mocker.patch.object(catalog, "_write_metadata") + mocker.patch.object(catalog, "_create_table_parameters", return_value={"metadata_location": staged.metadata_location}) + mocker.patch.object(catalog, "_create_external_catalog_table_options", return_value=MagicMock()) + delete_old_metadata = mocker.patch.object(catalog, "_delete_old_metadata") + commit_response = MagicMock() + commit_response.metadata_location = staged.metadata_location + mocker.patch("pyiceberg.catalog.bigquery_metastore.CommitTableResponse", return_value=commit_response) + + response = catalog.commit_table(table, requirements=(), updates=()) + + client_mock.update_table.assert_called_once_with(current_bq_table, ["external_catalog_table_options"]) + client_mock.create_table.assert_not_called() + delete_old_metadata.assert_called_once_with(staged.io, current_table.metadata, staged.metadata) + assert response.metadata_location == staged.metadata_location + + +def test_commit_table_raises_unknown_when_commit_status_is_unknown(mocker: MockFixture) -> None: + client_mock = MagicMock() + current_bq_table = MagicMock() + client_mock.get_table.return_value = current_bq_table + client_mock.update_table.side_effect = RuntimeError("boom") + mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock) + mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"}) + + catalog = BigQueryMetastoreCatalog("test_catalog", **{"gcp.bigquery.project-id": "my-project"}) + table = MagicMock() + table.name.return_value = ("my-dataset", "my-table") + + current_table = MagicMock() + current_table.metadata = MagicMock() + current_table.metadata_location = "gs://bucket/db/table/metadata/00000.metadata.json" + mocker.patch.object(catalog, "_convert_bigquery_table_to_iceberg_table", return_value=current_table) + + staged = MagicMock() + staged.metadata = MagicMock() + staged.metadata.location = "gs://bucket/db/table" + staged.metadata_location = "gs://bucket/db/table/metadata/00001.metadata.json" + staged.io = MagicMock() + mocker.patch.object(catalog, "_update_and_stage_table", return_value=staged) + mocker.patch.object(catalog, "_write_metadata") + mocker.patch.object(catalog, "_create_table_parameters", return_value={"metadata_location": staged.metadata_location}) + mocker.patch.object(catalog, "_create_external_catalog_table_options", return_value=MagicMock()) + mocker.patch.object(catalog, "_check_bigquery_commit_status", return_value="UNKNOWN") + + with pytest.raises(CommitStateUnknownException): + catalog.commit_table(table, requirements=(), updates=()) From bcdbfa975ae661f174a56ce743770013b035061d Mon Sep 17 00:00:00 2001 From: jx2lee Date: Wed, 25 Feb 2026 22:51:24 +0900 Subject: [PATCH 3/3] check history when other commit appended --- pyiceberg/catalog/bigquery_metastore.py | 17 +++++++++++++++- tests/catalog/test_bigquery_metastore.py | 26 ++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/pyiceberg/catalog/bigquery_metastore.py b/pyiceberg/catalog/bigquery_metastore.py index f2b35872ab..d02ecab440 100644 --- a/pyiceberg/catalog/bigquery_metastore.py +++ b/pyiceberg/catalog/bigquery_metastore.py @@ -510,7 +510,22 @@ def _check_bigquery_commit_status(self, table_ref: TableReference, new_metadata_ if bq_table.external_catalog_table_options and bq_table.external_catalog_table_options.parameters else {} ) - return "SUCCESS" if parameters.get(METADATA_LOCATION_PROP) == new_metadata_location else "FAILURE" + current_metadata_location = parameters.get(METADATA_LOCATION_PROP) + if current_metadata_location == new_metadata_location: + return "SUCCESS" + + if not current_metadata_location: + return "FAILURE" + + io = self._load_file_io(location=current_metadata_location) + current_metadata = FromInputFile.table_metadata(io.new_input(current_metadata_location)) + + previous_metadata_locations = {log.metadata_file for log in current_metadata.metadata_log} + previous_metadata_location = parameters.get(PREVIOUS_METADATA_LOCATION_PROP) + if previous_metadata_location: + previous_metadata_locations.add(previous_metadata_location) + + return "SUCCESS" if new_metadata_location in previous_metadata_locations else "FAILURE" except NotFound: return "FAILURE" except Exception: diff --git a/tests/catalog/test_bigquery_metastore.py b/tests/catalog/test_bigquery_metastore.py index 6e60ec800a..f278566b4f 100644 --- a/tests/catalog/test_bigquery_metastore.py +++ b/tests/catalog/test_bigquery_metastore.py @@ -277,3 +277,29 @@ def test_commit_table_raises_unknown_when_commit_status_is_unknown(mocker: MockF with pytest.raises(CommitStateUnknownException): catalog.commit_table(table, requirements=(), updates=()) + + +def test_check_bigquery_commit_status_returns_success_when_metadata_in_history(mocker: MockFixture) -> None: + client_mock = MagicMock() + bq_table = MagicMock() + bq_table.external_catalog_table_options = MagicMock( + parameters={"metadata_location": "gs://bucket/db/table/metadata/00002.metadata.json"} + ) + client_mock.get_table.return_value = bq_table + mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock) + mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"}) + + catalog = BigQueryMetastoreCatalog("test_catalog", **{"gcp.bigquery.project-id": "my-project"}) + io_mock = MagicMock() + catalog._load_file_io = MagicMock(return_value=io_mock) # type: ignore[method-assign] + + current_metadata = MagicMock() + current_metadata.metadata_log = [MagicMock(metadata_file="gs://bucket/db/table/metadata/00001.metadata.json")] + mocker.patch("pyiceberg.catalog.bigquery_metastore.FromInputFile.table_metadata", return_value=current_metadata) + + status = catalog._check_bigquery_commit_status( + TableReference(dataset_ref=DatasetReference(project="my-project", dataset_id="my-dataset"), table_id="my-table"), + "gs://bucket/db/table/metadata/00001.metadata.json", + ) + + assert status == "SUCCESS"