cdc: implement native Iceberg sink for changefeeds #4100
BornChanger wants to merge 17 commits into pingcap:master
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request needs approval from an approver in each of the listed files; the full list of commands accepted by this bot can be found here.
📝 Walkthrough
Adds a native Apache Iceberg sink end-to-end: new sink implementation, config and scheme wiring, TableWriter (append/upsert), Glue/Hadoop catalog integrations, Parquet/Avro encoding, partitioning, checkpoints, CLI tools, metrics, tests, CI workflow, and integration-test tooling. Changes are additive.
Sequence Diagram(s)
sequenceDiagram
participant TiDB
participant TiCDC as TiCDC
participant Sink as IcebergSink
participant Storage as ObjectStorage
participant Glue as GlueCatalog
participant Checkpoint as CheckpointTable
TiDB->>TiCDC: DML events (I/U/D)
TiCDC->>Sink: AddDMLEvent
activate Sink
Sink->>Sink: Buffer & convert to ChangeRow
Sink->>Sink: Partitioning & collapse upserts
Sink->>Sink: Encode Parquet & build manifests
Sink->>Storage: Write data files
Sink->>Storage: Write manifests & manifest list (Avro OCF)
Sink->>Storage: Persist metadata.json (snapshot)
alt Glue catalog
Sink->>Glue: Update metadata_location
end
Sink->>Checkpoint: Record checkpoint(s)
Sink-->>TiCDC: CommitResult (snapshot id, committed ts)
deactivate Sink
sequenceDiagram
participant TiCDC as TiCDC
participant Sink as IcebergSink (Upsert)
participant Partitioner as Partitioner
participant Storage as ObjectStorage
participant Glue as GlueCatalog
TiCDC->>Sink: AddDMLEvent
activate Sink
Sink->>Partitioner: compute partition keys & equality deletes
Partitioner-->>Sink: partitioned batches
loop CommitInterval
Sink->>Sink: encode data -> Parquet
Sink->>Storage: upload data files
Sink->>Sink: encode delete files -> Parquet
Sink->>Storage: upload delete files
Sink->>Sink: build manifests + manifest list
Sink->>Storage: persist metadata.json
alt Glue catalog
Sink->>Glue: update metadata_location
end
Sink-->>TiCDC: CommitResult
end
deactivate Sink
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120+ minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 5
Note
Due to the large number of review comments, Critical and Major severity comments were prioritized as inline comments.
🤖 Fix all issues with AI agents
In `@downstreamadapter/sink/iceberg/sink.go`:
- Around line 1188-1191: The enum and set branches currently serialize numeric
indices (using strconv.FormatUint on d.GetMysqlEnum().Value /
d.GetMysqlSet().Value) but must produce the semantic string names; replace those
conversions by first extracting the numeric value (e.g., v :=
d.GetMysqlEnum().Value or v := d.GetMysqlSet().Value) and then call
types.ParseEnumValue(ft.GetElems(), v) for mysql.TypeEnum and
types.ParseSetValue(ft.GetElems(), v) for mysql.TypeSet, assigning the returned
string to value (handling any returned error as appropriate) so the output
matches the declared Iceberg string schema.
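For illustration, a minimal sketch of that conversion, assuming TiDB's types.ParseEnumValue and types.ParseSetValue helpers (import path may differ between TiDB versions); the helper names enumName/setName are hypothetical, not the actual sink code:

```go
package iceberg

import "github.com/pingcap/tidb/pkg/types"

// enumName maps a stored enum index back to its declared name, e.g.
// elems = ["small", "medium", "large"], value = 2 -> "medium".
func enumName(elems []string, value uint64) (string, error) {
	e, err := types.ParseEnumValue(elems, value)
	if err != nil {
		return "", err
	}
	return e.Name, nil
}

// setName maps a stored set bitmask back to its comma-joined member names,
// e.g. elems = ["a", "b", "c"], value = 0b101 -> "a,c".
func setName(elems []string, value uint64) (string, error) {
	s, err := types.ParseSetValue(elems, value)
	if err != nil {
		return "", err
	}
	return s.Name, nil
}
```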
In `@pkg/sink/iceberg/hadoop_table_test.go`:
- Line 1: This new file is missing the standard PingCAP Apache license header;
add the project's canonical Apache-2.0/PingCAP header comment block at the very
top of the file (above the package iceberg line), matching the same wording and
year/owner format used in other repo files so the check-copyright CI passes.
In `@pkg/sink/iceberg/partitioning.go`:
- Line 1: This new file (partitioning.go) is missing the standard PingCAP Apache
header required by the repository; add the project's canonical Apache license
header comment at the very top of pkg/sink/iceberg/partitioning.go above the
package iceberg declaration so the file includes the required copyright and
license block.
- Around line 515-705: The stdlib parse errors returned directly must be wrapped
with cerror.WrapError(cerror.ErrSinkURIInvalid, err) to preserve stack traces;
update the error returns at each parse call site: in parsePartitionTimeValue
wrap the error from time.ParseInLocation, in parseIdentityPartitionValue wrap
errors from strconv.ParseInt, strconv.ParseFloat, strconv.ParseBool and
time.ParseInLocation, in truncatePartitionValue wrap errors from
strconv.ParseInt, in bucketHashBytes wrap errors from strconv.ParseInt, and also
wrap the error in parseEpochDaysString from strconv.ParseInt—replace direct err
returns with cerror.WrapError(cerror.ErrSinkURIInvalid, err) at each of these
unique call sites.
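As a reference for the requested pattern, a small sketch (not the actual function body) of how one of these call sites, e.g. parseEpochDaysString, would wrap a stdlib parse error; the cerror alias is assumed to point at TiCDC's pkg/errors:

```go
package iceberg

import (
	"strconv"

	cerror "github.com/pingcap/ticdc/pkg/errors"
)

// parseEpochDaysString wraps the strconv error so the caller gets the
// ErrSinkURIInvalid code plus a stack trace instead of a bare stdlib error.
func parseEpochDaysString(s string) (int64, error) {
	days, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
	}
	return days, nil
}
```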
In `@pkg/sink/iceberg/type_mapping_test.go`:
- Line 1: Add the standard PingCAP Apache license header to the top of the new
test file type_mapping_test.go (package iceberg) so it matches the project's
required header format; insert the full multi-line Apache license comment block
exactly as used in other repository files above the "package iceberg"
declaration to satisfy the CI check-copyright.
🟡 Minor comments (28)
pkg/sink/iceberg/equality.go-1-8 (1)
1-8: ⚠️ Potential issue | 🟡 Minor: Add missing copyright header.
The pipeline failure indicates this file is missing the required copyright header. Add it at the top of the file.
Proposed fix
+// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package icebergpkg/sink/iceberg/avro.go-1-7 (1)
1-7: ⚠️ Potential issue | 🟡 Minor: Add missing copyright header.
The pipeline failure indicates this file is missing the required copyright header.
Proposed fix
+// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package iceberg import ( "bytes" + "github.com/pingcap/errors" "github.com/linkedin/goavro/v2" )pkg/sink/iceberg/type_mapping.go-1-1 (1)
1-1: ⚠️ Potential issue | 🟡 Minor: Add the required copyright header.
The pipeline check failed because this file is missing the standard copyright header.
pkg/sink/iceberg/manifest_entry_schema.go-1-1 (1)
1-1: ⚠️ Potential issue | 🟡 Minor: Add the required copyright header.
The pipeline check failed because this file is missing the standard copyright header.
pkg/sink/iceberg/iceberg_schemas.go-1-1 (1)
1-1: ⚠️ Potential issue | 🟡 Minor: Add the required copyright header.
The pipeline check failed because this file is missing the standard copyright header.
cmd/iceberg-maintenance/main.go-1-1 (1)
1-1: ⚠️ Potential issue | 🟡 Minor: Add the required copyright header.
The pipeline check failed because this file is missing the standard copyright header. Add the appropriate copyright notice at the top of the file.
pkg/sink/iceberg/schema_validation.go-1-1 (1)
1-1: ⚠️ Potential issue | 🟡 Minor: Add the required copyright header.
The pipeline check failed because this file is missing the standard copyright header.
pkg/sink/iceberg/config.go-1-1 (1)
1-1: ⚠️ Potential issue | 🟡 Minor: Add the required copyright header.
The pipeline indicates that the copyright information is missing.
Proposed fix
+// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package icebergpkg/sink/iceberg/glue_catalog.go-388-391 (1)
388-391: ⚠️ Potential issue | 🟡 Minor: Avoid double-wrapping errors with cerror.Trace.
Same issue as in ensureGlueTable: the error is already wrapped inside the retry function.
Proposed fix
retry.WithIsRetryableErr(isGlueRetryableError), ) - return cerror.Trace(err) + return err }
pkg/sink/iceberg/hadoop_table.go-1-1 (1)
1-1: ⚠️ Potential issue | 🟡 Minor: Add the required copyright header.
The pipeline indicates that the copyright information is missing.
Proposed fix
+// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package icebergpkg/metrics/iceberg.go-1-1 (1)
1-1: ⚠️ Potential issue | 🟡 Minor: Add the required copyright header.
The pipeline indicates that the copyright information is missing.
Proposed fix
+// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package metricspkg/sink/iceberg/glue_catalog.go-1-1 (1)
1-1: ⚠️ Potential issue | 🟡 Minor: Add the required copyright header.
The pipeline indicates that the copyright information is missing.
Proposed fix
+// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package icebergpkg/sink/iceberg/glue_catalog.go-227-230 (1)
227-230: ⚠️ Potential issue | 🟡 Minor: Avoid double-wrapping errors with cerror.Trace.
The error returned from retry.Do is already wrapped with cerror.Trace inside the retry function (lines 119, 133, 144, 165, 219). Wrapping again at line 229 adds redundant stack traces. As per coding guidelines, upstream callers should propagate wrapped errors without wrapping again.
Proposed fix
retry.WithIsRetryableErr(isGlueRetryableError), ) - return cerror.Trace(err) + return err }
pkg/sink/iceberg/hadoop_table.go-880-902 (1)
880-902: ⚠️ Potential issue | 🟡 Minor: Wrap storage read operation errors.
Errors from w.storage.FileExists and w.storage.ReadFile calls should be wrapped with cerror.Trace.
Proposed fix
exists, err := w.storage.FileExists(ctx, hintRel) if err != nil { - return 0, nil, err + return 0, nil, cerror.Trace(err) } if !exists { return 0, nil, nil } hintBytes, err := w.storage.ReadFile(ctx, hintRel) if err != nil { - return 0, nil, err + return 0, nil, cerror.Trace(err) }metadataBytes, err := w.storage.ReadFile(ctx, metadataRel) if err != nil { - return 0, nil, err + return 0, nil, cerror.Trace(err) }pkg/sink/iceberg/file_split.go-1-1 (1)
1-1: ⚠️ Potential issue | 🟡 Minor: Add the required copyright header.
The pipeline indicates that the copyright information is missing.
Proposed fix
+// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package icebergpkg/sink/iceberg/parquet.go-1-1 (1)
1-1: ⚠️ Potential issue | 🟡 Minor: Add the required copyright header.
The pipeline indicates that the copyright information is missing. Add the standard Apache 2.0 license header at the top of the file.
Proposed fix
+// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package icebergpkg/sink/iceberg/parquet.go-371-374 (1)
371-374: ⚠️ Potential issue | 🟡 Minor: Wrap the time.Parse error.
The error from time.Parse should be wrapped to provide better stack traces for debugging.
Proposed fix
t, err := time.Parse(time.RFC3339Nano, raw) if err != nil { - return err + return cerror.Trace(err) }
pkg/sink/iceberg/hadoop_table.go-538-540 (1)
538-540: ⚠️ Potential issue | 🟡 Minor: Wrap storage operation errors with cerror.Trace.
Errors from w.storage.WriteFile calls are returned unwrapped. Based on coding guidelines, errors from external calls should be wrapped to attach stack traces for debugging.
Example fix for one occurrence (apply similar pattern to others)
if err := w.storage.WriteFile(ctx, dataRel, dataBytes); err != nil { - return nil, err + return nil, cerror.Trace(err) }
Also applies to: 608-610, 663-665, 789-791
pkg/sink/iceberg/parquet.go-186-191 (1)
186-191: ⚠️ Potential issue | 🟡 Minor: Wrap third-party library error with cerror.Trace.
The error from pqarrow.WriteTable is a third-party library call and should be wrapped to attach a stack trace. As per coding guidelines, errors from library calls should be wrapped immediately.
Proposed fix
props := parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Zstd)) if err := pqarrow.WriteTable(tbl, &buf, int64(len(rows)), props, pqarrow.DefaultWriterProps()); err != nil { - return nil, err + return nil, cerror.Trace(err) }
pkg/sink/iceberg/parquet.go-352-355 (1)
352-355: ⚠️ Potential issue | 🟡 Minor: Wrap the strconv.ParseUint error.
The error from strconv.ParseUint is not wrapped. Based on coding guidelines, errors from library calls should be wrapped with errors.Trace or WrapError.
Proposed fix
u, err := strconv.ParseUint(raw, 10, 64) if err != nil { - return err + return cerror.Trace(err) }
tests/integration_tests/iceberg_append_basic/run.sh-140-141 (1)
140-141: ⚠️ Potential issue | 🟡 Minor: Use "$@" instead of $* to prevent whitespace issues.
Same issue as in the upsert test script.
Proposed fix
-prepare $* -iceberg_check_append_basic $* +prepare "$@" +iceberg_check_append_basic "$@"
tests/integration_tests/iceberg_upsert_basic/run.sh-144-145 (1)
144-145: ⚠️ Potential issue | 🟡 Minor: Use "$@" instead of $* to prevent whitespace issues.
The static analysis correctly identifies that $* can cause problems with arguments containing whitespace.
Proposed fix
-prepare $* -iceberg_check_upsert_basic $* +prepare "$@" +iceberg_check_upsert_basic "$@"
pkg/sink/iceberg/ddl.go-1-11 (1)
1-11: ⚠️ Potential issue | 🟡 Minor: Add missing copyright header.
The pipeline failure indicates this file is missing the required copyright header. Add the standard TiCDC copyright notice at the top of the file.
Proposed fix
+// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package icebergcmd/iceberg-bootstrap/main.go-1-27 (1)
1-27: ⚠️ Potential issue | 🟡 Minor: Add missing copyright header.
The pipeline failure indicates this file is missing the required copyright header.
Proposed fix
+// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package mainpkg/sink/iceberg/checkpoint.go-1-14 (1)
1-14: ⚠️ Potential issue | 🟡 Minor: Add missing copyright header.
The pipeline failure indicates this file is missing the required copyright header.
Proposed fix
+// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package icebergpkg/sink/iceberg/upsert_table.go-1-13 (1)
1-13: ⚠️ Potential issue | 🟡 Minor: Add missing copyright header.
The pipeline failure indicates this file is missing the required copyright header.
Proposed fix
+// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package icebergpkg/sink/iceberg/partitioning_test.go-1-1 (1)
1-1: ⚠️ Potential issue | 🟡 Minor: Missing copyright header causes CI failure.
The pipeline reports check-copyright: The copyright information of this file is incorrect. Add the standard copyright header at the top of the file to resolve the CI failure.
Proposed fix
+// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package icebergdownstreamadapter/sink/iceberg/sink.go-1-1 (1)
1-1: ⚠️ Potential issue | 🟡 Minor: Missing copyright header causes CI failure.
The pipeline reports check-copyright: The copyright information of this file is incorrect. Add the standard copyright header at the top of the file.
Proposed fix
+// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package iceberg
🧹 Nitpick comments (18)
pkg/sink/iceberg/equality.go (1)
13-19: Consider using a more semantically appropriate error type.
ErrSinkURIInvalid is intended for sink URI parsing/validation errors, but these errors relate to table schema validation (nil table info, missing handle key columns). Using it here makes error messages confusing for operators debugging issues. Consider defining or using a more appropriate error type (e.g., ErrIcebergTableSchema or similar) that better describes the actual problem domain.
tests/integration_tests/README.md (1)
111-116: Fix blockquote formatting to resolve markdown linting warning. The blank line between blockquotes (line 112) triggers a markdown lint warning. Consider consolidating into a single blockquote block.
Proposed fix
- > `Iceberg sink` can be tested with `make integration_test_iceberg CASE=iceberg_append_basic`. - - > Iceberg Spark readback (optional): set `ICEBERG_SPARK_READBACK=1` and provide Iceberg Spark runtime via + > `Iceberg sink` can be tested with `make integration_test_iceberg CASE=iceberg_append_basic`. + > + > Iceberg Spark readback (optional): set `ICEBERG_SPARK_READBACK=1` and provide Iceberg Spark runtime viapkg/sink/iceberg/avro.go (1)
16-38: Wrap third-party library errors with stack traces.Per coding guidelines, errors from third-party library calls should be wrapped with
errors.Trace(err)to attach stack traces for debugging.Proposed fix
func writeOCF(schema string, meta map[string][]byte, compressionName string, records []any) ([]byte, error) { codec, err := goavro.NewCodec(schema) if err != nil { - return nil, err + return nil, errors.Trace(err) } buf := bytes.NewBuffer(nil) writer, err := goavro.NewOCFWriter(goavro.OCFConfig{ W: buf, Codec: codec, CompressionName: compressionName, MetaData: meta, }) if err != nil { - return nil, err + return nil, errors.Trace(err) } if len(records) > 0 { if err := writer.Append(records); err != nil { - return nil, err + return nil, errors.Trace(err) } } return buf.Bytes(), nil }As per coding guidelines: "When an error comes from a third-party or library call in Go, wrap it immediately with
errors.Trace(err)orerrors.WrapError(...)to attach a stack trace."cmd/iceberg-maintenance/main.go (2)
101-103: Wrap the AWS API error for stack trace context.Per coding guidelines, errors from library/third-party calls should be wrapped with
errors.Trace(err)to attach a stack trace.Proposed fix
+ "github.com/pingcap/errors"if err != nil { - return nil, err + return nil, errors.Trace(err) }
62-87: Consider caching trimmed flag values to reduce repetition.
strings.TrimSpace(*sparkCatalog)andstrings.TrimSpace(*database)are called repeatedly. Trim once after parsing flags for cleaner code.Proposed refactor
flag.Parse() - if strings.TrimSpace(*database) == "" { + dbName := strings.TrimSpace(*database) + catalogName := strings.TrimSpace(*sparkCatalog) + + if dbName == "" { fmt.Fprintln(os.Stderr, "missing required flag: --database") os.Exit(2) }Then use
dbNameandcatalogNamethroughout.pkg/sink/iceberg/type_mapping.go (1)
96-105: Consider addingscale <= precvalidation for decimal parsing.The validation checks
prec > 0 && scale >= 0but doesn't verifyscale <= prec. While this would only matter for malformed input, adding this check would make the validation consistent withmapTiDBFieldType(line 46).Proposed fix
- if err1 == nil && err2 == nil && prec > 0 && scale >= 0 { + if err1 == nil && err2 == nil && prec > 0 && scale >= 0 && scale <= prec {pkg/sink/iceberg/manifest_entry_schema.go (1)
68-71: Wrap the JSON marshaling error.Per coding guidelines, errors from library calls should be wrapped with
errors.Trace(err)to attach a stack trace.Proposed fix
+ "github.com/pingcap/errors"schemaBytes, err := json.Marshal(schema) if err != nil { - return "", err + return "", errors.Trace(err) }pkg/sink/iceberg/schema_validation.go (1)
51-52: Remove the emptydefaultcase.An empty
defaultcase serves no purpose. Simply omit it for cleaner idiomatic Go.Proposed fix
case "float": return newType == "double" - default: }tests/integration_tests/_utils/iceberg_spark_sql_scalar (1)
126-129: Filtering lines with whitespace may skip valid scalar values.The filter
if (line ~ /[ \t]/) nextskips any line containing spaces or tabs. While this helps filter log lines, it would also skip legitimate scalar values containing spaces (e.g., string results). If this script is only used for numeric scalars (counts, sums), this is fine; otherwise, consider documenting this limitation.pkg/sink/iceberg/config.go (2)
264-273: Consider consolidating duplicate query parameter handling.The code handles both
schema-modeandschema_modequery parameters. While this provides compatibility, consider adding a comment explaining why both are supported to prevent future maintainers from removing one.Suggested documentation
+ // Support both hyphenated and underscored parameter names for compatibility if v := strings.TrimSpace(query.Get("schema-mode")); v != "" { if err := c.setSchemaMode(v); err != nil { return err } } + // Legacy parameter name support if v := strings.TrimSpace(query.Get("schema_mode")); v != "" {
383-396: Silent clamping of commit interval may surprise users.The
setCommitIntervalfunction silently clamps values outside the valid range instead of returning an error. Consider logging a warning when clamping occurs so users are aware their requested value was adjusted.Makefile (1)
3-8: Addintegration_test_icebergto.PHONYfor consistency.This keeps the target consistent with the other integration_test_* entries.
♻️ Suggested tweak
-.PHONY: clean fmt check tidy \ +.PHONY: clean fmt check tidy \ generate-protobuf generate_mock \ cdc kafka_consumer storage_consumer pulsar_consumer filter_helper \ prepare_test_binaries \ - unit_test_in_verify_ci integration_test_build integration_test_build_fast integration_test_mysql integration_test_kafka integration_test_storage integration_test_pulsar \ + unit_test_in_verify_ci integration_test_build integration_test_build_fast integration_test_mysql integration_test_kafka integration_test_storage integration_test_pulsar integration_test_iceberg \ generate-next-gen-grafanaAlso applies to: 246-247
pkg/sink/iceberg/upsert_table.go (1)
53-55: Redundant safety check for equality deletes.The
isSafeForEqualityDeletescheck at line 134 is redundant since the same condition was already validated at line 53, which returns an error if unsafe. The else branch (line 137) is unreachable dead code.Proposed simplification
- var ( - deleteGroups []partitionGroup - groupErr error - ) - if resolvedSpec.isSafeForEqualityDeletes(equalityFieldIDs) { - deleteGroups, groupErr = resolvedSpec.groupRows(deleteRows) - } else { - deleteGroups = []partitionGroup{{partition: resolvedSpec.emptyPartitionRecord(), rows: deleteRows}} - } - if groupErr != nil { - return nil, groupErr - } + deleteGroups, err := resolvedSpec.groupRows(deleteRows) + if err != nil { + return nil, err + }Also applies to: 134-138
tests/integration_tests/iceberg_upsert_basic/run.sh (1)
33-46: Consider extractingwait_file_existsto shared utilities.This helper function is duplicated verbatim in
iceberg_append_basic/run.sh. Consider moving it totests/integration_tests/_utils/for reuse.cmd/iceberg-bootstrap/main.go (2)
260-281: Consider wrapping errors from library calls.Per coding guidelines, errors from third-party or library calls should be wrapped with
errors.Trace(err)orerrors.WrapError(...)to attach stack traces. ThegetSnapshotPointfunction returns unwrapped errors fromdb.QueryRowContext,strconv.ParseUint, andtime.ParseInLocation.Proposed fix
func getSnapshotPoint(ctx context.Context, db *sql.DB) (uint64, string, error) { var tsoStr string if err := db.QueryRowContext(ctx, "select @@tidb_current_ts").Scan(&tsoStr); err != nil { - return 0, "", err + return 0, "", errors.Trace(err) } tsoStr = strings.TrimSpace(tsoStr) tso, err := strconv.ParseUint(tsoStr, 10, 64) if err != nil { - return 0, "", err + return 0, "", errors.Trace(err) } var tsStr string if err := db.QueryRowContext(ctx, "select TIDB_PARSE_TSO(?)", tsoStr).Scan(&tsStr); err != nil { - return 0, "", err + return 0, "", errors.Trace(err) } parsed, err := time.ParseInLocation("2006-01-02 15:04:05.999999", strings.TrimSpace(tsStr), time.UTC) if err != nil { - return 0, "", err + return 0, "", errors.Trace(err) } return tso, parsed.UTC().Format(time.RFC3339Nano), nil }Based on coding guidelines: "When an error comes from a third-party or library call in Go, wrap it immediately with
errors.Trace(err)orerrors.WrapError(...)to attach a stack trace."
283-296: Wrap errors insetTiDBSnapshotandclearTiDBSnapshot.These functions return unwrapped errors from standard library calls.
Proposed fix
func setTiDBSnapshot(ctx context.Context, db *sql.DB, snapshotTimeRFC3339 string) error { t, err := time.Parse(time.RFC3339Nano, snapshotTimeRFC3339) if err != nil { - return err + return errors.Trace(err) } snapshot := t.UTC().Format("2006-01-02 15:04:05.999999") _, err = db.ExecContext(ctx, "set @@tidb_snapshot = ?", snapshot) - return err + return errors.Trace(err) } func clearTiDBSnapshot(ctx context.Context, db *sql.DB) error { _, err := db.ExecContext(ctx, "set @@tidb_snapshot = ''") - return err + return errors.Trace(err) }Based on coding guidelines: "When an error comes from a third-party or library call in Go, wrap it immediately with
errors.Trace(err)."downstreamadapter/sink/iceberg/sink.go (2)
87-124: Consider extracting common validation logic.The validation logic for catalog and mode (lines 92-110) is duplicated in the
Newfunction (lines 137-155). Consider extracting to a helper function to reduce duplication.Suggested helper function
func validateConfig(cfg *sinkiceberg.Config) error { switch cfg.Catalog { case sinkiceberg.CatalogHadoop, sinkiceberg.CatalogGlue: default: return errors.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("iceberg catalog is not supported yet: %s", cfg.Catalog)) } if cfg.Catalog == sinkiceberg.CatalogGlue { warehouseURL, err := url.Parse(cfg.WarehouseURI) if err != nil { return errors.WrapError(errors.ErrSinkURIInvalid, err) } if warehouseURL.Scheme != "s3" { return errors.ErrSinkURIInvalid.GenWithStackByArgs("iceberg glue catalog requires a s3 warehouse") } } switch cfg.Mode { case sinkiceberg.ModeAppend, sinkiceberg.ModeUpsert: default: return errors.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("iceberg mode is not supported yet: %s", cfg.Mode)) } return nil }
436-451: Consider using a more appropriate error type for buffer overflow.
ErrSinkURIInvalidis semantically a configuration/validation error, but here it's used for runtime backpressure conditions. Consider using or creating a more appropriate error type likeErrBufferFullorErrBackpressureto better distinguish configuration errors from runtime overflow conditions.
Summary of Changes
Hello @BornChanger, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a new Iceberg sink to TiCDC, allowing users to replicate TiDB changes into Apache Iceberg tables. It supports various catalog types, commit models, table modes, schema evolution, and operational features, along with accompanying tools and tests. The primary goal is to provide a native sink for Apache Iceberg, enabling continuous replication of TiDB changes into query-ready Iceberg tables.
Code Review
This is an impressive and extensive pull request that introduces a new Iceberg sink to TiCDC. The implementation is comprehensive, covering data replication in both append and upsert modes, integration with Hadoop and AWS Glue catalogs, schema evolution, partitioning, and operational tooling. The code is well-structured and includes new metrics and integration tests. My main feedback is on the logic for collapsing transactions in upsert mode, which could be fragile due to its assumptions about event ordering. Overall, this is a significant and well-executed feature addition.
480089e to 61ff556
Signed-off-by: BornChanger <dawn_catcher@126.com>
61ff556 to a92b0e6
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In @.github/workflows/integration_test_iceberg.yaml:
- Around line 38-52: Update the GitHub Actions steps to use the current v5 of
the checkout and cache actions: change the uses entries for actions/checkout@v3
and actions/cache@v3 to actions/checkout@v5 and actions/cache@v5 respectively,
leaving actions/setup-go@v6 unchanged; ensure the Cache Tools step (id:
cache-tools) keeps its existing inputs (path and key) so caching behavior
remains the same after the version bump.
In `@downstreamadapter/sink/iceberg/sink.go`:
- Around line 545-557: The current buildKey function concatenates key column
values with a fixed separator which can collide if a column contains that byte;
change buildKey (the anonymous func assigned to buildKey that takes
sinkiceberg.ChangeRow and iterates keyColumnNames) to encode each key part with
a length-prefix before the raw bytes (e.g., write the uint32/varint length then
the value bytes) instead of using a separator, keep the existing nil/missing
checks on row.Columns[name], and return the combined length-prefixed string so
keys are unambiguous and collision-free.
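A minimal sketch of such a length-prefixed encoding; it assumes key values are already available as raw bytes per column, which simplifies the real ChangeRow access in sink.go:

```go
package iceberg

import "encoding/binary"

// buildKey encodes each key part as <uint32 length><raw bytes>, so a value
// containing the old separator byte can no longer collide with another key.
// A missing or nil column is encoded with a max-length sentinel so it stays
// distinct from an empty value.
func buildKey(columns map[string][]byte, keyColumnNames []string) string {
	var buf []byte
	var prefix [4]byte
	for _, name := range keyColumnNames {
		v, ok := columns[name]
		if !ok || v == nil {
			binary.BigEndian.PutUint32(prefix[:], ^uint32(0))
			buf = append(buf, prefix[:]...)
			continue
		}
		binary.BigEndian.PutUint32(prefix[:], uint32(len(v)))
		buf = append(buf, prefix[:]...)
		buf = append(buf, v...)
	}
	return string(buf)
}
```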
🧹 Nitpick comments (12)
pkg/sink/iceberg/schema_validation.go (1)
54-57: Redundant whitespace trimming.The input parameters
oldTypeandnewTypeare already trimmed by the caller at lines 38-39 before being passed toisSafeIcebergTypeWidening. The trim operation on line 55 is redundant.♻️ Suggested simplification
func isSafeIcebergTypeWidening(oldType string, newType string) bool { - if strings.TrimSpace(oldType) == strings.TrimSpace(newType) { + if oldType == newType { return true }tests/integration_tests/_utils/iceberg_spark_sql_scalar (1)
107-134: Clarify fallback scalar extraction behavior.The fallback AWK parser (lines 109-133) continuously reassigns
val=line, meaning it returns the last non-log, non-whitespace-containing line rather than the first. This works if the desired scalar appears at the end, but could be surprising if multiple candidate lines exist.Consider documenting this behavior or using
exitafter the first valid match if "first scalar" semantics are intended:♻️ Alternative: exit on first match
# Prefer scalar-like lines without whitespace. if (line ~ /[ \t]/) next - val=line + print line + exit } - END { print val } 'tests/integration_tests/README.md (1)
111-117: Fix blank line inside blockquote (markdownlint MD028).There's a blank line between blockquotes at line 112, which violates markdown best practices. Consider merging these into a single blockquote or removing the blank line.
Suggested fix
- > `Iceberg sink` can be tested with `make integration_test_iceberg CASE=iceberg_append_basic`. - - > Iceberg Spark readback (optional): set `ICEBERG_SPARK_READBACK=1` and provide Iceberg Spark runtime via + > `Iceberg sink` can be tested with `make integration_test_iceberg CASE=iceberg_append_basic`. + > + > Iceberg Spark readback (optional): set `ICEBERG_SPARK_READBACK=1` and provide Iceberg Spark runtime viapkg/config/sink.go (1)
707-764: Well-structured configuration with comprehensive field documentation.The
IcebergConfigstruct covers all necessary configuration aspects for the Iceberg sink. Consider adding avalidate()method similar toPulsarConfig.validate()to perform early validation of configuration values (e.g., valid mode values, valid catalog types, positive buffer limits).pkg/sink/iceberg/parquet.go (1)
311-324: Silent fallback on base64 decode failure may mask data issues.When
base64.StdEncoding.DecodeStringfails, the code silently uses the raw string bytes. This could hide upstream data corruption or encoding mismatches. Consider logging a warning or tracking a metric when this fallback occurs.Suggested improvement
case *array.BinaryBuilder: return func(v *string) error { if v == nil { builder.AppendNull() return nil } decoded, err := base64.StdEncoding.DecodeString(*v) if err != nil { + // Log at debug level to help troubleshoot encoding issues + // without failing the pipeline for non-base64 binary data. builder.Append([]byte(*v)) return nil } builder.Append(decoded) return nil }, nilpkg/sink/iceberg/upsert_table.go (1)
147-154: Redundant safety check for equality deletes.The
isSafeForEqualityDeletescheck at line 147 is redundant because the same check at line 66 already returns an error if the condition fails. The else branch at line 150 creatingdeleteGroupswith an empty partition is effectively unreachable for partitioned tables.If this is intentional as a defensive measure for future changes, consider adding a comment. Otherwise, simplify:
♻️ Suggested simplification
- var ( - deleteGroups []partitionGroup - groupErr error - ) - if resolvedSpec.isSafeForEqualityDeletes(equalityFieldIDs) { - deleteGroups, groupErr = resolvedSpec.groupRows(deleteRows) - } else { - deleteGroups = []partitionGroup{{partition: resolvedSpec.emptyPartitionRecord(), rows: deleteRows}} - } - if groupErr != nil { - return nil, groupErr - } + // Safe for equality deletes was validated at function entry + deleteGroups, err := resolvedSpec.groupRows(deleteRows) + if err != nil { + return nil, err + }pkg/sink/iceberg/checkpoint.go (1)
121-150: Unused error field in sync.Once pattern.The
checkpointTableErrfield is declared but never assigned in thesync.Onceblock. Ifcommon.WrapTableInfocan return an error, it should be captured. If it cannot fail, the error field can be removed.♻️ If WrapTableInfo cannot fail, simplify the return
- return w.checkpointTableInfo, w.checkpointTableErr + return w.checkpointTableInfo, nilOr if it can fail, capture the error inside the
Doblock.pkg/sink/iceberg/config.go (1)
396-409: Silent clamping of commit interval may surprise users.When the user provides an interval outside bounds, the value is silently adjusted. Consider logging a warning so users are aware their configuration was modified.
♻️ Add warning log for clamped values
func (c *Config) setCommitInterval(intervalStr string) error { interval, err := time.ParseDuration(intervalStr) if err != nil { return cerror.WrapError(cerror.ErrSinkURIInvalid, err) } if interval < minCommitInterval { + log.Warn("commit interval below minimum, using minimum", + zap.Duration("requested", interval), + zap.Duration("minimum", minCommitInterval)) interval = minCommitInterval } if interval > maxCommitInterval { + log.Warn("commit interval above maximum, using maximum", + zap.Duration("requested", interval), + zap.Duration("maximum", maxCommitInterval)) interval = maxCommitInterval } c.CommitInterval = interval return nil }pkg/sink/iceberg/glue_catalog.go (1)
121-148: Variable shadowing may cause confusion.At line 126,
erris redeclared with:=inside the retry function, which shadows the outererrvariable. While Go allows this, it can be confusing. Consider using=instead sinceerris already in scope.♻️ Avoid variable shadowing
+ var getDBErr error - _, err = client.GetDatabase(ctx, &glue.GetDatabaseInput{ + _, getDBErr = client.GetDatabase(ctx, &glue.GetDatabaseInput{ Name: aws.String(dbName), }) - if err != nil { + if getDBErr != nil { var notFound *gluetypes.EntityNotFoundException - if !cerror.As(err, ¬Found) { - return cerror.Trace(err) + if !cerror.As(getDBErr, ¬Found) { + return cerror.Trace(getDBErr) }Alternatively, keep the pattern but add a comment explaining the intentional shadowing within the closure scope.
pkg/sink/iceberg/hadoop_table.go (2)
269-274: Returningnil, nilfor empty rows is ambiguous.When
rowsis empty, the function returnsnil, nil. This makes it indistinguishable from an error condition at the call site without additional checks. Consider either:
- Returning a zero-value
CommitResultwithBytesWritten: 0- Documenting this behavior clearly in a function comment
♻️ Option: Return zero-value result instead
func (w *TableWriter) AppendChangelog( ... ) (*CommitResult, error) { if len(rows) == 0 { - return nil, nil + return &CommitResult{}, nil }
841-861: Magic number for version overflow limit.The value
1024for maximum metadata version iterations should be a named constant for clarity and maintainability.♻️ Extract constant
+const maxMetadataVersionRetries = 1024 + func (w *TableWriter) nextAvailableMetadataVersion(ctx context.Context, metadataDirRel string, startVersion int) (int, error) { ... - for i := 0; i < 1024; i++ { + for i := 0; i < maxMetadataVersionRetries; i++ {downstreamadapter/sink/iceberg/sink.go (1)
99-191: Consider extracting shared config validation betweenVerifyandNew.Both functions repeat catalog/mode/warehouse checks, which is easy to let drift. A small helper would keep behavior consistent.
Actionable comments posted: 7
🤖 Fix all issues with AI agents
In `@cmd/iceberg-bootstrap/main.go`:
- Around line 273-309: The helper functions getSnapshotPoint, setTiDBSnapshot,
and clearTiDBSnapshot currently return raw errors from database and stdlib
calls; wrap all returned errors with errors.Trace() so callers receive traced
errors (e.g., change returns like "return 0, \"\", err" and "return err" inside
getSnapshotPoint, setTiDBSnapshot, and clearTiDBSnapshot to use
errors.Trace(err)); update every early-return and final return in
getSnapshotPoint (all Scan/Parse errors), setTiDBSnapshot (time.Parse and
ExecContext errors), and clearTiDBSnapshot (ExecContext error) to return
errors.Trace(err) instead of the raw err so stack traces are preserved.
In `@pkg/sink/iceberg/glue_catalog.go`:
- Around line 150-179: When handling CreateTable in the Glue catalog code, don't
swallow gluetypes.AlreadyExistsException; instead, on detecting the
AlreadyExistsException from client.CreateTable re-fetch the table via
client.GetTable (the same path used earlier) so you can proceed to the existing
metadata update logic (the code that updates "metadata_location" and
StorageDescriptor Location). Concretely: in the CreateTable error branch that
checks cerror.As(err, &exists), replace the early return with a GetTable retry
and fall through to the existing update flow that inspects/sets metadataLocation
and parameters, ensuring the stale metadata_location is corrected after the
race. Ensure you still return other errors as cerror.Trace(err).
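A sketch of that recovery path, assuming the AWS SDK for Go v2 Glue client; the helper name createOrGetGlueTable is hypothetical, and error wrapping follows the cerror convention used elsewhere in this review:

```go
package iceberg

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/glue"
	gluetypes "github.com/aws/aws-sdk-go-v2/service/glue/types"

	cerror "github.com/pingcap/ticdc/pkg/errors"
)

// createOrGetGlueTable creates the Glue table and, when another writer wins
// the race (AlreadyExistsException), re-fetches it so the caller can still
// run the normal metadata_location update instead of returning early.
func createOrGetGlueTable(
	ctx context.Context, client *glue.Client,
	dbName, tableName string, input *glue.CreateTableInput,
) (*gluetypes.Table, error) {
	if _, err := client.CreateTable(ctx, input); err != nil {
		var exists *gluetypes.AlreadyExistsException
		if !cerror.As(err, &exists) {
			return nil, cerror.Trace(err)
		}
		// The table already exists: fall through to GetTable below so the
		// stale metadata_location can be corrected by the caller.
	}
	out, err := client.GetTable(ctx, &glue.GetTableInput{
		DatabaseName: aws.String(dbName),
		Name:         aws.String(tableName),
	})
	if err != nil {
		return nil, cerror.Trace(err)
	}
	return out.Table, nil
}
```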
In `@pkg/sink/iceberg/hadoop_table.go`:
- Around line 276-306: The snapshotID generated with now.UnixMilli() can
collide; update the snapshot creation logic (where snapshotID is set) to ensure
monotonic unique IDs by comparing against currentMetadata.CurrentSnapshotID
(when currentMetadata != nil) and, if now.UnixMilli() <=
currentMetadata.CurrentSnapshotID, set snapshotID to
currentMetadata.CurrentSnapshotID + 1; keep snapshotID as the same integer type
used by currentSnapshot()/metadata to avoid type mismatches and preserve
uniqueness for the snapshot lookup logic.
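A small sketch of the intended ID derivation (types simplified to int64; the real code would read the current ID from the loaded table metadata):

```go
package iceberg

import "time"

// nextSnapshotID derives a snapshot id from wall-clock milliseconds but keeps
// it strictly greater than the current snapshot id, so two commits landing in
// the same millisecond can no longer produce duplicate ids.
func nextSnapshotID(now time.Time, currentSnapshotID int64, hasCurrent bool) int64 {
	id := now.UnixMilli()
	if hasCurrent && id <= currentSnapshotID {
		id = currentSnapshotID + 1
	}
	return id
}
```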
In `@pkg/sink/iceberg/parquet.go`:
- Around line 311-324: The BinaryBuilder case currently swallows base64 decode
failures and appends raw bytes (in the case handling inside the closure using
base64.StdEncoding.DecodeString and builder.Append), which can hide data issues;
modify the closure to surface decode errors instead of silently falling back:
either return the decode error from the closure so callers can handle it, or
emit a clear warning via the package logger (include the original string and
error) before deciding to append raw bytes, and ensure the change is applied to
the anonymous func returned in the case *array.BinaryBuilder so callers of that
func will see the error or the logged warning.
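A sketch of the "log a warning" option, assuming Arrow's array.BinaryBuilder and the pingcap/log logger; the helper name appendBinary is illustrative only:

```go
package iceberg

import (
	"encoding/base64"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/pingcap/log"
	"go.uber.org/zap"
)

// appendBinary decodes a base64-encoded cell into an Arrow binary column and
// logs a warning before falling back to the raw bytes, so encoding problems
// surface in the logs instead of being silently masked.
func appendBinary(builder *array.BinaryBuilder, v *string) {
	if v == nil {
		builder.AppendNull()
		return
	}
	decoded, err := base64.StdEncoding.DecodeString(*v)
	if err != nil {
		log.Warn("binary column is not valid base64, writing raw bytes", zap.Error(err))
		builder.Append([]byte(*v))
		return
	}
	builder.Append(decoded)
}
```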
In `@pkg/sink/iceberg/partitioning.go`:
- Around line 16-30: The file's import formatting is out of sync with the
project's formatter; run the project's formatting step (make fmt) to apply
gci/gofmt rules to pkg/sink/iceberg/partitioning.go so imports are
ordered/grouped and the file matches repo style (ensure the import block and
overall file formatting are updated and commit the changes).
- Around line 359-395: The partition key building in
resolvedPartitionSpec.groupRows currently uses strings.Join(keyParts, "|") with
"null" sentinel which can collide with real values; replace this with a
collision-safe encoding for each key part (include a type tag and length prefix)
when constructing keyParts so distinct values (including the string "null" and
values containing '|') never collide—e.g., for each computed raw from
computePartitionValue/wrapUnion produce a serialized token like
"<type>:<length>:<value>" or "<nil>" for nil, append those tokens to keyParts
and then join; ensure you update the logic around key, groups[key], and order so
grouping behavior is unchanged other than fixing collisions in groupRows.
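A sketch of one such collision-safe token encoding; the input here is the slice of computed partition values rather than the real partition record type, and the function name is illustrative:

```go
package iceberg

import (
	"fmt"
	"strings"
)

// partitionGroupKey serializes each computed partition value as
// "<type>:<length>:<value>", with a dedicated token for nil, so the literal
// string "null" or a value containing '|' can no longer collide with another
// group key.
func partitionGroupKey(values []any) string {
	parts := make([]string, 0, len(values))
	for _, v := range values {
		if v == nil {
			parts = append(parts, "<nil>")
			continue
		}
		s := fmt.Sprintf("%v", v)
		parts = append(parts, fmt.Sprintf("%T:%d:%s", v, len(s), s))
	}
	return strings.Join(parts, "|")
}
```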
In `@pkg/sink/iceberg/upsert_table.go`:
- Around line 28-43: Add a nil-check at the top of TableWriter.Upsert to guard
against a nil receiver or nil config before any use of w.cfg (e.g., before
accessing w.cfg.EmitMetadataColumns and w.cfg.TargetFileSizeBytes); if w == nil
|| w.cfg == nil return an appropriate error (consistent with other functions,
e.g., cerror.ErrSinkURIInvalid.GenWithStackByArgs("table writer or cfg is nil"))
so the function fails fast instead of causing a nil pointer dereference.
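A minimal illustration of that guard; the types below are stand-ins, not the real TableWriter from pkg/sink/iceberg:

```go
package iceberg

import cerror "github.com/pingcap/ticdc/pkg/errors"

// Stand-in types for illustration only.
type Config struct{ TargetFileSizeBytes int64 }
type CommitResult struct{}
type TableWriter struct{ cfg *Config }

// Upsert fails fast with a descriptive error instead of dereferencing a nil
// receiver or nil config further down.
func (w *TableWriter) Upsert() (*CommitResult, error) {
	if w == nil || w.cfg == nil {
		return nil, cerror.ErrSinkURIInvalid.GenWithStackByArgs("table writer or cfg is nil")
	}
	// ... existing upsert logic would continue here ...
	return &CommitResult{}, nil
}
```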
🧹 Nitpick comments (12)
Makefile (1)
246-248: Consider adding to.PHONYdeclaration.The new
integration_test_icebergtarget follows the established pattern. However, the.PHONYdeclaration at line 3-8 includes other integration test targets but doesn't includeintegration_test_iceberg. While this may work correctly, adding it to.PHONYensures consistent behavior with other targets.♻️ Suggested addition to .PHONY
Update line 7 to include the new target:
-.PHONY: clean fmt check tidy \ - generate-protobuf generate_mock \ - cdc kafka_consumer storage_consumer pulsar_consumer filter_helper \ - prepare_test_binaries \ - unit_test_in_verify_ci integration_test_build integration_test_build_fast integration_test_mysql integration_test_kafka integration_test_storage integration_test_pulsar \ - generate-next-gen-grafana +.PHONY: clean fmt check tidy \ + generate-protobuf generate_mock \ + cdc kafka_consumer storage_consumer pulsar_consumer filter_helper \ + prepare_test_binaries \ + unit_test_in_verify_ci integration_test_build integration_test_build_fast integration_test_mysql integration_test_kafka integration_test_storage integration_test_pulsar integration_test_iceberg \ + generate-next-gen-grafanatests/integration_tests/README.md (1)
111-117: Fix markdown blockquote formatting.The static analysis tool flagged a blank line inside the blockquote (between lines 111 and 113). Consider restructuring to avoid the blank line within the blockquote block.
📝 Suggested fix
- > `Iceberg sink` can be tested with `make integration_test_iceberg CASE=iceberg_append_basic`. - - > Iceberg Spark readback (optional): set `ICEBERG_SPARK_READBACK=1` and provide Iceberg Spark runtime via + > `Iceberg sink` can be tested with `make integration_test_iceberg CASE=iceberg_append_basic`. + > + > Iceberg Spark readback (optional): set `ICEBERG_SPARK_READBACK=1` and provide Iceberg Spark runtime viacmd/iceberg-maintenance/main.go (1)
51-55: Consider wrapping AWS SDK errors with stack traces.Per coding guidelines, errors from third-party or library calls should be wrapped with
errors.Trace(err)orerrors.WrapError(...)to attach a stack trace. While this is a CLI tool where the impact is limited, wrapping these errors would provide better debugging context.♻️ Suggested fix
Add the import and wrap errors:
import ( "context" "flag" "fmt" "os" "sort" "strings" "github.com/aws/aws-sdk-go-v2/aws" awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/glue" gluetypes "github.com/aws/aws-sdk-go-v2/service/glue/types" + "github.com/pingcap/ticdc/pkg/errors" )Then update error handling:
cfg, err := awsconfig.LoadDefaultConfig(ctx, opts...) if err != nil { - fmt.Fprintf(os.Stderr, "load aws config failed: %v\n", err) + fmt.Fprintf(os.Stderr, "load aws config failed: %v\n", errors.Trace(err)) os.Exit(1) }names, err := listIcebergGlueTables(ctx, client, strings.TrimSpace(*database)) if err != nil { - fmt.Fprintf(os.Stderr, "list glue tables failed: %v\n", err) + fmt.Fprintf(os.Stderr, "list glue tables failed: %v\n", errors.Trace(err)) os.Exit(1) }As per coding guidelines: "When an error comes from a third-party or library call in Go, wrap it immediately with
errors.Trace(err)orerrors.WrapError(...)to attach a stack trace."Also applies to: 63-67
pkg/sink/iceberg/avro.go (1)
29-51: Wrap errors from the goavro library.Per coding guidelines, errors from third-party library calls should be wrapped with
errors.Trace(err)to attach a stack trace for better debugging.♻️ Suggested fix
import ( "bytes" "github.com/linkedin/goavro/v2" + "github.com/pingcap/ticdc/pkg/errors" ) func writeOCF(schema string, meta map[string][]byte, compressionName string, records []any) ([]byte, error) { codec, err := goavro.NewCodec(schema) if err != nil { - return nil, err + return nil, errors.Trace(err) } buf := bytes.NewBuffer(nil) writer, err := goavro.NewOCFWriter(goavro.OCFConfig{ W: buf, Codec: codec, CompressionName: compressionName, MetaData: meta, }) if err != nil { - return nil, err + return nil, errors.Trace(err) } if len(records) > 0 { if err := writer.Append(records); err != nil { - return nil, err + return nil, errors.Trace(err) } } return buf.Bytes(), nil }As per coding guidelines: "When an error comes from a third-party or library call in Go, wrap it immediately with
errors.Trace(err)orerrors.WrapError(...)to attach a stack trace."pkg/sink/iceberg/schema_validation.go (1)
54-79: Minor: Redundant whitespace trimming and empty switch case.The function trims whitespace on line 55, but the caller
validateSchemaEvolutionalready trims the types before passing them in (lines 38-39). The emptydefault:case on lines 64-65 can be removed since the function falls through to the decimal check regardless.♻️ Suggested cleanup
func isSafeIcebergTypeWidening(oldType string, newType string) bool { - if strings.TrimSpace(oldType) == strings.TrimSpace(newType) { + if oldType == newType { return true } switch oldType { case "int": return newType == "long" case "float": return newType == "double" - default: }pkg/sink/iceberg/checkpoint.go (2)
121-150:checkpointTableErris never set, making the error return path dead code.The
sync.Onceblock on lines 126-147 only assignsw.checkpointTableInfobut never assignsw.checkpointTableErr. The returned error on line 149 will always benil. If initialization can fail (e.g., ifcommon.WrapTableInfocould return an error in the future), consider capturing and returning the error properly.♻️ Suggested fix if error handling is needed
If
WrapTableInfocannot fail, remove the error field from the struct. Otherwise:w.checkpointOnce.Do(func() { ft := func() types.FieldType { return *types.NewFieldType(mysql.TypeVarchar) } - w.checkpointTableInfo = common.WrapTableInfo(checkpointSchemaName, &timodel.TableInfo{ + w.checkpointTableInfo, w.checkpointTableErr = common.WrapTableInfo(checkpointSchemaName, &timodel.TableInfo{ // ... }) })
152-175: Same issue:globalCheckpointTableErris never set.Similar to
getCheckpointTableInfo, theglobalCheckpointTableErrfield is never assigned within thesync.Onceblock, so line 174 always returnsnil.pkg/sink/iceberg/upsert_table.go (1)
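For reference, a small sketch of capturing the error inside sync.Once so later callers still observe it; the names are illustrative, not the actual checkpoint code:

```go
package iceberg

import "sync"

type tableInfo struct{ name string }

type lazyTableInfo struct {
	once sync.Once
	info *tableInfo
	err  error
}

func buildCheckpointTableInfo() (*tableInfo, error) {
	return &tableInfo{name: "checkpoint"}, nil
}

// get assigns both results inside Do; without the err assignment the error
// field stays nil forever and the error return path is dead code.
func (l *lazyTableInfo) get() (*tableInfo, error) {
	l.once.Do(func() {
		l.info, l.err = buildCheckpointTableInfo()
	})
	return l.info, l.err
}
```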
147-154: Redundant safety check for equality deletes.Line 66 already validates
isSafeForEqualityDeletesand returns an error if unsafe. The check on line 147 is therefore redundant since we wouldn't reach this code path if the partition spec wasn't safe. Consider removing the redundant branch or adding a comment explaining why it's kept defensively.pkg/config/sink.go (1)
707-764: Addvalidate()method toIcebergConfiginpkg/config/sink.gofor consistency and early validation.Other sink configs like
PulsarConfig(line 658) andCSVConfig(line 274) have validation methods called duringSinkConfig.validateAndAdjust(). While validation logic forIcebergConfigexists inpkg/sink/iceberg/config.go'sApply()method, it should be integrated intopkg/config/sink.goto validate during changefeed creation, consistent with the pattern for other sinks. This validates:
Mode: "append" or "upsert"Catalog: "hadoop", "glue", or "rest"SchemaMode: "strict" or "evolve"CommitInterval: valid duration stringtests/integration_tests/_utils/iceberg_spark_sql_scalar (1)
56-64: Guard against both ICEBERG_SPARK_PACKAGES and ICEBERG_SPARK_JARS being set.Right now
ICEBERG_SPARK_PACKAGESsilently wins when both are set, which can mask misconfiguration. Consider failing fast with an explicit error to avoid ambiguous runs.Suggested update
sparkArgs=() -if [[ -n "${ICEBERG_SPARK_PACKAGES:-}" ]]; then +if [[ -n "${ICEBERG_SPARK_PACKAGES:-}" && -n "${ICEBERG_SPARK_JARS:-}" ]]; then + echo "Set only one of ICEBERG_SPARK_PACKAGES or ICEBERG_SPARK_JARS." >&2 + exit 1 +fi +if [[ -n "${ICEBERG_SPARK_PACKAGES:-}" ]]; then sparkArgs+=(--packages "${ICEBERG_SPARK_PACKAGES}") elif [[ -n "${ICEBERG_SPARK_JARS:-}" ]]; then sparkArgs+=(--jars "${ICEBERG_SPARK_JARS}") elsetests/integration_tests/iceberg_upsert_basic/run.sh (1)
1-4: Addpipefailto surface pipeline failures.
set -ewon’t fail a pipeline if an earlier command errors (e.g.,ls | sort | tail). Enablingpipefailmakes test failures more visible.♻️ Suggested change
-set -eu +set -euo pipefailpkg/sink/iceberg/hadoop_table_test.go (1)
277-378: Make manifest-list assertions order-agnostic.If entry ordering changes, these index-based checks can become flaky. Consider asserting by content instead of position.
As per coding guidelines: Use unit test files named `*_test.go` in Go; favor deterministic tests and use `testify/require`.♻️ Suggested change
- require.Equal(t, int32(0), records[0]["content"]) - require.Equal(t, int64(1), records[0]["sequence_number"]) - require.Equal(t, int64(1), records[0]["min_sequence_number"]) - - require.Equal(t, int32(1), records[1]["content"]) - require.Equal(t, int64(1), records[1]["sequence_number"]) - require.Equal(t, int64(0), records[1]["min_sequence_number"]) + var dataRec, deleteRec map[string]any + for _, rec := range records { + switch rec["content"].(int32) { + case dataFileContentData: + dataRec = rec + case dataFileContentEqualityDeletes: + deleteRec = rec + } + } + require.NotNil(t, dataRec) + require.NotNil(t, deleteRec) + require.Equal(t, int64(1), dataRec["sequence_number"]) + require.Equal(t, int64(1), dataRec["min_sequence_number"]) + require.Equal(t, int64(1), deleteRec["sequence_number"]) + require.Equal(t, int64(0), deleteRec["min_sequence_number"])
Signed-off-by: BornChanger <dawn_catcher@126.com>
Signed-off-by: BornChanger <dawn_catcher@126.com>
Signed-off-by: BornChanger <dawn_catcher@126.com>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@go.mod`:
- Line 13: The go.mod pins "github.com/apache/arrow-go/v18" to v18.0.0 (and a
replace to the fork joechenrh/arrow-go/v18 at a v18.0.0 pseudo-version); update
the required version to the latest stable (v18.5.1) and either remove or update
the replace directive to the corresponding fork version if the fork is still
needed, then run "go get github.com/apache/arrow-go/v18@v18.5.1" and "go mod
tidy" to refresh dependencies and run tests; if the replace to the fork is
intentional and must remain, add a concise inline comment next to the
replace/require explaining why the fork is required (e.g., bugfix/feature not
upstreamed) and the minimum version constraint.
Signed-off-by: BornChanger <dawn_catcher@126.com>
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@pkg/sink/iceberg/config.go`:
- Around line 139-358: The code currently always injects defaultPartitioning
when Partitioning is empty, preventing users from explicitly setting an
unpartitioned table; add a local boolean (e.g., partitioningSpecified) and set
it when the partitioning key is provided by the user (for query params use
presence check: _, ok := query["partitioning"]; for sink config set the flag
when the IcebergConfig explicitly contains the Partitioning field), and then
change the final defaulting condition to apply defaultPartitioning only when
partitioningSpecified is false (instead of when Partitioning is empty) while
preserving the existing check for c.EmitMetadataColumns and c.Mode == ModeAppend
and references to Partitioning, EmitMetadataColumns, ModeAppend, and
defaultPartitioning.
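A rough sketch of the presence-check approach for the query-parameter path; everything except the names mentioned above (`partitioning`, `Partitioning`, `EmitMetadataColumns`, `ModeAppend`, `defaultPartitioning`) is an assumption:

```go
// Sketch only. Distinguish "user explicitly set partitioning" (possibly to an
// empty value, i.e. an unpartitioned table) from "option not provided at all".
partitioningSpecified := false
if vals, ok := query["partitioning"]; ok { // query is assumed to be url.Values
	partitioningSpecified = true
	if len(vals) > 0 {
		c.Partitioning = vals[0]
	}
}

// ... later, once the sink-config overlay has also had a chance to set it:
if !partitioningSpecified && c.EmitMetadataColumns && c.Mode == ModeAppend {
	c.Partitioning = defaultPartitioning
}
```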
In `@pkg/sink/iceberg/hadoop_table.go`:
- Around line 1057-1094: The function loadTableMetadataFromHint currently
returns nil when version-hint.text is missing, which orphaned tables; update it
to fall back to scanning the metadataDirRel for the highest v*.metadata.json and
load that version instead of returning nil. Implement a helper (e.g.,
findLatestMetadataVersion or loadLatestMetadataFromDir) that lists files under
metadataDirRel (using the storage API's directory listing/WalkDir), parses
filenames matching the regexp `^v(\d+)\.metadata\.json$`, selects the largest
version, reads and unmarshals that metadata into tableMetadata (set
SelfMetadataLocation via joinLocation) and return it from
loadTableMetadataFromHint; keep the existing hint-path flow when version-hint
exists and only invoke the helper when the hint file is absent or unreadable.
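The filename-selection part of that helper is pure and easy to unit-test; a sketch, assuming the caller obtains `names` from the storage API's directory listing (imports: `regexp`, `strconv`):

```go
var metadataFileRE = regexp.MustCompile(`^v(\d+)\.metadata\.json$`)

// findLatestMetadataFile returns the highest-versioned v<N>.metadata.json from
// a directory listing, and false when none exists so the caller can treat the
// table as genuinely uninitialized rather than orphaned.
func findLatestMetadataFile(names []string) (string, bool) {
	best, bestVersion := "", -1
	for _, name := range names {
		m := metadataFileRE.FindStringSubmatch(name)
		if m == nil {
			continue
		}
		if v, err := strconv.Atoi(m[1]); err == nil && v > bestVersion {
			best, bestVersion = name, v
		}
	}
	return best, bestVersion >= 0
}
```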
🧹 Nitpick comments (1)
pkg/sink/iceberg/upsert_table.go (1)
88-214: Optional: skip empty commits when there’s nothing to write.

If both `dataRows` and `deleteRows` are empty, you can return early to avoid creating a no-op snapshot and extra metadata churn.

♻️ Suggested tweak

```diff
 if len(equalityFieldIDs) == 0 {
 	return nil, cerror.ErrSinkURIInvalid.GenWithStackByArgs("equality field ids are empty")
 }
+if len(dataRows) == 0 && len(deleteRows) == 0 {
+	return nil, nil
+}
```
Signed-off-by: BornChanger <dawn_catcher@126.com>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@log.txt`:
- Around line 1-12: The test TestAppendChangelogPartitionsByCommitTimeDay is
failing because only one manifest entry is produced instead of two; inspect and
fix the partitioning and manifest generation path (functions/methods involved in
computing partition keys and creating manifests — e.g., the partition key
computation used by AppendChangelog/appendChangelog, any computePartition or
partitionForRecord function, and the manifest creation logic such as
generateManifests/createManifest). Ensure records are grouped by the
commit_time_day partition key correctly (computePartition/partitionForRecord
returns distinct keys for different commit days) and that
generateManifests/createManifest creates distinct manifest entries per partition
group rather than merging them; update grouping logic and manifest emission so
the TestAppendChangelogPartitionsByCommitTimeDay produces two manifest entries.
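For context, Iceberg's `days` transform maps a timestamp to whole days since the Unix epoch, so rows from different commit days must yield distinct partition keys before manifests are built. A toy illustration of that grouping invariant (the `row` type and its field are made up for this sketch; imports: `time`):

```go
// Sketch only: each distinct commit day must become its own group, so the
// manifest writer emits one manifest entry per group instead of merging them.
type row struct {
	commitTime time.Time
}

func groupByCommitDay(rows []row) map[int32][]row {
	groups := make(map[int32][]row)
	for _, r := range rows {
		day := int32(r.commitTime.UTC().Unix() / 86400) // Iceberg "days" transform value
		groups[day] = append(groups[day], r)
	}
	return groups
}
```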
🧹 Nitpick comments (2)
docs/design/2026-02-01-ticdc-iceberg-upsert-optimization-plan.md (1)
9-18: Optional: render task list as Markdown checkboxes.

Use `- [ ]` so GitHub renders these as interactive task items.

Suggested edit

```diff
- [ ] Review design docs and upsert code path; identify the highest-cost conversion and metadata steps.
+ - [ ] Review design docs and upsert code path; identify the highest-cost conversion and metadata steps.
```

pkg/sink/iceberg/upsert_table.go (1)
69-165: Remove redundant equality-delete safety check.
`isSafeForEqualityDeletes` is already enforced earlier, so the `else` branch here is dead code. Consider simplifying.

Suggested simplification

```diff
-	if resolvedSpec.isSafeForEqualityDeletes(equalityFieldIDs) {
-		deleteGroups, groupErr = resolvedSpec.groupRows(deleteRows)
-	} else {
-		deleteGroups = []partitionGroup{{partition: resolvedSpec.emptyPartitionRecord(), rows: deleteRows}}
-	}
+	deleteGroups, groupErr = resolvedSpec.groupRows(deleteRows)
```
Signed-off-by: BornChanger <dawn_catcher@126.com>
Signed-off-by: BornChanger <dawn_catcher@126.com>
Signed-off-by: BornChanger <dawn_catcher@126.com>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @.github/workflows/integration_test_iceberg.yaml:
- Around line 64-73: The yq download step writes directly to /usr/local/bin/yq
without sudo, causing permission failures on GitHub runners; update the step
that sets YQ_VERSION and the curl command that outputs to /usr/local/bin/yq (and
its subsequent chmod) to use elevated permissions — e.g., download to /tmp or
use sudo when writing/moving to /usr/local/bin and ensure sudo chmod +x
/usr/local/bin/yq is used so the yq install matches the oras install pattern
(refer to ORAS_VERSION, YQ_VERSION and the curl -> /usr/local/bin/yq
invocation).
🧹 Nitpick comments (1)
.github/workflows/integration_test_iceberg.yaml (1)
86-109: Consider pinning the external script for supply chain security.

The download script is fetched from an external repository at runtime (line 89). For improved supply chain security, consider either vendoring the script or verifying its checksum.

💡 Example checksum verification

```diff
 script="${RUNNER_TEMP}/download_pingcap_oci_artifact.sh"
 curl -fsSL "https://raw.githubusercontent.com/PingCAP-QE/ci/main/scripts/artifacts/download_pingcap_oci_artifact.sh" -o "$script"
+echo "<expected_sha256> $script" | sha256sum -c -
 chmod +x "$script"
```
force-pushed from 150fa2a to 38fa40e
force-pushed from 38fa40e to 5c51ee0
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @.github/workflows/integration_test_iceberg.yaml:
- Around line 64-74: Update the hardcoded tool versions in the GitHub Actions
step that installs OCI download tools by bumping ORAS_VERSION and YQ_VERSION to
current stable releases; specifically edit the block that defines
ORAS_VERSION="1.2.0" and YQ_VERSION="4.44.3" in the step named "Install OCI
download tools" and set them to the newer stable tags (e.g.,
ORAS_VERSION="1.3.0" and YQ_VERSION="4.52.2" or whatever the latest stable
release is at commit time), then verify the download URLs and filenames still
match the new release artifacts used by the curl/tar/mv commands (the variables
ORAS_VERSION and YQ_VERSION are the unique symbols to change).
🧹 Nitpick comments (1)
.github/workflows/integration_test_iceberg.yaml (1)
93-104: Inline Python patch is fragile and may break silently if upstream script changes.

The script patches a specific string in the downloaded script. If the upstream `download_pingcap_oci_artifact.sh` changes this line, the workflow will fail with a cryptic error. Consider adding a more informative error message or tracking an issue to upstream the fix.

💡 Suggested improvement

```diff
 if needle not in text:
-    raise SystemExit("failed to patch download script: target not found")
+    raise SystemExit(
+        "failed to patch download script: target string not found.\n"
+        f"Expected: {needle!r}\n"
+        "The upstream script may have changed. Please review and update the patch."
+    )
 path.write_text(text.replace(needle, replacement))
```
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@tests/integration_tests/iceberg_upsert_basic/run.sh`:
- Around line 8-14: The script reads the positional arg into SINK_TYPE using
SINK_TYPE=$1 which will fail under set -u if no argument is supplied; modify the
top of the script to safely default/validate the positional parameter before
using it (e.g., check if $1 is set or use a parameter expansion default) and
emit the same "skip iceberg integration test..." message and exit when no valid
sink type is provided so SINK_TYPE is always defined when referenced.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@tests/integration_tests/iceberg_append_basic/run.sh`:
- Around line 1-14: Run shfmt -w on the script to fix formatting (e.g., run
shfmt -w tests/integration_tests/iceberg_append_basic/run.sh) and update the
script to consistently quote variables to avoid word-splitting (quote
occurrences of CUR, BASH_SOURCE, OUT_DIR, TEST_NAME, CDC_BINARY, and SINK_TYPE
in run.sh; e.g., use "$CUR" and "$SINK_TYPE"). Ensure set -eu remains and the
behavior of the SINK_TYPE check is unchanged.
🧹 Nitpick comments (1)
tests/integration_tests/iceberg_append_basic/run.sh (1)
44-84: Consider extracting duplicated utilities to `_utils`.

Both `wait_file_exists` and `wait_changefeed_table_assigned` are identical to the implementations in `iceberg_upsert_basic/run.sh`. Extracting these to a shared utility file (e.g., `tests/integration_tests/_utils/iceberg_utils`) would reduce duplication and simplify maintenance across Iceberg integration tests.
force-pushed from 0d60eba to 90198d7
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@tests/integration_tests/iceberg_upsert_basic/run.sh`:
- Around line 1-186: The script fails shfmt checks; run shfmt (e.g., shfmt -w)
on the file to normalize indentation, spacing, and quoting throughout, then
commit the formatted file; pay special attention to the function blocks prepare,
wait_file_exists, wait_changefeed_table_assigned, and iceberg_check_upsert_basic
so their braces, indentation, and compound commands (if/curl loops and
heredoc-like multiline commands) are formatted consistently and any stray inline
commands (e.g., the processLogger-like single-line commands and concatenated
statements) are placed on separate lines to satisfy shfmt.
force-pushed from 90198d7 to 48076b0
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@tests/integration_tests/iceberg_append_basic/run.sh`:
- Around line 3-14: The script uses "set -u" so referencing the positional
parameter as SINK_TYPE="$1" can fail when no arg is passed; change the logic to
first guard the positional argument (e.g., test $# -lt 1 or use a default via
SINK_TYPE="${1:-}") before the comparison, then perform the if [ "$SINK_TYPE" !=
"iceberg" ] check; update the SINK_TYPE assignment and the conditional in run.sh
to avoid an unbound-variable error when no argument is provided.
Signed-off-by: BornChanger <dawn_catcher@126.com>
force-pushed from 48076b0 to 86eefe0
What problem does this PR solve?
TiCDC currently lacks a native sink for Apache Iceberg. This PR adds an `iceberg://` sink so users can continuously replicate TiDB changes into query-ready Iceberg tables (Spark-first), using AWS Glue + S3 in production or a local Hadoop catalog + file warehouse for dev/testing.

Issue Number: close #4099
What is changed and how it works?
- `iceberg://` sink scheme (wired into sink selection + changefeed config); an example sink URI is shown after this list.
- `catalog=glue` with `warehouse=s3://...` (Glue is auto-selected when the warehouse is S3 and no catalog is specified).
- `catalog=hadoop` with `warehouse=file://...` (local/dev).
- `catalog=rest` reserved (not implemented).
- Interval-based commits (`commit-interval`, default `30s`) using TiCDC `resolved-ts` as the correctness barrier.
- Checkpoint table (`tidb.committed_resolved_ts`) to support restart-safe/idempotent progress.
- `mode=append` (default): append-only changelog table; default partitioning `days(_tidb_commit_time)` when `emit-metadata-columns=true`.
- `mode=upsert`: “latest row” semantics using Iceberg v2 equality deletes (merge-on-read).
  - Partitioning must be `none` or derived only from handle-key columns (e.g. `bucket(pk,16)`) to keep equality deletes correct.
  - Requires `enable-old-value=true` so delete keys are correct.
- `schema-mode=strict|evolve`.
- DDL handling: `RENAME TABLE(S)` for Glue, `TRUNCATE TABLE` as overwrite-to-empty snapshot.
- `allow-takeover=true` for controlled migrations.
- `enable-checkpoint-table=true` / `enable-global-checkpoint-table=true`.
- `max-buffered-*` (fail-fast backpressure).
- Metrics: `ticdc_sink_iceberg_*` (commit duration, lag, retries/conflicts, bytes/files written, buffer gauges, etc.).
- `cmd/iceberg-bootstrap`: snapshot one table into Iceberg and print a recommended `start-ts`.
- `cmd/iceberg-maintenance`: generate Spark SQL maintenance templates for Glue tables.
- Integration tests: `tests/integration_tests/iceberg_append_basic` and `tests/integration_tests/iceberg_upsert_basic`.
  - Optional Spark read-back via `ICEBERG_SPARK_READBACK=1` (uses `tests/integration_tests/_utils/iceberg_spark_sql_scalar`).
- `tests/integration_tests/run_light_it_in_ci.sh` iceberg groups (pipeline hookup is a follow-up; see TODO in script).
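For illustration, a hypothetical sink URI combining the options above; the query-string form, bucket name, and warehouse path are assumptions, so check the final config parsing for the authoritative syntax:

```
iceberg://?catalog=glue&warehouse=s3://my-bucket/iceberg-warehouse&mode=upsert&commit-interval=30s&schema-mode=evolve&enable-checkpoint-table=true
```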
Check List

Tests
- `GOCACHE=/tmp/go-build-cache go test ./pkg/sink/iceberg -count=1`
- `make integration_test_iceberg CASE="iceberg_append_basic iceberg_upsert_basic"`
- `ICEBERG_SPARK_READBACK=1 ICEBERG_SPARK_PACKAGES='org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1' make integration_test_iceberg CASE="iceberg_append_basic iceberg_upsert_basic"`

Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
`ticdc_sink_iceberg_*` metrics added.

Release note
Summary by CodeRabbit
New Features
Documentation
Metrics
Tests
Chores
Style