From 18a28e47d2143225a41a61aa951e986e136845b3 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Fri, 28 Nov 2025 09:57:37 -0500 Subject: [PATCH] feat(BigQueryWriteSchemaTransformProvider): add output collection names and error handling check Add support for failed rows output collections and modify bounded check to consider error handling configuration --- .../BigQueryWriteSchemaTransformProvider.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java index abab169d6932..a741c637a19e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java @@ -20,6 +20,8 @@ import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; import com.google.auto.service.AutoService; +import java.util.Arrays; +import java.util.List; import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.schemas.NoSuchSchemaException; @@ -47,6 +49,11 @@ public String identifier() { return getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE); } + @Override + public List outputCollectionNames() { + return Arrays.asList("FailedRows", "FailedRowsWithErrors", "errors"); + } + @Override protected SchemaTransform from(BigQueryWriteConfiguration configuration) { return new BigQueryWriteSchemaTransform(configuration); @@ -62,9 +69,10 @@ public static class BigQueryWriteSchemaTransform extends SchemaTransform { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { - if (input.getSinglePCollection().isBounded().equals(PCollection.IsBounded.BOUNDED)) { + if (input.getSinglePCollection().isBounded().equals(PCollection.IsBounded.BOUNDED) + && configuration.getErrorHandling() == null) { return input.apply(new BigQueryFileLoadsSchemaTransformProvider().from(configuration)); - } else { // UNBOUNDED + } else { // UNBOUNDED or error handling specified return input.apply( new BigQueryStorageWriteApiSchemaTransformProvider().from(configuration)); }