diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java index d54ff4b471..a022d33e84 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java @@ -84,7 +84,6 @@ private enum BigQueryJobType { QUERY, COPY, COPY_SNAPSHOT }; public static final String SQL_INPUT_CONFIG = "config"; public static final String SQL_INPUT_FIELDS = "fields"; public static final String SQL_INPUT_SCHEMA = "schema"; - public static final String BQ_COPY_SNAPSHOT_OP_TYPE = "SNAPSHOT"; private static final java.lang.reflect.Type LIST_OF_STRINGS_TYPE = new TypeToken>() { }.getType(); private static final String BQ_PUSHDOWN_OPERATION_TAG = "read"; @@ -214,24 +213,9 @@ private SQLReadResult readInternal(SQLReadRequest readRequest, tableTTL = Instant.now().toEpochMilli() + ttlMillis; } - // no Filter + no view + no material + no external - StandardTableDefinition tableDefinition = Objects.requireNonNull(sourceTable).getDefinition(); - Type type = tableDefinition.getType(); - if (!(type == Type.VIEW || type == Type.MATERIALIZED_VIEW || type == Type.EXTERNAL) - && sourceConfig.getFilter() == null) { - // TRY SNAPSHOT - JobConfiguration jobConfiguration = getBQSnapshotJobConf(sourceTableId, destinationTableId); - SQLReadResult snapshotResult = executeBigQueryJob(jobConfiguration, sourceTable, sourceTableId, - BigQueryJobType.COPY_SNAPSHOT, jobLocation); - if (snapshotResult.isSuccessful()) { - BigQuerySQLEngineUtils.updateTableExpiration(bigQuery, destinationTableId, tableTTL); - return snapshotResult; - } - LOG.warn("Big Query Snapshot process used for direct BigQuery read failed. Using fallback table copy strategy."); - } - - JobConfiguration queryConfig = getBQQueryJobConfiguration(sourceTable, sourceTableId, - fields, + // Create configuration for table copy job + JobConfiguration queryConfig = getBQQueryJobConfiguration(sourceTable, + sourceTableId, sourceConfig.getFilter(), sourceConfig.getPartitionFrom(), sourceConfig.getPartitionTo(), @@ -284,19 +268,19 @@ private SQLReadResult executeBigQueryJob(JobConfiguration jobConfiguration, return SQLReadResult.success(datasetName, this); } - JobConfiguration getBQQueryJobConfiguration(Table sourceTable, TableId sourceTableId, - List fields, - String filter, - String partitionFromDate, - String partitionToDate, - Long tableTTL) { + private JobConfiguration getBQQueryJobConfiguration(Table sourceTable, + TableId sourceTableId, + String filter, + String partitionFromDate, + String partitionToDate, + Long tableTTL) { BigQuerySQLEngineUtils.createEmptyTableWithSourceConfig(bigQuery, destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable(), sourceTable, tableTTL); - String query = String.format("SELECT %s FROM `%s.%s.%s`", - String.join(",", fields), + // Select all fields from source table into destination table + String query = String.format("SELECT * FROM `%s.%s.%s`", sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable()); @@ -389,16 +373,6 @@ QueryJobConfiguration.Builder getQueryBuilder(Table sourceTable, TableId sourceT .setLabels(BigQuerySQLEngineUtils.getJobTags(BQ_PUSHDOWN_OPERATION_TAG)); } - private JobConfiguration getBQSnapshotJobConf(TableId sourceTable, TableId destinationTable) { - CopyJobConfiguration copyJobConfiguration = - CopyJobConfiguration.newBuilder(destinationTable, sourceTable) - .setOperationType(BQ_COPY_SNAPSHOT_OP_TYPE) - .setLabels(BigQuerySQLEngineUtils.getJobTags(BQ_PUSHDOWN_OPERATION_TAG)) - .build(); - - return copyJobConfiguration; - } - /** * Try to delete this table while handling exception *