
cdc: implement native Iceberg sink for changefeeds #4100

Open

BornChanger wants to merge 17 commits into pingcap:master from BornChanger:ticdc-sync-iceberg

Conversation

@BornChanger commented Jan 30, 2026

What problem does this PR solve?

TiCDC currently lacks a native sink for Apache Iceberg. This PR adds an iceberg:// sink so users can continuously replicate TiDB changes into query-ready Iceberg tables (Spark-first), using AWS Glue + S3 in production or a local Hadoop catalog + file warehouse for dev/testing.

Issue Number: close #4099

What is changed and how it works?

  • Add a new sink type: iceberg:// (wired into sink selection + changefeed config); example sink URIs are sketched after this list.
    • Catalog/warehouse support:
      • catalog=glue with warehouse=s3://... (Glue is auto-selected when warehouse is S3 and catalog not specified).
      • catalog=hadoop with warehouse=file://... (local/dev).
      • catalog=rest reserved (not implemented).
    • Commit model:
      • Buffer row changes per table and commit snapshots periodically (commit-interval, default 30s) using TiCDC resolved-ts as the correctness barrier; a conceptual sketch of this loop follows the list below.
      • Commit writes Parquet data/delete files, then atomically updates Iceberg metadata (per-table).
      • Commit watermark recorded in snapshot summary (tidb.committed_resolved_ts) to support restart-safe/idempotent progress.
    • Two table modes:
      • mode=append (default): append-only changelog table; default partitioning days(_tidb_commit_time) when emit-metadata-columns=true.
      • mode=upsert: “latest row” semantics using Iceberg v2 equality deletes (merge-on-read).
        • Requires PK / NOT NULL unique-key (“handle key”).
        • Partitioning must be none or derived only from handle-key columns (e.g. bucket(pk,16)) to keep equality deletes correct.
        • When key columns can be updated, recommend enable-old-value=true so delete keys are correct.
    • Schema + DDL:
      • TiDB → Iceberg schema mapping + Parquet encoding for common types.
      • Schema evolution guarded by schema-mode=strict|evolve.
      • DDL support: safe schema evolution via ensure-table, RENAME TABLE(S) for Glue, TRUNCATE TABLE as overwrite-to-empty snapshot.
      • Partition-related DDLs are blocked (fail-fast).
    • Operational features:
      • Single-writer ownership enforcement per table (allow-takeover=true for controlled migrations).
      • Optional checkpoint tables:
        • Per-table checkpoint index: enable-checkpoint-table=true
        • Global checkpoint list: enable-global-checkpoint-table=true
      • Bounded buffering knobs: max-buffered-* (fail-fast backpressure).
      • Metrics: ticdc_sink_iceberg_* (commit duration, lag, retries/conflicts, bytes/files written, buffer gauges, etc.).
      • Ops tools:
        • cmd/iceberg-bootstrap: snapshot one table into Iceberg and print a recommended start-ts.
        • cmd/iceberg-maintenance: generate Spark SQL maintenance templates for Glue tables.
    • Tests:
      • Unit tests for partitioning/type mapping/schema validation and table writer behavior.
      • Integration tests:
        • tests/integration_tests/iceberg_append_basic
        • tests/integration_tests/iceberg_upsert_basic
        • Optional Spark readback verification gated by ICEBERG_SPARK_READBACK=1 (uses tests/integration_tests/_utils/iceberg_spark_sql_scalar).
      • Added iceberg cases into tests/integration_tests/run_light_it_in_ci.sh iceberg groups (pipeline hookup is a follow-up; see TODO in script).
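
A rough illustration of how such a changefeed might be created. The URI shape, parameter placement, and values below are assumptions pieced together from the options listed above; consult the PR's user guide for the authoritative syntax.

```shell
# Dev/testing: local Hadoop catalog + file warehouse, append mode (hypothetical URI shape)
cdc cli changefeed create \
  --sink-uri="iceberg://?catalog=hadoop&warehouse=file:///tmp/iceberg-warehouse&mode=append&emit-metadata-columns=true"

# Production: Glue catalog + S3 warehouse, upsert mode (Glue would also be auto-selected here
# because the warehouse is on S3). enable-old-value is recommended when key columns can change.
cdc cli changefeed create \
  --sink-uri="iceberg://?catalog=glue&warehouse=s3://my-bucket/warehouse&mode=upsert&commit-interval=30s&schema-mode=evolve"
```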

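For intuition, here is a conceptual sketch of the commit model described above. It is illustrative pseudo-implementation only, not the PR's actual code; `ChangeRow`, `currentResolvedTs`, and `commitSnapshot` are placeholders.

```go
package icebergsketch

import (
	"context"
	"strconv"
	"time"
)

// Placeholders standing in for the real sink internals.
type ChangeRow struct{}

func currentResolvedTs() uint64 { return 0 }

func commitSnapshot(ctx context.Context, table string, rows []ChangeRow, summary map[string]string) error {
	return nil
}

// commitLoop flushes the per-table buffers on every commit-interval tick,
// recording the resolved-ts barrier in the snapshot summary so a restart can
// recognize already-committed progress (restart-safe / idempotent commits).
func commitLoop(ctx context.Context, interval time.Duration, buffers map[string][]ChangeRow) error {
	ticker := time.NewTicker(interval) // commit-interval, default 30s
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			resolvedTs := currentResolvedTs() // TiCDC resolved-ts, the correctness barrier
			for table, rows := range buffers {
				if len(rows) == 0 {
					continue
				}
				// Per table: write Parquet data (and, in upsert mode, equality-delete)
				// files, then manifests + manifest list, then the new metadata.json.
				summary := map[string]string{
					"tidb.committed_resolved_ts": strconv.FormatUint(resolvedTs, 10),
				}
				if err := commitSnapshot(ctx, table, rows, summary); err != nil {
					return err
				}
				buffers[table] = rows[:0]
			}
		}
	}
}
```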
Check List

Tests

  • Unit test:
    • GOCACHE=/tmp/go-build-cache go test ./pkg/sink/iceberg -count=1
  • Integration test (local Hadoop catalog):
    • make integration_test_iceberg CASE="iceberg_append_basic iceberg_upsert_basic"
  • Integration test + Spark readback (optional but stronger):
    • ICEBERG_SPARK_READBACK=1 ICEBERG_SPARK_PACKAGES='org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1' make integration_test_iceberg CASE="iceberg_append_basic iceberg_upsert_basic"
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
  • No behavior change for existing sinks; this introduces a new sink type only.
  • Upsert mode is opt-in and enforces correctness constraints (handle key required; safe partitioning only).
Do you need to update user documentation, design documentation or monitoring documentation?
  • Monitoring: new ticdc_sink_iceberg_* metrics added.

Release note

Add an `iceberg://` sink that streams TiDB changefeed events into Apache Iceberg tables (Glue/Hadoop catalogs) with append and upsert (equality delete) modes, plus checkpoint tables and Iceberg-specific metrics.

Summary by CodeRabbit

  • New Features

    • Full Apache Iceberg sink: append/upsert modes, bootstrap and maintenance CLIs, Glue/Hadoop/REST catalog support, checkpoint/global-checkpoint tables, partitioning, equality-delete/upsert semantics, truncation and single-writer takeover.
  • Documentation

    • Added design doc, user guide, and upsert optimization plan with examples and operational guidance.
  • Metrics

    • New Iceberg-specific Prometheus metrics for per-changefeed and per-table observability.
  • Tests

    • Extensive unit and integration tests and Spark readback utilities covering append/upsert, partitioning, manifests, Parquet/Avro, and schema evolution.
  • Chores

    • CI and Makefile updates for Iceberg integration runs; dependency and test tooling improvements.
  • Style

    • .gitignore updated to ignore generated/temp directories.

@ti-chi-bot bot added the do-not-merge/needs-triage-completed, first-time-contributor (indicates that the PR was contributed by an external member and is a first-time contributor), and release-note (denotes a PR that will be considered when it comes time to generate release notes) labels on Jan 30, 2026
@ti-chi-bot bot commented Jan 30, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign flowbehappy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@pingcap-cla-assistant bot commented Jan 30, 2026

CLA assistant check
All committers have signed the CLA.

@coderabbitai bot commented Jan 30, 2026

📝 Walkthrough


Adds a native Apache Iceberg sink end-to-end: new sink implementation, config and scheme wiring, TableWriter (append/upsert), Glue/Hadoop catalog integrations, Parquet/Avro encoding, partitioning, checkpoints, CLI tools, metrics, tests, CI workflow, and integration-test tooling. Changes are additive.

Changes

| Cohort / File(s) | Summary |
|---|---|
| **Sink wiring & dispatcher**<br>`downstreamadapter/sink/iceberg/sink.go`, `downstreamadapter/sink/sink.go`, `coordinator/changefeed/changefeed.go` | Register Iceberg scheme and sink type; include Iceberg in sink selection and checkpoint messaging. |
| **Config & protocol**<br>`pkg/common/types.go`, `pkg/config/sink.go`, `pkg/config/sink_protocol.go`, `pkg/config/changefeed.go` | Add IcebergSinkType, IcebergConfig struct, IcebergScheme constant, helper to detect scheme, and cleanup logic for unused iceberg config. |
| **Core TableWriter & catalogs**<br>`pkg/sink/iceberg/hadoop_table.go`, `pkg/sink/iceberg/glue_catalog.go`, `pkg/sink/iceberg/*_test.go` | Implement TableWriter, metadata lifecycle, Glue catalog operations (create/update/rename), ownership/takeover, and extensive unit/integration tests. |
| **Upsert & equality deletes**<br>`pkg/sink/iceberg/upsert_table.go`, `pkg/sink/iceberg/equality.go` | Add Upsert workflow, equality-delete generation, and equality-field ID validation. |
| **Encoding & schemas**<br>`pkg/sink/iceberg/parquet.go`, `pkg/sink/iceberg/avro.go`, `pkg/sink/iceberg/iceberg_schemas.go`, `pkg/sink/iceberg/manifest_entry_schema.go` | Parquet encoding via Arrow, Avro OCF writer, manifest/manifest-entry schema constants and builder. |
| **Partitioning & schema validation**<br>`pkg/sink/iceberg/partitioning.go`, `pkg/sink/iceberg/partitioning_test.go`, `pkg/sink/iceberg/schema_validation.go`, `pkg/sink/iceberg/schema_validation_test.go` | Partition transform parsing/resolution, partition value computation/grouping, compatibility checks, and schema-evolution validation with tests. |
| **File splitting, checkpoints & DDL**<br>`pkg/sink/iceberg/file_split.go`, `pkg/sink/iceberg/checkpoint.go`, `pkg/sink/iceberg/ddl.go` | Row chunking by target file size, checkpoint/global-checkpoint APIs, TRUNCATE/overwrite implementation and helpers. |
| **Metrics**<br>`pkg/metrics/iceberg.go`, `pkg/metrics/sink.go` | Add Iceberg-specific Prometheus metrics and register them in sink metrics init. |
| **Runtime sink & adapter**<br>`downstreamadapter/sink/iceberg/sink.go`, `pkg/sink/iceberg/...` | Full sink runtime: Verify/New, buffering, commit loop, DDL handling, DMLEvent→ChangeRow conversion, collapse/upsert logic, commit semantics and public sink API methods. |
| **CLI & maintenance tools**<br>`cmd/iceberg-bootstrap/main.go`, `cmd/iceberg-maintenance/main.go` | Add bootstrap CLI to load TiDB snapshots into Iceberg and maintenance tool to generate Glue/Spark SQL templates. |
| **Build, deps & CI**<br>`go.mod`, `Makefile`, `.github/workflows/integration_test_iceberg.yaml` | Add/adjust dependencies (arrow-go, smithy-go), Makefile target for integration_test_iceberg, and new GitHub Actions workflow for Iceberg integration tests. |
| **Integration tests & utilities**<br>`tests/integration_tests/iceberg_*`, `tests/integration_tests/_utils/*`, `tests/scripts/download-integration-test-binaries*.sh`, `tests/integration_tests/run_light_it_in_ci.sh` | Add append/upsert integration tests, Spark helpers, TiFlash startup guard, schema-registry fallback downloader, and CI group wiring. |
| **Docs & guides**<br>`docs/design/*.md`, `tests/integration_tests/README.md` | Add design doc, user guide, upsert optimization plan, and integration-test instructions. |
| **Ignore**<br>`.gitignore` | Add metastore_db/, .gocache/, .gopath/ to gitignore. |
| **Large new source surface**<br>`pkg/sink/iceberg/*.go` (many files) | Extensive new package implementing Iceberg writer, partitioning, type mapping, metrics, and tests — review concurrency, error paths, and compatibility. |
| **Makefile & test tooling**<br>`Makefile` | Safer RELEASE_VERSION assignment, jq resolution tweak, and new integration_test_iceberg phony target. |

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant TiDB
    participant TiCDC as TiCDC
    participant Sink as IcebergSink
    participant Storage as ObjectStorage
    participant Glue as GlueCatalog
    participant Checkpoint as CheckpointTable

    TiDB->>TiCDC: DML events (I/U/D)
    TiCDC->>Sink: AddDMLEvent
    activate Sink
    Sink->>Sink: Buffer & convert to ChangeRow
    Sink->>Sink: Partitioning & collapse upserts
    Sink->>Sink: Encode Parquet & build manifests
    Sink->>Storage: Write data files
    Sink->>Storage: Write manifests & manifest list (Avro OCF)
    Sink->>Storage: Persist metadata.json (snapshot)
    alt Glue catalog
        Sink->>Glue: Update metadata_location
    end
    Sink->>Checkpoint: Record checkpoint(s)
    Sink-->>TiCDC: CommitResult (snapshot id, committed ts)
    deactivate Sink
```

```mermaid
sequenceDiagram
    participant TiCDC as TiCDC
    participant Sink as IcebergSink (Upsert)
    participant Partitioner as Partitioner
    participant Storage as ObjectStorage
    participant Glue as GlueCatalog

    TiCDC->>Sink: AddDMLEvent
    activate Sink
    Sink->>Partitioner: compute partition keys & equality deletes
    Partitioner-->>Sink: partitioned batches
    loop CommitInterval
        Sink->>Sink: encode data -> Parquet
        Sink->>Storage: upload data files
        Sink->>Sink: encode delete files -> Parquet
        Sink->>Storage: upload delete files
        Sink->>Sink: build manifests + manifest list
        Sink->>Storage: persist metadata.json
        alt Glue catalog
            Sink->>Glue: update metadata_location
        end
        Sink-->>TiCDC: CommitResult
    end
    deactivate Sink
```

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120+ minutes

Suggested labels

lgtm

Suggested reviewers

  • 3AceShowHand
  • asddongmen
  • flowbehappy

Poem

🐇 I nibbled bytes and wrapped them in Avro,
I hopped through partitions, parquet in tow,
Glue hummed softly as snapshots took flight,
Checkpoints blinked steady in warm candlelight,
Hooray — Iceberg landed, the data grows bright!

🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 7.46% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | Title clearly summarizes the main objective: implementing a native Iceberg sink for TiCDC changefeeds, which is the core feature of this PR. |
| Description check | ✅ Passed | PR description follows template structure with problem statement, detailed implementation explanation, tests, and questions sections. All major sections are properly filled. |
| Linked Issues check | ✅ Passed | PR fully addresses issue #4099 by implementing a complete iceberg:// sink with all required features: catalog support, commit model, append/upsert modes, schema mapping, DDL handling, operational controls, and comprehensive tests. |
| Out of Scope Changes check | ✅ Passed | All changes are scoped to implementing the Iceberg sink feature (new packages, files, configuration types, and tests). Changes to existing sink infrastructure are minimal and focused on integrating the new sink type. |



@ti-chi-bot bot added the size/XXL (denotes a PR that changes 1000+ lines, ignoring generated files) label on Jan 30, 2026
@coderabbitai bot left a comment

Actionable comments posted: 5

Note

Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.

🤖 Fix all issues with AI agents
In `@downstreamadapter/sink/iceberg/sink.go`:
- Around line 1188-1191: The enum and set branches currently serialize numeric
indices (using strconv.FormatUint on d.GetMysqlEnum().Value /
d.GetMysqlSet().Value) but must produce the semantic string names; replace those
conversions by first extracting the numeric value (e.g., v :=
d.GetMysqlEnum().Value or v := d.GetMysqlSet().Value) and then call
types.ParseEnumValue(ft.GetElems(), v) for mysql.TypeEnum and
types.ParseSetValue(ft.GetElems(), v) for mysql.TypeSet, assigning the returned
string to value (handling any returned error as appropriate) so the output
matches the declared Iceberg string schema.
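
A minimal sketch of what that fix might look like, assuming the TiDB `types.ParseEnumValue`/`ParseSetValue` helpers and current import paths; the surrounding variable names in the sink differ, so treat this purely as illustration:

```go
package icebergsketch

import (
	"github.com/pingcap/tidb/pkg/parser/mysql"
	"github.com/pingcap/tidb/pkg/types"
)

// enumOrSetName converts an ENUM/SET datum into its semantic string name
// instead of its numeric index, as the review comment above suggests.
func enumOrSetName(ft *types.FieldType, d types.Datum) (string, error) {
	switch ft.GetType() {
	case mysql.TypeEnum:
		e, err := types.ParseEnumValue(ft.GetElems(), d.GetMysqlEnum().Value)
		if err != nil {
			return "", err
		}
		return e.Name, nil
	case mysql.TypeSet:
		s, err := types.ParseSetValue(ft.GetElems(), d.GetMysqlSet().Value)
		if err != nil {
			return "", err
		}
		return s.Name, nil
	default:
		return d.GetString(), nil
	}
}
```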

In `@pkg/sink/iceberg/hadoop_table_test.go`:
- Line 1: This new file is missing the standard PingCAP Apache license header;
add the project's canonical Apache-2.0/PingCAP header comment block at the very
top of the file (above the package iceberg line), matching the same wording and
year/owner format used in other repo files so the check-copyright CI passes.

In `@pkg/sink/iceberg/partitioning.go`:
- Line 1: This new file (partitioning.go) is missing the standard PingCAP Apache
header required by the repository; add the project's canonical Apache license
header comment at the very top of pkg/sink/iceberg/partitioning.go above the
package iceberg declaration so the file includes the required copyright and
license block.
- Around line 515-705: The stdlib parse errors returned directly must be wrapped
with cerror.WrapError(cerror.ErrSinkURIInvalid, err) to preserve stack traces;
update the error returns at each parse call site: in parsePartitionTimeValue
wrap the error from time.ParseInLocation, in parseIdentityPartitionValue wrap
errors from strconv.ParseInt, strconv.ParseFloat, strconv.ParseBool and
time.ParseInLocation, in truncatePartitionValue wrap errors from
strconv.ParseInt, in bucketHashBytes wrap errors from strconv.ParseInt, and also
wrap the error in parseEpochDaysString from strconv.ParseInt—replace direct err
returns with cerror.WrapError(cerror.ErrSinkURIInvalid, err) at each of these
unique call sites.

In `@pkg/sink/iceberg/type_mapping_test.go`:
- Line 1: Add the standard PingCAP Apache license header to the top of the new
test file type_mapping_test.go (package iceberg) so it matches the project's
required header format; insert the full multi-line Apache license comment block
exactly as used in other repository files above the "package iceberg"
declaration to satisfy the CI check-copyright.
🟡 Minor comments (28)
pkg/sink/iceberg/equality.go-1-8 (1)

1-8: ⚠️ Potential issue | 🟡 Minor

Add missing copyright header.

The pipeline failure indicates this file is missing the required copyright header. Add it at the top of the file.

Proposed fix
+// Copyright 2025 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package iceberg
pkg/sink/iceberg/avro.go-1-7 (1)

1-7: ⚠️ Potential issue | 🟡 Minor

Add missing copyright header.

The pipeline failure indicates this file is missing the required copyright header.

Proposed fix
+// Copyright 2025 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package iceberg
 
 import (
 	"bytes"
 
+	"github.com/pingcap/errors"
 	"github.com/linkedin/goavro/v2"
 )
pkg/sink/iceberg/type_mapping.go-1-1 (1)

1-1: ⚠️ Potential issue | 🟡 Minor

Add the required copyright header.

The pipeline check failed because this file is missing the standard copyright header.

pkg/sink/iceberg/manifest_entry_schema.go-1-1 (1)

1-1: ⚠️ Potential issue | 🟡 Minor

Add the required copyright header.

The pipeline check failed because this file is missing the standard copyright header.

pkg/sink/iceberg/iceberg_schemas.go-1-1 (1)

1-1: ⚠️ Potential issue | 🟡 Minor

Add the required copyright header.

The pipeline check failed because this file is missing the standard copyright header.

cmd/iceberg-maintenance/main.go-1-1 (1)

1-1: ⚠️ Potential issue | 🟡 Minor

Add the required copyright header.

The pipeline check failed because this file is missing the standard copyright header. Add the appropriate copyright notice at the top of the file.

pkg/sink/iceberg/schema_validation.go-1-1 (1)

1-1: ⚠️ Potential issue | 🟡 Minor

Add the required copyright header.

The pipeline check failed because this file is missing the standard copyright header.

pkg/sink/iceberg/config.go-1-1 (1)

1-1: ⚠️ Potential issue | 🟡 Minor

Add the required copyright header.

The pipeline indicates that the copyright information is missing.

Proposed fix
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package iceberg
pkg/sink/iceberg/glue_catalog.go-388-391 (1)

388-391: ⚠️ Potential issue | 🟡 Minor

Avoid double-wrapping errors with cerror.Trace.

Same issue as in ensureGlueTable - the error is already wrapped inside the retry function.

Proposed fix
 		retry.WithIsRetryableErr(isGlueRetryableError),
 	)
-	return cerror.Trace(err)
+	return err
 }
pkg/sink/iceberg/hadoop_table.go-1-1 (1)

1-1: ⚠️ Potential issue | 🟡 Minor

Add the required copyright header.

The pipeline indicates that the copyright information is missing.

Proposed fix
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package iceberg
pkg/metrics/iceberg.go-1-1 (1)

1-1: ⚠️ Potential issue | 🟡 Minor

Add the required copyright header.

The pipeline indicates that the copyright information is missing.

Proposed fix
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package metrics
pkg/sink/iceberg/glue_catalog.go-1-1 (1)

1-1: ⚠️ Potential issue | 🟡 Minor

Add the required copyright header.

The pipeline indicates that the copyright information is missing.

Proposed fix
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package iceberg
pkg/sink/iceberg/glue_catalog.go-227-230 (1)

227-230: ⚠️ Potential issue | 🟡 Minor

Avoid double-wrapping errors with cerror.Trace.

The error returned from retry.Do is already wrapped with cerror.Trace inside the retry function (lines 119, 133, 144, 165, 219). Wrapping again at line 229 adds redundant stack traces. As per coding guidelines, upstream callers should propagate wrapped errors without wrapping again.

Proposed fix
 		retry.WithIsRetryableErr(isGlueRetryableError),
 	)
-	return cerror.Trace(err)
+	return err
 }
pkg/sink/iceberg/hadoop_table.go-880-902 (1)

880-902: ⚠️ Potential issue | 🟡 Minor

Wrap storage read operation errors.

Errors from w.storage.FileExists and w.storage.ReadFile calls should be wrapped with cerror.Trace.

Proposed fix
 	exists, err := w.storage.FileExists(ctx, hintRel)
 	if err != nil {
-		return 0, nil, err
+		return 0, nil, cerror.Trace(err)
 	}
 	if !exists {
 		return 0, nil, nil
 	}

 	hintBytes, err := w.storage.ReadFile(ctx, hintRel)
 	if err != nil {
-		return 0, nil, err
+		return 0, nil, cerror.Trace(err)
 	}
 	metadataBytes, err := w.storage.ReadFile(ctx, metadataRel)
 	if err != nil {
-		return 0, nil, err
+		return 0, nil, cerror.Trace(err)
 	}
pkg/sink/iceberg/file_split.go-1-1 (1)

1-1: ⚠️ Potential issue | 🟡 Minor

Add the required copyright header.

The pipeline indicates that the copyright information is missing.

Proposed fix
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package iceberg
pkg/sink/iceberg/parquet.go-1-1 (1)

1-1: ⚠️ Potential issue | 🟡 Minor

Add the required copyright header.

The pipeline indicates that the copyright information is missing. Add the standard Apache 2.0 license header at the top of the file.

Proposed fix
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package iceberg
pkg/sink/iceberg/parquet.go-371-374 (1)

371-374: ⚠️ Potential issue | 🟡 Minor

Wrap time.Parse error.

The error from time.Parse should be wrapped to provide better stack traces for debugging.

Proposed fix
 	t, err := time.Parse(time.RFC3339Nano, raw)
 	if err != nil {
-		return err
+		return cerror.Trace(err)
 	}
pkg/sink/iceberg/hadoop_table.go-538-540 (1)

538-540: ⚠️ Potential issue | 🟡 Minor

Wrap storage operation errors with cerror.Trace.

Errors from w.storage.WriteFile calls are returned unwrapped. Based on coding guidelines, errors from external calls should be wrapped to attach stack traces for debugging.

Example fix for one occurrence (apply similar pattern to others)
 	if err := w.storage.WriteFile(ctx, dataRel, dataBytes); err != nil {
-		return nil, err
+		return nil, cerror.Trace(err)
 	}

Also applies to: 608-610, 663-665, 789-791

pkg/sink/iceberg/parquet.go-186-191 (1)

186-191: ⚠️ Potential issue | 🟡 Minor

Wrap third-party library error with cerror.Trace.

The error from pqarrow.WriteTable is a third-party library call and should be wrapped to attach a stack trace. As per coding guidelines, errors from library calls should be wrapped immediately.

Proposed fix
 	props := parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Zstd))
 	if err := pqarrow.WriteTable(tbl, &buf, int64(len(rows)), props, pqarrow.DefaultWriterProps()); err != nil {
-		return nil, err
+		return nil, cerror.Trace(err)
 	}
pkg/sink/iceberg/parquet.go-352-355 (1)

352-355: ⚠️ Potential issue | 🟡 Minor

Wrap strconv.ParseUint error.

The error from strconv.ParseUint is not wrapped. Based on coding guidelines, errors from library calls should be wrapped with errors.Trace or WrapError.

Proposed fix
 	u, err := strconv.ParseUint(raw, 10, 64)
 	if err != nil {
-		return err
+		return cerror.Trace(err)
 	}
tests/integration_tests/iceberg_append_basic/run.sh-140-141 (1)

140-141: ⚠️ Potential issue | 🟡 Minor

Use "$@" instead of $* to prevent whitespace issues.

Same issue as in the upsert test script.

Proposed fix
-prepare $*
-iceberg_check_append_basic $*
+prepare "$@"
+iceberg_check_append_basic "$@"
tests/integration_tests/iceberg_upsert_basic/run.sh-144-145 (1)

144-145: ⚠️ Potential issue | 🟡 Minor

Use "$@" instead of $* to prevent whitespace issues.

The static analysis correctly identifies that $* can cause problems with arguments containing whitespace.

Proposed fix
-prepare $*
-iceberg_check_upsert_basic $*
+prepare "$@"
+iceberg_check_upsert_basic "$@"
pkg/sink/iceberg/ddl.go-1-11 (1)

1-11: ⚠️ Potential issue | 🟡 Minor

Add missing copyright header.

The pipeline failure indicates this file is missing the required copyright header. Add the standard TiCDC copyright notice at the top of the file.

Proposed fix
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package iceberg
cmd/iceberg-bootstrap/main.go-1-27 (1)

1-27: ⚠️ Potential issue | 🟡 Minor

Add missing copyright header.

The pipeline failure indicates this file is missing the required copyright header.

Proposed fix
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package main
pkg/sink/iceberg/checkpoint.go-1-14 (1)

1-14: ⚠️ Potential issue | 🟡 Minor

Add missing copyright header.

The pipeline failure indicates this file is missing the required copyright header.

Proposed fix
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package iceberg
pkg/sink/iceberg/upsert_table.go-1-13 (1)

1-13: ⚠️ Potential issue | 🟡 Minor

Add missing copyright header.

The pipeline failure indicates this file is missing the required copyright header.

Proposed fix
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package iceberg
pkg/sink/iceberg/partitioning_test.go-1-1 (1)

1-1: ⚠️ Potential issue | 🟡 Minor

Missing copyright header causes CI failure.

The pipeline reports check-copyright: The copyright information of this file is incorrect. Add the standard copyright header at the top of the file to resolve the CI failure.

Proposed fix
+// Copyright 2025 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package iceberg
downstreamadapter/sink/iceberg/sink.go-1-1 (1)

1-1: ⚠️ Potential issue | 🟡 Minor

Missing copyright header causes CI failure.

The pipeline reports check-copyright: The copyright information of this file is incorrect. Add the standard copyright header at the top of the file.

Proposed fix
+// Copyright 2025 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package iceberg
🧹 Nitpick comments (18)
pkg/sink/iceberg/equality.go (1)

13-19: Consider using a more semantically appropriate error type.

ErrSinkURIInvalid is intended for sink URI parsing/validation errors, but these errors relate to table schema validation (nil table info, missing handle key columns). Using it here makes error messages confusing for operators debugging issues.

Consider defining or using a more appropriate error type (e.g., ErrIcebergTableSchema or similar) that better describes the actual problem domain.

tests/integration_tests/README.md (1)

111-116: Fix blockquote formatting to resolve markdown linting warning.

The blank line between blockquotes (line 112) triggers a markdown lint warning. Consider consolidating into a single blockquote block.

Proposed fix
-   > `Iceberg sink` can be tested with `make integration_test_iceberg CASE=iceberg_append_basic`.
-
-   > Iceberg Spark readback (optional): set `ICEBERG_SPARK_READBACK=1` and provide Iceberg Spark runtime via
+   > `Iceberg sink` can be tested with `make integration_test_iceberg CASE=iceberg_append_basic`.
+   >
+   > Iceberg Spark readback (optional): set `ICEBERG_SPARK_READBACK=1` and provide Iceberg Spark runtime via
pkg/sink/iceberg/avro.go (1)

16-38: Wrap third-party library errors with stack traces.

Per coding guidelines, errors from third-party library calls should be wrapped with errors.Trace(err) to attach stack traces for debugging.

Proposed fix
 func writeOCF(schema string, meta map[string][]byte, compressionName string, records []any) ([]byte, error) {
 	codec, err := goavro.NewCodec(schema)
 	if err != nil {
-		return nil, err
+		return nil, errors.Trace(err)
 	}
 
 	buf := bytes.NewBuffer(nil)
 	writer, err := goavro.NewOCFWriter(goavro.OCFConfig{
 		W:               buf,
 		Codec:           codec,
 		CompressionName: compressionName,
 		MetaData:        meta,
 	})
 	if err != nil {
-		return nil, err
+		return nil, errors.Trace(err)
 	}
 
 	if len(records) > 0 {
 		if err := writer.Append(records); err != nil {
-			return nil, err
+			return nil, errors.Trace(err)
 		}
 	}
 	return buf.Bytes(), nil
 }

As per coding guidelines: "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace."

cmd/iceberg-maintenance/main.go (2)

101-103: Wrap the AWS API error for stack trace context.

Per coding guidelines, errors from library/third-party calls should be wrapped with errors.Trace(err) to attach a stack trace.

Proposed fix
+	"github.com/pingcap/errors"
 		if err != nil {
-			return nil, err
+			return nil, errors.Trace(err)
 		}

62-87: Consider caching trimmed flag values to reduce repetition.

strings.TrimSpace(*sparkCatalog) and strings.TrimSpace(*database) are called repeatedly. Trim once after parsing flags for cleaner code.

Proposed refactor
 	flag.Parse()
 
-	if strings.TrimSpace(*database) == "" {
+	dbName := strings.TrimSpace(*database)
+	catalogName := strings.TrimSpace(*sparkCatalog)
+
+	if dbName == "" {
 		fmt.Fprintln(os.Stderr, "missing required flag: --database")
 		os.Exit(2)
 	}

Then use dbName and catalogName throughout.

pkg/sink/iceberg/type_mapping.go (1)

96-105: Consider adding scale <= prec validation for decimal parsing.

The validation checks prec > 0 && scale >= 0 but doesn't verify scale <= prec. While this would only matter for malformed input, adding this check would make the validation consistent with mapTiDBFieldType (line 46).

Proposed fix
-				if err1 == nil && err2 == nil && prec > 0 && scale >= 0 {
+				if err1 == nil && err2 == nil && prec > 0 && scale >= 0 && scale <= prec {
pkg/sink/iceberg/manifest_entry_schema.go (1)

68-71: Wrap the JSON marshaling error.

Per coding guidelines, errors from library calls should be wrapped with errors.Trace(err) to attach a stack trace.

Proposed fix
+	"github.com/pingcap/errors"
 	schemaBytes, err := json.Marshal(schema)
 	if err != nil {
-		return "", err
+		return "", errors.Trace(err)
 	}
pkg/sink/iceberg/schema_validation.go (1)

51-52: Remove the empty default case.

An empty default case serves no purpose. Simply omit it for cleaner idiomatic Go.

Proposed fix
 	case "float":
 		return newType == "double"
-	default:
 	}
tests/integration_tests/_utils/iceberg_spark_sql_scalar (1)

126-129: Filtering lines with whitespace may skip valid scalar values.

The filter if (line ~ /[ \t]/) next skips any line containing spaces or tabs. While this helps filter log lines, it would also skip legitimate scalar values containing spaces (e.g., string results). If this script is only used for numeric scalars (counts, sums), this is fine; otherwise, consider documenting this limitation.

pkg/sink/iceberg/config.go (2)

264-273: Consider consolidating duplicate query parameter handling.

The code handles both schema-mode and schema_mode query parameters. While this provides compatibility, consider adding a comment explaining why both are supported to prevent future maintainers from removing one.

Suggested documentation
+	// Support both hyphenated and underscored parameter names for compatibility
 	if v := strings.TrimSpace(query.Get("schema-mode")); v != "" {
 		if err := c.setSchemaMode(v); err != nil {
 			return err
 		}
 	}
+	// Legacy parameter name support
 	if v := strings.TrimSpace(query.Get("schema_mode")); v != "" {

383-396: Silent clamping of commit interval may surprise users.

The setCommitInterval function silently clamps values outside the valid range instead of returning an error. Consider logging a warning when clamping occurs so users are aware their requested value was adjusted.

Makefile (1)

3-8: Add integration_test_iceberg to .PHONY for consistency.

This keeps the target consistent with the other integration_test_* entries.

♻️ Suggested tweak
-.PHONY: clean fmt check tidy \
+.PHONY: clean fmt check tidy \
 	generate-protobuf generate_mock \
 	cdc kafka_consumer storage_consumer pulsar_consumer filter_helper \
 	prepare_test_binaries \
-	unit_test_in_verify_ci integration_test_build integration_test_build_fast integration_test_mysql integration_test_kafka integration_test_storage integration_test_pulsar \
+	unit_test_in_verify_ci integration_test_build integration_test_build_fast integration_test_mysql integration_test_kafka integration_test_storage integration_test_pulsar integration_test_iceberg \
 	generate-next-gen-grafana

Also applies to: 246-247

pkg/sink/iceberg/upsert_table.go (1)

53-55: Redundant safety check for equality deletes.

The isSafeForEqualityDeletes check at line 134 is redundant since the same condition was already validated at line 53, which returns an error if unsafe. The else branch (line 137) is unreachable dead code.

Proposed simplification
-	var (
-		deleteGroups []partitionGroup
-		groupErr     error
-	)
-	if resolvedSpec.isSafeForEqualityDeletes(equalityFieldIDs) {
-		deleteGroups, groupErr = resolvedSpec.groupRows(deleteRows)
-	} else {
-		deleteGroups = []partitionGroup{{partition: resolvedSpec.emptyPartitionRecord(), rows: deleteRows}}
-	}
-	if groupErr != nil {
-		return nil, groupErr
-	}
+	deleteGroups, err := resolvedSpec.groupRows(deleteRows)
+	if err != nil {
+		return nil, err
+	}

Also applies to: 134-138

tests/integration_tests/iceberg_upsert_basic/run.sh (1)

33-46: Consider extracting wait_file_exists to shared utilities.

This helper function is duplicated verbatim in iceberg_append_basic/run.sh. Consider moving it to tests/integration_tests/_utils/ for reuse.

cmd/iceberg-bootstrap/main.go (2)

260-281: Consider wrapping errors from library calls.

Per coding guidelines, errors from third-party or library calls should be wrapped with errors.Trace(err) or errors.WrapError(...) to attach stack traces. The getSnapshotPoint function returns unwrapped errors from db.QueryRowContext, strconv.ParseUint, and time.ParseInLocation.

Proposed fix
 func getSnapshotPoint(ctx context.Context, db *sql.DB) (uint64, string, error) {
 	var tsoStr string
 	if err := db.QueryRowContext(ctx, "select @@tidb_current_ts").Scan(&tsoStr); err != nil {
-		return 0, "", err
+		return 0, "", errors.Trace(err)
 	}
 	tsoStr = strings.TrimSpace(tsoStr)
 	tso, err := strconv.ParseUint(tsoStr, 10, 64)
 	if err != nil {
-		return 0, "", err
+		return 0, "", errors.Trace(err)
 	}

 	var tsStr string
 	if err := db.QueryRowContext(ctx, "select TIDB_PARSE_TSO(?)", tsoStr).Scan(&tsStr); err != nil {
-		return 0, "", err
+		return 0, "", errors.Trace(err)
 	}

 	parsed, err := time.ParseInLocation("2006-01-02 15:04:05.999999", strings.TrimSpace(tsStr), time.UTC)
 	if err != nil {
-		return 0, "", err
+		return 0, "", errors.Trace(err)
 	}
 	return tso, parsed.UTC().Format(time.RFC3339Nano), nil
 }

Based on coding guidelines: "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace."


283-296: Wrap errors in setTiDBSnapshot and clearTiDBSnapshot.

These functions return unwrapped errors from standard library calls.

Proposed fix
 func setTiDBSnapshot(ctx context.Context, db *sql.DB, snapshotTimeRFC3339 string) error {
 	t, err := time.Parse(time.RFC3339Nano, snapshotTimeRFC3339)
 	if err != nil {
-		return err
+		return errors.Trace(err)
 	}
 	snapshot := t.UTC().Format("2006-01-02 15:04:05.999999")
 	_, err = db.ExecContext(ctx, "set @@tidb_snapshot = ?", snapshot)
-	return err
+	return errors.Trace(err)
 }

 func clearTiDBSnapshot(ctx context.Context, db *sql.DB) error {
 	_, err := db.ExecContext(ctx, "set @@tidb_snapshot = ''")
-	return err
+	return errors.Trace(err)
 }

Based on coding guidelines: "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err)."

downstreamadapter/sink/iceberg/sink.go (2)

87-124: Consider extracting common validation logic.

The validation logic for catalog and mode (lines 92-110) is duplicated in the New function (lines 137-155). Consider extracting to a helper function to reduce duplication.

Suggested helper function
func validateConfig(cfg *sinkiceberg.Config) error {
	switch cfg.Catalog {
	case sinkiceberg.CatalogHadoop, sinkiceberg.CatalogGlue:
	default:
		return errors.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("iceberg catalog is not supported yet: %s", cfg.Catalog))
	}
	if cfg.Catalog == sinkiceberg.CatalogGlue {
		warehouseURL, err := url.Parse(cfg.WarehouseURI)
		if err != nil {
			return errors.WrapError(errors.ErrSinkURIInvalid, err)
		}
		if warehouseURL.Scheme != "s3" {
			return errors.ErrSinkURIInvalid.GenWithStackByArgs("iceberg glue catalog requires a s3 warehouse")
		}
	}
	switch cfg.Mode {
	case sinkiceberg.ModeAppend, sinkiceberg.ModeUpsert:
	default:
		return errors.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("iceberg mode is not supported yet: %s", cfg.Mode))
	}
	return nil
}

436-451: Consider using a more appropriate error type for buffer overflow.

ErrSinkURIInvalid is semantically a configuration/validation error, but here it's used for runtime backpressure conditions. Consider using or creating a more appropriate error type like ErrBufferFull or ErrBackpressure to better distinguish configuration errors from runtime overflow conditions.

@gemini-code-assist

Summary of Changes

Hello @BornChanger, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a new Iceberg sink to TiCDC, allowing users to replicate TiDB changes into Apache Iceberg tables. It supports various catalog types, commit models, table modes, schema evolution, and operational features, along with accompanying tools and tests. The primary goal is to provide a native sink for Apache Iceberg, enabling continuous replication of TiDB changes into query-ready Iceberg tables.

Highlights

  • Iceberg Sink Implementation: Introduces a new iceberg:// sink for TiCDC, enabling users to continuously replicate TiDB changes into Apache Iceberg tables.
  • Catalog and Warehouse Support: Supports catalog=glue with warehouse=s3://... for AWS Glue and catalog=hadoop with warehouse=file://... for local/dev environments.
  • Commit Model: Implements a commit model that buffers row changes per table and commits snapshots periodically using TiCDC resolved-ts as a correctness barrier.
  • Table Modes: Offers two table modes: mode=append for append-only changelog tables and mode=upsert for "latest row" semantics using Iceberg v2 equality deletes.
  • DDL Support: Provides safe schema evolution via ensure-table, RENAME TABLE(S) for Glue, and TRUNCATE TABLE as overwrite-to-empty snapshot, while blocking partition-related DDLs.
  • Operational Features: Includes single-writer ownership enforcement, optional checkpoint tables, bounded buffering knobs, and ticdc_sink_iceberg_* metrics.
  • Ops Tools: Adds cmd/iceberg-bootstrap for snapshotting tables into Iceberg and cmd/iceberg-maintenance for generating Spark SQL maintenance templates.
  • Integration Tests: Adds integration tests for append and upsert modes, with optional Spark readback verification.


@gemini-code-assist bot left a comment

Code Review

This is an impressive and extensive pull request that introduces a new Iceberg sink to TiCDC. The implementation is comprehensive, covering data replication in both append and upsert modes, integration with Hadoop and AWS Glue catalogs, schema evolution, partitioning, and operational tooling. The code is well-structured and includes new metrics and integration tests. My main feedback is on the logic for collapsing transactions in upsert mode, which could be fragile due to its assumptions about event ordering. Overall, this is a significant and well-executed feature addition.

Signed-off-by: BornChanger <dawn_catcher@126.com>
@coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In @.github/workflows/integration_test_iceberg.yaml:
- Around line 38-52: Update the GitHub Actions steps to use the current v5 of
the checkout and cache actions: change the uses entries for actions/checkout@v3
and actions/cache@v3 to actions/checkout@v5 and actions/cache@v5 respectively,
leaving actions/setup-go@v6 unchanged; ensure the Cache Tools step (id:
cache-tools) keeps its existing inputs (path and key) so caching behavior
remains the same after the version bump.

In `@downstreamadapter/sink/iceberg/sink.go`:
- Around line 545-557: The current buildKey function concatenates key column
values with a fixed separator which can collide if a column contains that byte;
change buildKey (the anonymous func assigned to buildKey that takes
sinkiceberg.ChangeRow and iterates keyColumnNames) to encode each key part with
a length-prefix before the raw bytes (e.g., write the uint32/varint length then
the value bytes) instead of using a separator, keep the existing nil/missing
checks on row.Columns[name], and return the combined length-prefixed string so
keys are unambiguous and collision-free.
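
A minimal sketch of the length-prefixed encoding described above; the column and value types here are placeholders, since the actual ChangeRow layout in the PR may differ:

```go
package icebergsketch

import (
	"bytes"
	"encoding/binary"
)

// buildKey concatenates key-column values unambiguously: each part is written
// as a presence byte plus a varint length plus the raw bytes, so a value that
// happens to contain the old separator byte can no longer collide.
func buildKey(columns map[string]*string, keyColumnNames []string) string {
	var buf bytes.Buffer
	var lenBuf [binary.MaxVarintLen64]byte
	for _, name := range keyColumnNames {
		v, ok := columns[name]
		if !ok || v == nil {
			buf.WriteByte(0) // missing/NULL marker, kept distinct from an empty string
			continue
		}
		buf.WriteByte(1) // present marker
		b := []byte(*v)
		n := binary.PutUvarint(lenBuf[:], uint64(len(b)))
		buf.Write(lenBuf[:n])
		buf.Write(b)
	}
	return buf.String()
}
```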
🧹 Nitpick comments (12)
pkg/sink/iceberg/schema_validation.go (1)

54-57: Redundant whitespace trimming.

The input parameters oldType and newType are already trimmed by the caller at lines 38-39 before being passed to isSafeIcebergTypeWidening. The trim operation on line 55 is redundant.

♻️ Suggested simplification
 func isSafeIcebergTypeWidening(oldType string, newType string) bool {
-	if strings.TrimSpace(oldType) == strings.TrimSpace(newType) {
+	if oldType == newType {
 		return true
 	}
tests/integration_tests/_utils/iceberg_spark_sql_scalar (1)

107-134: Clarify fallback scalar extraction behavior.

The fallback AWK parser (lines 109-133) continuously reassigns val=line, meaning it returns the last non-log, non-whitespace-containing line rather than the first. This works if the desired scalar appears at the end, but could be surprising if multiple candidate lines exist.

Consider documenting this behavior or using exit after the first valid match if "first scalar" semantics are intended:

♻️ Alternative: exit on first match
 			# Prefer scalar-like lines without whitespace.
 			if (line ~ /[ \t]/) next
-			val=line
+			print line
+			exit
 		}
-		END { print val }
 	'
tests/integration_tests/README.md (1)

111-117: Fix blank line inside blockquote (markdownlint MD028).

There's a blank line between blockquotes at line 112, which violates markdown best practices. Consider merging these into a single blockquote or removing the blank line.

Suggested fix
-   > `Iceberg sink` can be tested with `make integration_test_iceberg CASE=iceberg_append_basic`.
-
-   > Iceberg Spark readback (optional): set `ICEBERG_SPARK_READBACK=1` and provide Iceberg Spark runtime via
+   > `Iceberg sink` can be tested with `make integration_test_iceberg CASE=iceberg_append_basic`.
+   >
+   > Iceberg Spark readback (optional): set `ICEBERG_SPARK_READBACK=1` and provide Iceberg Spark runtime via
pkg/config/sink.go (1)

707-764: Well-structured configuration with comprehensive field documentation.

The IcebergConfig struct covers all necessary configuration aspects for the Iceberg sink. Consider adding a validate() method similar to PulsarConfig.validate() to perform early validation of configuration values (e.g., valid mode values, valid catalog types, positive buffer limits).
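
A hypothetical sketch of such a validate() method; the field names below are guesses based on the options described in this PR, not the actual IcebergConfig:

```go
package icebergsketch

import "fmt"

// Minimal stand-in for the real IcebergConfig; only the fields used below.
type IcebergConfig struct {
	Mode            string
	Catalog         string
	MaxBufferedRows int64
}

// validate performs early sanity checks, mirroring how PulsarConfig.validate() is used.
func (c *IcebergConfig) validate() error {
	switch c.Mode {
	case "append", "upsert":
	default:
		return fmt.Errorf("iceberg sink: unsupported mode %q", c.Mode)
	}
	switch c.Catalog {
	case "glue", "hadoop", "rest":
	default:
		return fmt.Errorf("iceberg sink: unsupported catalog %q", c.Catalog)
	}
	if c.MaxBufferedRows <= 0 {
		return fmt.Errorf("iceberg sink: max-buffered-rows must be positive, got %d", c.MaxBufferedRows)
	}
	return nil
}
```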

pkg/sink/iceberg/parquet.go (1)

311-324: Silent fallback on base64 decode failure may mask data issues.

When base64.StdEncoding.DecodeString fails, the code silently uses the raw string bytes. This could hide upstream data corruption or encoding mismatches. Consider logging a warning or tracking a metric when this fallback occurs.

Suggested improvement
 case *array.BinaryBuilder:
 	return func(v *string) error {
 		if v == nil {
 			builder.AppendNull()
 			return nil
 		}
 		decoded, err := base64.StdEncoding.DecodeString(*v)
 		if err != nil {
+			// Log at debug level to help troubleshoot encoding issues
+			// without failing the pipeline for non-base64 binary data.
 			builder.Append([]byte(*v))
 			return nil
 		}
 		builder.Append(decoded)
 		return nil
 	}, nil
pkg/sink/iceberg/upsert_table.go (1)

147-154: Redundant safety check for equality deletes.

The isSafeForEqualityDeletes check at line 147 is redundant because the same check at line 66 already returns an error if the condition fails. The else branch at line 150 creating deleteGroups with an empty partition is effectively unreachable for partitioned tables.

If this is intentional as a defensive measure for future changes, consider adding a comment. Otherwise, simplify:

♻️ Suggested simplification
-		var (
-			deleteGroups []partitionGroup
-			groupErr     error
-		)
-		if resolvedSpec.isSafeForEqualityDeletes(equalityFieldIDs) {
-			deleteGroups, groupErr = resolvedSpec.groupRows(deleteRows)
-		} else {
-			deleteGroups = []partitionGroup{{partition: resolvedSpec.emptyPartitionRecord(), rows: deleteRows}}
-		}
-		if groupErr != nil {
-			return nil, groupErr
-		}
+		// Safe for equality deletes was validated at function entry
+		deleteGroups, err := resolvedSpec.groupRows(deleteRows)
+		if err != nil {
+			return nil, err
+		}
pkg/sink/iceberg/checkpoint.go (1)

121-150: Unused error field in sync.Once pattern.

The checkpointTableErr field is declared but never assigned in the sync.Once block. If common.WrapTableInfo can return an error, it should be captured. If it cannot fail, the error field can be removed.

♻️ If WrapTableInfo cannot fail, simplify the return
-	return w.checkpointTableInfo, w.checkpointTableErr
+	return w.checkpointTableInfo, nil

Or if it can fail, capture the error inside the Do block.

pkg/sink/iceberg/config.go (1)

396-409: Silent clamping of commit interval may surprise users.

When the user provides an interval outside bounds, the value is silently adjusted. Consider logging a warning so users are aware their configuration was modified.

♻️ Add warning log for clamped values
 func (c *Config) setCommitInterval(intervalStr string) error {
 	interval, err := time.ParseDuration(intervalStr)
 	if err != nil {
 		return cerror.WrapError(cerror.ErrSinkURIInvalid, err)
 	}
 	if interval < minCommitInterval {
+		log.Warn("commit interval below minimum, using minimum",
+			zap.Duration("requested", interval),
+			zap.Duration("minimum", minCommitInterval))
 		interval = minCommitInterval
 	}
 	if interval > maxCommitInterval {
+		log.Warn("commit interval above maximum, using maximum",
+			zap.Duration("requested", interval),
+			zap.Duration("maximum", maxCommitInterval))
 		interval = maxCommitInterval
 	}
 	c.CommitInterval = interval
 	return nil
 }
pkg/sink/iceberg/glue_catalog.go (1)

121-148: Variable shadowing may cause confusion.

At line 126, err is redeclared with := inside the retry function, which shadows the outer err variable. While Go allows this, it can be confusing. Consider using = instead since err is already in scope.

♻️ Avoid variable shadowing
+		var getDBErr error
-		_, err = client.GetDatabase(ctx, &glue.GetDatabaseInput{
+		_, getDBErr = client.GetDatabase(ctx, &glue.GetDatabaseInput{
 			Name: aws.String(dbName),
 		})
-		if err != nil {
+		if getDBErr != nil {
 			var notFound *gluetypes.EntityNotFoundException
-			if !cerror.As(err, &notFound) {
-				return cerror.Trace(err)
+			if !cerror.As(getDBErr, &notFound) {
+				return cerror.Trace(getDBErr)
 			}

Alternatively, keep the pattern but add a comment explaining the intentional shadowing within the closure scope.

pkg/sink/iceberg/hadoop_table.go (2)

269-274: Returning nil, nil for empty rows is ambiguous.

When rows is empty, the function returns nil, nil. A nil *CommitResult alongside a nil error is easy for callers to dereference by mistake, and the behavior is not documented. Consider either:

  1. Returning a zero-value CommitResult with BytesWritten: 0
  2. Documenting this behavior clearly in a function comment
♻️ Option: Return zero-value result instead
 func (w *TableWriter) AppendChangelog(
 	...
 ) (*CommitResult, error) {
 	if len(rows) == 0 {
-		return nil, nil
+		return &CommitResult{}, nil
 	}

841-861: Magic number for version overflow limit.

The value 1024 for maximum metadata version iterations should be a named constant for clarity and maintainability.

♻️ Extract constant
+const maxMetadataVersionRetries = 1024
+
 func (w *TableWriter) nextAvailableMetadataVersion(ctx context.Context, metadataDirRel string, startVersion int) (int, error) {
 	...
-	for i := 0; i < 1024; i++ {
+	for i := 0; i < maxMetadataVersionRetries; i++ {
downstreamadapter/sink/iceberg/sink.go (1)

99-191: Consider extracting shared config validation between Verify and New.

Both functions repeat catalog/mode/warehouse checks, which is easy to let drift. A small helper would keep behavior consistent.
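
One possible shape for that helper, sketched below; sinkOptions and its field names are illustrative stand-ins rather than the actual types in sink.go, and the accepted values are taken from the catalog/mode options described in this PR.

💡 Illustrative helper sketch (hypothetical names)
package iceberg

import "fmt"

// sinkOptions is a hypothetical view of the values both Verify and New check today.
type sinkOptions struct {
	Catalog   string // "glue", "hadoop", or "rest"
	Mode      string // "append" or "upsert"
	Warehouse string // e.g. "s3://bucket/prefix" or "file:///tmp/warehouse"
}

// validateSinkOptions keeps the catalog/mode/warehouse checks in one place so
// Verify and New cannot drift apart.
func validateSinkOptions(o sinkOptions) error {
	switch o.Catalog {
	case "glue", "hadoop":
		// supported catalogs
	case "rest":
		return fmt.Errorf("iceberg sink: catalog %q is reserved but not implemented", o.Catalog)
	default:
		return fmt.Errorf("iceberg sink: unsupported catalog %q", o.Catalog)
	}
	if o.Mode != "append" && o.Mode != "upsert" {
		return fmt.Errorf("iceberg sink: unsupported mode %q", o.Mode)
	}
	if o.Warehouse == "" {
		return fmt.Errorf("iceberg sink: warehouse must not be empty")
	}
	return nil
}

Both Verify and New could then call validateSinkOptions on the parsed options before doing any catalog-specific work.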

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 7

🤖 Fix all issues with AI agents
In `@cmd/iceberg-bootstrap/main.go`:
- Around line 273-309: The helper functions getSnapshotPoint, setTiDBSnapshot,
and clearTiDBSnapshot currently return raw errors from database and stdlib
calls. Wrap every early and final return with errors.Trace(err) so callers
receive errors with stack traces preserved: the Scan/Parse returns in
getSnapshotPoint (e.g., `return 0, "", err`), the time.Parse and ExecContext
returns in setTiDBSnapshot, and the ExecContext return in clearTiDBSnapshot.

In `@pkg/sink/iceberg/glue_catalog.go`:
- Around line 150-179: When handling CreateTable in the Glue catalog code, don't
swallow gluetypes.AlreadyExistsException; instead, on detecting the
AlreadyExistsException from client.CreateTable re-fetch the table via
client.GetTable (the same path used earlier) so you can proceed to the existing
metadata update logic (the code that updates "metadata_location" and
StorageDescriptor Location). Concretely: in the CreateTable error branch that
checks cerror.As(err, &exists), replace the early return with a GetTable retry
and fall through to the existing update flow that inspects/sets metadataLocation
and parameters, ensuring the stale metadata_location is corrected after the
race. Ensure you still return other errors as cerror.Trace(err).

In `@pkg/sink/iceberg/hadoop_table.go`:
- Around line 276-306: The snapshotID generated with now.UnixMilli() can
collide; update the snapshot creation logic (where snapshotID is set) to ensure
monotonic unique IDs by comparing against currentMetadata.CurrentSnapshotID
(when currentMetadata != nil) and, if now.UnixMilli() <=
currentMetadata.CurrentSnapshotID, set snapshotID to
currentMetadata.CurrentSnapshotID + 1; keep snapshotID as the same integer type
used by currentSnapshot()/metadata to avoid type mismatches and preserve
uniqueness for the snapshot lookup logic.
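
A minimal sketch of that monotonicity rule; tableMetadata is stripped down to the one field the rule needs, not the real struct in hadoop_table.go.

💡 Illustrative sketch (simplified types)
package iceberg

import "time"

// tableMetadata is a stand-in carrying only the field the rule depends on.
type tableMetadata struct {
	CurrentSnapshotID int64
}

// nextSnapshotID uses the wall clock but bumps past the current snapshot ID,
// so two commits within the same millisecond still get distinct, increasing IDs.
func nextSnapshotID(now time.Time, current *tableMetadata) int64 {
	id := now.UnixMilli()
	if current != nil && id <= current.CurrentSnapshotID {
		id = current.CurrentSnapshotID + 1
	}
	return id
}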

In `@pkg/sink/iceberg/parquet.go`:
- Around line 311-324: The BinaryBuilder case currently swallows base64 decode
failures and appends raw bytes (in the case handling inside the closure using
base64.StdEncoding.DecodeString and builder.Append), which can hide data issues;
modify the closure to surface decode errors instead of silently falling back:
either return the decode error from the closure so callers can handle it, or
emit a clear warning via the package logger (include the original string and
error) before deciding to append raw bytes, and ensure the change is applied to
the anonymous func returned in the case *array.BinaryBuilder so callers of that
func will see the error or the logged warning.

In `@pkg/sink/iceberg/partitioning.go`:
- Around line 16-30: The file's import formatting is out of sync with the
project's formatter; run the project's formatting step (make fmt) to apply
gci/gofmt rules to pkg/sink/iceberg/partitioning.go so imports are
ordered/grouped and the file matches repo style (ensure the import block and
overall file formatting are updated and commit the changes).
- Around line 359-395: The partition key building in
resolvedPartitionSpec.groupRows currently uses strings.Join(keyParts, "|") with
"null" sentinel which can collide with real values; replace this with a
collision-safe encoding for each key part (include a type tag and length prefix)
when constructing keyParts so distinct values (including the string "null" and
values containing '|') never collide—e.g., for each computed raw from
computePartitionValue/wrapUnion produce a serialized token like
"<type>:<length>:<value>" or "<nil>" for nil, append those tokens to keyParts
and then join; ensure you update the logic around key, groups[key], and order so
grouping behavior is unchanged other than fixing collisions in groupRows.
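
A collision-safe encoding along those lines could look like the sketch below; encodePartitionKey and its []any input are illustrative and do not match the actual groupRows signature.

💡 Illustrative sketch (hypothetical helper)
package iceberg

import (
	"fmt"
	"strings"
)

// encodePartitionKey type-tags and length-prefixes every component, so nil, the
// literal string "null", and values containing '|' can no longer collide.
func encodePartitionKey(values []any) string {
	parts := make([]string, 0, len(values))
	for _, v := range values {
		if v == nil {
			parts = append(parts, "<nil>")
			continue
		}
		s := fmt.Sprintf("%v", v)
		parts = append(parts, fmt.Sprintf("%T:%d:%s", v, len(s), s))
	}
	return strings.Join(parts, "|")
}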

In `@pkg/sink/iceberg/upsert_table.go`:
- Around line 28-43: Add a nil-check at the top of TableWriter.Upsert to guard
against a nil receiver or nil config before any use of w.cfg (e.g., before
accessing w.cfg.EmitMetadataColumns and w.cfg.TargetFileSizeBytes); if w == nil
|| w.cfg == nil return an appropriate error (consistent with other functions,
e.g., cerror.ErrSinkURIInvalid.GenWithStackByArgs("table writer or cfg is nil"))
so the function fails fast instead of causing a nil pointer dereference.
🧹 Nitpick comments (12)
Makefile (1)

246-248: Consider adding to .PHONY declaration.

The new integration_test_iceberg target follows the established pattern. However, the .PHONY declaration at line 3-8 includes other integration test targets but doesn't include integration_test_iceberg. While this may work correctly, adding it to .PHONY ensures consistent behavior with other targets.

♻️ Suggested addition to .PHONY

Update line 7 to include the new target:

-.PHONY: clean fmt check tidy \
-	generate-protobuf generate_mock \
-	cdc kafka_consumer storage_consumer pulsar_consumer filter_helper \
-	prepare_test_binaries \
-	unit_test_in_verify_ci integration_test_build integration_test_build_fast integration_test_mysql integration_test_kafka integration_test_storage integration_test_pulsar \
-	generate-next-gen-grafana
+.PHONY: clean fmt check tidy \
+	generate-protobuf generate_mock \
+	cdc kafka_consumer storage_consumer pulsar_consumer filter_helper \
+	prepare_test_binaries \
+	unit_test_in_verify_ci integration_test_build integration_test_build_fast integration_test_mysql integration_test_kafka integration_test_storage integration_test_pulsar integration_test_iceberg \
+	generate-next-gen-grafana
tests/integration_tests/README.md (1)

111-117: Fix markdown blockquote formatting.

The static analysis tool flagged a blank line inside the blockquote (between lines 111 and 113). Consider restructuring to avoid the blank line within the blockquote block.

📝 Suggested fix
-   > `Iceberg sink` can be tested with `make integration_test_iceberg CASE=iceberg_append_basic`.
-
-   > Iceberg Spark readback (optional): set `ICEBERG_SPARK_READBACK=1` and provide Iceberg Spark runtime via
+   > `Iceberg sink` can be tested with `make integration_test_iceberg CASE=iceberg_append_basic`.
+   >
+   > Iceberg Spark readback (optional): set `ICEBERG_SPARK_READBACK=1` and provide Iceberg Spark runtime via
cmd/iceberg-maintenance/main.go (1)

51-55: Consider wrapping AWS SDK errors with stack traces.

Per coding guidelines, errors from third-party or library calls should be wrapped with errors.Trace(err) or errors.WrapError(...) to attach a stack trace. While this is a CLI tool where the impact is limited, wrapping these errors would provide better debugging context.

♻️ Suggested fix

Add the import and wrap errors:

 import (
 	"context"
 	"flag"
 	"fmt"
 	"os"
 	"sort"
 	"strings"

 	"github.com/aws/aws-sdk-go-v2/aws"
 	awsconfig "github.com/aws/aws-sdk-go-v2/config"
 	"github.com/aws/aws-sdk-go-v2/service/glue"
 	gluetypes "github.com/aws/aws-sdk-go-v2/service/glue/types"
+	"github.com/pingcap/ticdc/pkg/errors"
 )

Then update error handling:

 	cfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
 	if err != nil {
-		fmt.Fprintf(os.Stderr, "load aws config failed: %v\n", err)
+		fmt.Fprintf(os.Stderr, "load aws config failed: %v\n", errors.Trace(err))
 		os.Exit(1)
 	}
 	names, err := listIcebergGlueTables(ctx, client, strings.TrimSpace(*database))
 	if err != nil {
-		fmt.Fprintf(os.Stderr, "list glue tables failed: %v\n", err)
+		fmt.Fprintf(os.Stderr, "list glue tables failed: %v\n", errors.Trace(err))
 		os.Exit(1)
 	}

As per coding guidelines: "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace."

Also applies to: 63-67

pkg/sink/iceberg/avro.go (1)

29-51: Wrap errors from the goavro library.

Per coding guidelines, errors from third-party library calls should be wrapped with errors.Trace(err) to attach a stack trace for better debugging.

♻️ Suggested fix
 import (
 	"bytes"

 	"github.com/linkedin/goavro/v2"
+	"github.com/pingcap/ticdc/pkg/errors"
 )

 func writeOCF(schema string, meta map[string][]byte, compressionName string, records []any) ([]byte, error) {
 	codec, err := goavro.NewCodec(schema)
 	if err != nil {
-		return nil, err
+		return nil, errors.Trace(err)
 	}

 	buf := bytes.NewBuffer(nil)
 	writer, err := goavro.NewOCFWriter(goavro.OCFConfig{
 		W:               buf,
 		Codec:           codec,
 		CompressionName: compressionName,
 		MetaData:        meta,
 	})
 	if err != nil {
-		return nil, err
+		return nil, errors.Trace(err)
 	}

 	if len(records) > 0 {
 		if err := writer.Append(records); err != nil {
-			return nil, err
+			return nil, errors.Trace(err)
 		}
 	}
 	return buf.Bytes(), nil
 }

As per coding guidelines: "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace."

pkg/sink/iceberg/schema_validation.go (1)

54-79: Minor: Redundant whitespace trimming and empty switch case.

The function trims whitespace on line 55, but the caller validateSchemaEvolution already trims the types before passing them in (lines 38-39). The empty default: case on lines 64-65 can be removed since the function falls through to the decimal check regardless.

♻️ Suggested cleanup
 func isSafeIcebergTypeWidening(oldType string, newType string) bool {
-	if strings.TrimSpace(oldType) == strings.TrimSpace(newType) {
+	if oldType == newType {
 		return true
 	}
 
 	switch oldType {
 	case "int":
 		return newType == "long"
 	case "float":
 		return newType == "double"
-	default:
 	}
pkg/sink/iceberg/checkpoint.go (2)

121-150: checkpointTableErr is never set, making the error return path dead code.

The sync.Once block on lines 126-147 only assigns w.checkpointTableInfo but never assigns w.checkpointTableErr. The returned error on line 149 will always be nil. If initialization can fail (e.g., if common.WrapTableInfo could return an error in the future), consider capturing and returning the error properly.

♻️ Suggested fix if error handling is needed

If WrapTableInfo cannot fail, remove the error field from the struct. Otherwise:

 	w.checkpointOnce.Do(func() {
 		ft := func() types.FieldType {
 			return *types.NewFieldType(mysql.TypeVarchar)
 		}
-		w.checkpointTableInfo = common.WrapTableInfo(checkpointSchemaName, &timodel.TableInfo{
+		w.checkpointTableInfo, w.checkpointTableErr = common.WrapTableInfo(checkpointSchemaName, &timodel.TableInfo{
 			// ...
 		})
 	})

152-175: Same issue: globalCheckpointTableErr is never set.

Similar to getCheckpointTableInfo, the globalCheckpointTableErr field is never assigned within the sync.Once block, so line 174 always returns nil.

pkg/sink/iceberg/upsert_table.go (1)

147-154: Redundant safety check for equality deletes.

Line 66 already validates isSafeForEqualityDeletes and returns an error if unsafe. The check on line 147 is therefore redundant since we wouldn't reach this code path if the partition spec wasn't safe. Consider removing the redundant branch or adding a comment explaining why it's kept defensively.

pkg/config/sink.go (1)

707-764: Add validate() method to IcebergConfig in pkg/config/sink.go for consistency and early validation.

Other sink configs like PulsarConfig (line 658) and CSVConfig (line 274) have validation methods called during SinkConfig.validateAndAdjust(). While validation logic for IcebergConfig exists in pkg/sink/iceberg/config.go's Apply() method, it should be integrated into pkg/config/sink.go to validate during changefeed creation, consistent with the pattern for other sinks. This validates the following (a minimal sketch follows the list):

  • Mode: "append" or "upsert"
  • Catalog: "hadoop", "glue", or "rest"
  • SchemaMode: "strict" or "evolve"
  • CommitInterval: valid duration string
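
A minimal sketch of such a validate() method is below; it uses pointer fields as other sink configs do, but the exact field set and error constructors in pkg/config/sink.go may differ.

💡 Illustrative validate() sketch (simplified fields, plain errors)
package config

import (
	"fmt"
	"time"
)

// IcebergConfig lists only the fields discussed above; the real struct has more.
type IcebergConfig struct {
	Mode           *string
	Catalog        *string
	SchemaMode     *string
	CommitInterval *string
}

// validate runs at changefeed creation time, mirroring how PulsarConfig and
// CSVConfig are validated in SinkConfig.validateAndAdjust().
func (c *IcebergConfig) validate() error {
	if c == nil {
		return nil
	}
	if c.Mode != nil && *c.Mode != "append" && *c.Mode != "upsert" {
		return fmt.Errorf("iceberg: invalid mode %q, expected append or upsert", *c.Mode)
	}
	if c.Catalog != nil && *c.Catalog != "hadoop" && *c.Catalog != "glue" && *c.Catalog != "rest" {
		return fmt.Errorf("iceberg: invalid catalog %q", *c.Catalog)
	}
	if c.SchemaMode != nil && *c.SchemaMode != "strict" && *c.SchemaMode != "evolve" {
		return fmt.Errorf("iceberg: invalid schema-mode %q, expected strict or evolve", *c.SchemaMode)
	}
	if c.CommitInterval != nil {
		if _, err := time.ParseDuration(*c.CommitInterval); err != nil {
			return fmt.Errorf("iceberg: invalid commit-interval %q: %w", *c.CommitInterval, err)
		}
	}
	return nil
}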
tests/integration_tests/_utils/iceberg_spark_sql_scalar (1)

56-64: Guard against both ICEBERG_SPARK_PACKAGES and ICEBERG_SPARK_JARS being set.

Right now ICEBERG_SPARK_PACKAGES silently wins when both are set, which can mask misconfiguration. Consider failing fast with an explicit error to avoid ambiguous runs.

Suggested update
 sparkArgs=()
-if [[ -n "${ICEBERG_SPARK_PACKAGES:-}" ]]; then
+if [[ -n "${ICEBERG_SPARK_PACKAGES:-}" && -n "${ICEBERG_SPARK_JARS:-}" ]]; then
+	echo "Set only one of ICEBERG_SPARK_PACKAGES or ICEBERG_SPARK_JARS." >&2
+	exit 1
+fi
+if [[ -n "${ICEBERG_SPARK_PACKAGES:-}" ]]; then
 	sparkArgs+=(--packages "${ICEBERG_SPARK_PACKAGES}")
 elif [[ -n "${ICEBERG_SPARK_JARS:-}" ]]; then
 	sparkArgs+=(--jars "${ICEBERG_SPARK_JARS}")
 else
tests/integration_tests/iceberg_upsert_basic/run.sh (1)

1-4: Add pipefail to surface pipeline failures.

set -e won’t fail a pipeline if an earlier command errors (e.g., ls | sort | tail). Enabling pipefail makes test failures more visible.

♻️ Suggested change
-set -eu
+set -euo pipefail
pkg/sink/iceberg/hadoop_table_test.go (1)

277-378: Make manifest-list assertions order-agnostic.

If entry ordering changes, these index-based checks can become flaky. Consider asserting by content instead of position.

♻️ Suggested change
-	require.Equal(t, int32(0), records[0]["content"])
-	require.Equal(t, int64(1), records[0]["sequence_number"])
-	require.Equal(t, int64(1), records[0]["min_sequence_number"])
-
-	require.Equal(t, int32(1), records[1]["content"])
-	require.Equal(t, int64(1), records[1]["sequence_number"])
-	require.Equal(t, int64(0), records[1]["min_sequence_number"])
+	var dataRec, deleteRec map[string]any
+	for _, rec := range records {
+		switch rec["content"].(int32) {
+		case dataFileContentData:
+			dataRec = rec
+		case dataFileContentEqualityDeletes:
+			deleteRec = rec
+		}
+	}
+	require.NotNil(t, dataRec)
+	require.NotNil(t, deleteRec)
+	require.Equal(t, int64(1), dataRec["sequence_number"])
+	require.Equal(t, int64(1), dataRec["min_sequence_number"])
+	require.Equal(t, int64(1), deleteRec["sequence_number"])
+	require.Equal(t, int64(0), deleteRec["min_sequence_number"])
As per coding guidelines: Use unit test files named `*_test.go` in Go; favor deterministic tests and use `testify/require`.

Signed-off-by: BornChanger <dawn_catcher@126.com>
@BornChanger BornChanger changed the title from "cdc: to export to iceberg" to "cdc: implement native Iceberg sink for changefeeds" on Feb 1, 2026
Signed-off-by: BornChanger <dawn_catcher@126.com>
Signed-off-by: BornChanger <dawn_catcher@126.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@go.mod`:
- Line 13: The go.mod pins "github.com/apache/arrow-go/v18" to v18.0.0 (and a
replace to the fork joechenrh/arrow-go/v18 at a v18.0.0 pseudo-version); update
the required version to the latest stable (v18.5.1) and either remove or update
the replace directive to the corresponding fork version if the fork is still
needed, then run "go get github.com/apache/arrow-go/v18@v18.5.1" and "go mod
tidy" to refresh dependencies and run tests; if the replace to the fork is
intentional and must remain, add a concise inline comment next to the
replace/require explaining why the fork is required (e.g., bugfix/feature not
upstreamed) and the minimum version constraint.

Signed-off-by: BornChanger <dawn_catcher@126.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@pkg/sink/iceberg/config.go`:
- Around line 139-358: The code currently always injects defaultPartitioning
when Partitioning is empty, preventing users from explicitly setting an
unpartitioned table; add a local boolean (e.g., partitioningSpecified) and set
it when the partitioning key is provided by the user (for query params use
presence check: _, ok := query["partitioning"]; for sink config set the flag
when the IcebergConfig explicitly contains the Partitioning field), and then
change the final defaulting condition to apply defaultPartitioning only when
partitioningSpecified is false (instead of when Partitioning is empty) while
preserving the existing check for c.EmitMetadataColumns and c.Mode == ModeAppend
and references to Partitioning, EmitMetadataColumns, ModeAppend, and
defaultPartitioning.
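
One way to realize that flag, sketched with simplified Config fields and a plain url.Values lookup; the real Apply() handles many more options.

💡 Illustrative sketch (simplified Config)
package iceberg

import "net/url"

const defaultPartitioning = "days(_tidb_commit_time)"

// Config is trimmed to the fields this sketch needs.
type Config struct {
	Mode                string
	Partitioning        string
	EmitMetadataColumns bool
}

// applyPartitioning falls back to the default only when the user did not mention
// partitioning at all, so an explicit empty value keeps the table unpartitioned.
func (c *Config) applyPartitioning(query url.Values) {
	_, partitioningSpecified := query["partitioning"]
	if partitioningSpecified {
		c.Partitioning = query.Get("partitioning")
	}
	if !partitioningSpecified && c.EmitMetadataColumns && c.Mode == "append" {
		c.Partitioning = defaultPartitioning
	}
}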

In `@pkg/sink/iceberg/hadoop_table.go`:
- Around line 1057-1094: The function loadTableMetadataFromHint currently
returns nil when version-hint.text is missing, which leaves existing tables orphaned; update it
to fall back to scanning the metadataDirRel for the highest v*.metadata.json and
load that version instead of returning nil. Implement a helper (e.g.,
findLatestMetadataVersion or loadLatestMetadataFromDir) that lists files under
metadataDirRel (using the storage API's directory listing/WalkDir), parses
filenames matching the regexp `^v(\d+)\.metadata\.json$`, selects the largest
version, reads and unmarshals that metadata into tableMetadata (set
SelfMetadataLocation via joinLocation) and return it from
loadTableMetadataFromHint; keep the existing hint-path flow when version-hint
exists and only invoke the helper when the hint file is absent or unreadable.
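
A sketch of the version-scanning part of that fallback; the directory listing is abstracted as a plain slice of file names rather than the real storage API.

💡 Illustrative sketch (listing abstracted away)
package iceberg

import (
	"regexp"
	"strconv"
)

var metadataFileRE = regexp.MustCompile(`^v(\d+)\.metadata\.json$`)

// findLatestMetadataVersion picks the highest v<N>.metadata.json from a
// directory listing, returning -1 when no metadata file is present.
func findLatestMetadataVersion(fileNames []string) int {
	latest := -1
	for _, name := range fileNames {
		m := metadataFileRE.FindStringSubmatch(name)
		if m == nil {
			continue
		}
		v, err := strconv.Atoi(m[1])
		if err != nil {
			continue
		}
		if v > latest {
			latest = v
		}
	}
	return latest
}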
🧹 Nitpick comments (1)
pkg/sink/iceberg/upsert_table.go (1)

88-214: Optional: skip empty commits when there’s nothing to write.

If both dataRows and deleteRows are empty, you can return early to avoid creating a no-op snapshot and extra metadata churn.

♻️ Suggested tweak
 	if len(equalityFieldIDs) == 0 {
 		return nil, cerror.ErrSinkURIInvalid.GenWithStackByArgs("equality field ids are empty")
 	}
+	if len(dataRows) == 0 && len(deleteRows) == 0 {
+		return nil, nil
+	}

Signed-off-by: BornChanger <dawn_catcher@126.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@log.txt`:
- Around line 1-12: The test TestAppendChangelogPartitionsByCommitTimeDay is
failing because only one manifest entry is produced instead of two; inspect and
fix the partitioning and manifest generation path (functions/methods involved in
computing partition keys and creating manifests — e.g., the partition key
computation used by AppendChangelog/appendChangelog, any computePartition or
partitionForRecord function, and the manifest creation logic such as
generateManifests/createManifest). Ensure records are grouped by the
commit_time_day partition key correctly (computePartition/partitionForRecord
returns distinct keys for different commit days) and that
generateManifests/createManifest creates distinct manifest entries per partition
group rather than merging them; update grouping logic and manifest emission so
the TestAppendChangelogPartitionsByCommitTimeDay produces two manifest entries.
🧹 Nitpick comments (2)
docs/design/2026-02-01-ticdc-iceberg-upsert-optimization-plan.md (1)

9-18: Optional: render task list as Markdown checkboxes.
Use - [ ] so GitHub renders these as interactive task items.

Suggested edit
- [ ] Review design docs and upsert code path; identify the highest-cost conversion and metadata steps.
+ - [ ] Review design docs and upsert code path; identify the highest-cost conversion and metadata steps.
pkg/sink/iceberg/upsert_table.go (1)

69-165: Remove redundant equality-delete safety check.
isSafeForEqualityDeletes is already enforced earlier, so the else branch here is dead code. Consider simplifying.

Suggested simplification
-		if resolvedSpec.isSafeForEqualityDeletes(equalityFieldIDs) {
-			deleteGroups, groupErr = resolvedSpec.groupRows(deleteRows)
-		} else {
-			deleteGroups = []partitionGroup{{partition: resolvedSpec.emptyPartitionRecord(), rows: deleteRows}}
-		}
+		deleteGroups, groupErr = resolvedSpec.groupRows(deleteRows)

Signed-off-by: BornChanger <dawn_catcher@126.com>
Signed-off-by: BornChanger <dawn_catcher@126.com>
Signed-off-by: BornChanger <dawn_catcher@126.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @.github/workflows/integration_test_iceberg.yaml:
- Around line 64-73: The yq download step writes directly to /usr/local/bin/yq
without sudo, causing permission failures on GitHub runners; update the step
that sets YQ_VERSION and the curl command that outputs to /usr/local/bin/yq (and
its subsequent chmod) to use elevated permissions — e.g., download to /tmp or
use sudo when writing/moving to /usr/local/bin and ensure sudo chmod +x
/usr/local/bin/yq is used so the yq install matches the oras install pattern
(refer to ORAS_VERSION, YQ_VERSION and the curl -> /usr/local/bin/yq
invocation).
🧹 Nitpick comments (1)
.github/workflows/integration_test_iceberg.yaml (1)

86-109: Consider pinning the external script for supply chain security.

The download script is fetched from an external repository at runtime (line 89). For improved supply chain security, consider either vendoring the script or verifying its checksum.

💡 Example checksum verification
          script="${RUNNER_TEMP}/download_pingcap_oci_artifact.sh"
          curl -fsSL "https://raw.githubusercontent.com/PingCAP-QE/ci/main/scripts/artifacts/download_pingcap_oci_artifact.sh" -o "$script"
+         echo "<expected_sha256>  $script" | sha256sum -c -
          chmod +x "$script"

@BornChanger BornChanger force-pushed the ticdc-sync-iceberg branch 2 times, most recently from 150fa2a to 38fa40e on February 2, 2026 08:39
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @.github/workflows/integration_test_iceberg.yaml:
- Around line 64-74: Update the hardcoded tool versions in the GitHub Actions
step that installs OCI download tools by bumping ORAS_VERSION and YQ_VERSION to
current stable releases; specifically edit the block that defines
ORAS_VERSION="1.2.0" and YQ_VERSION="4.44.3" in the step named "Install OCI
download tools" and set them to the newer stable tags (e.g.,
ORAS_VERSION="1.3.0" and YQ_VERSION="4.52.2" or whatever the latest stable
release is at commit time), then verify the download URLs and filenames still
match the new release artifacts used by the curl/tar/mv commands (the variables
ORAS_VERSION and YQ_VERSION are the unique symbols to change).
🧹 Nitpick comments (1)
.github/workflows/integration_test_iceberg.yaml (1)

93-104: Inline Python patch is fragile and may break silently if upstream script changes.

The script patches a specific string in the downloaded script. If the upstream download_pingcap_oci_artifact.sh changes this line, the workflow will fail with a cryptic error. Consider adding a more informative error message or tracking an issue to upstream the fix.

💡 Suggested improvement
          if needle not in text:
-             raise SystemExit("failed to patch download script: target not found")
+             raise SystemExit(
+                 "failed to patch download script: target string not found.\n"
+                 f"Expected: {needle!r}\n"
+                 "The upstream script may have changed. Please review and update the patch."
+             )
          path.write_text(text.replace(needle, replacement))

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@tests/integration_tests/iceberg_upsert_basic/run.sh`:
- Around line 8-14: The script reads the positional arg into SINK_TYPE using
SINK_TYPE=$1 which will fail under set -u if no argument is supplied; modify the
top of the script to safely default/validate the positional parameter before
using it (e.g., check if $1 is set or use a parameter expansion default) and
emit the same "skip iceberg integration test..." message and exit when no valid
sink type is provided so SINK_TYPE is always defined when referenced.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@tests/integration_tests/iceberg_append_basic/run.sh`:
- Around line 1-14: Run shfmt -w on the script to fix formatting (e.g., run
shfmt -w tests/integration_tests/iceberg_append_basic/run.sh) and update the
script to consistently quote variables to avoid word-splitting (quote
occurrences of CUR, BASH_SOURCE, OUT_DIR, TEST_NAME, CDC_BINARY, and SINK_TYPE
in run.sh; e.g., use "$CUR" and "$SINK_TYPE"). Ensure set -eu remains and the
behavior of the SINK_TYPE check is unchanged.
🧹 Nitpick comments (1)
tests/integration_tests/iceberg_append_basic/run.sh (1)

44-84: Consider extracting duplicated utilities to _utils.

Both wait_file_exists and wait_changefeed_table_assigned are identical to the implementations in iceberg_upsert_basic/run.sh. Extracting these to a shared utility file (e.g., tests/integration_tests/_utils/iceberg_utils) would reduce duplication and simplify maintenance across Iceberg integration tests.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@tests/integration_tests/iceberg_upsert_basic/run.sh`:
- Around line 1-186: The script fails shfmt checks; run shfmt (e.g., shfmt -w)
on the file to normalize indentation, spacing, and quoting throughout, then
commit the formatted file; pay special attention to the function blocks prepare,
wait_file_exists, wait_changefeed_table_assigned, and iceberg_check_upsert_basic
so their braces, indentation, and compound commands (if/curl loops and
heredoc-like multiline commands) are formatted consistently and any stray inline
commands (e.g., the processLogger-like single-line commands and concatenated
statements) are placed on separate lines to satisfy shfmt.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@tests/integration_tests/iceberg_append_basic/run.sh`:
- Around line 3-14: The script uses "set -u" so referencing the positional
parameter as SINK_TYPE="$1" can fail when no arg is passed; change the logic to
first guard the positional argument (e.g., test $# -lt 1 or use a default via
SINK_TYPE="${1:-}") before the comparison, then perform the if [ "$SINK_TYPE" !=
"iceberg" ] check; update the SINK_TYPE assignment and the conditional in run.sh
to avoid an unbound-variable error when no argument is provided.

Signed-off-by: BornChanger <dawn_catcher@126.com>

Labels

do-not-merge/needs-triage-completed
first-time-contributor: Indicates that the PR was contributed by an external member and is a first-time contributor.
release-note: Denotes a PR that will be considered when it comes time to generate release notes.
size/XXL: Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

ticdc natively syncs to apache iceberg

1 participant