diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h
index 95258a2b320c..b8bbf2c9e945 100644
--- a/src/Core/ProtocolDefines.h
+++ b/src/Core/ProtocolDefines.h
@@ -35,8 +35,10 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54
 static constexpr auto DBMS_CLUSTER_INITIAL_PROCESSING_PROTOCOL_VERSION = 1;
 static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_METADATA = 2;
-static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA = 3;
-static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 3;
+static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_METADATA = 3;
+static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO = 4;
+static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA = 5;
+static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA;
 static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
 static constexpr auto DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD = 4;
diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index 2f0f02f63491..88550adb0dda 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -6789,6 +6789,19 @@ Both database and table names have to be unquoted - only simple identifiers are
 )", 0) \
     DECLARE(Bool, allow_general_join_planning, true, R"(
 Allows a more general join planning algorithm that can handle more complex conditions, but only works with hash join. If hash join is not enabled, then the usual join planning algorithm is used regardless of the value of this setting.
+)", 0) \
+    DECLARE(ObjectStorageGranularityLevel, cluster_table_function_split_granularity, ObjectStorageGranularityLevel::FILE, R"(
+Controls how data is split into tasks when a cluster table function is executed.
+
+This setting defines the granularity of work distribution across the cluster:
+- `file` — each task processes an entire file.
+- `bucket` — tasks are created per internal data block within a file (for example, Parquet row groups).
+
+Choosing finer granularity (`bucket`) can improve parallelism when working with a small number of large files.
+For instance, if a Parquet file contains multiple row groups, `bucket` granularity allows each row group to be processed independently by a different worker.
+)", 0) \
+    DECLARE(UInt64, cluster_table_function_buckets_batch_size, 0, R"(
+Defines the approximate batch size (in bytes) used when distributing tasks in cluster table functions with `bucket` split granularity. Data blocks are accumulated into a batch until this amount is reached; the actual batch size may be slightly larger to stay aligned with data block boundaries.
+)", 0) \
     DECLARE(UInt64, merge_table_max_tables_to_look_for_schema_inference, 1000, R"(
 When creating a `Merge` table without an explicit schema or when using the `merge` table function, infer schema as a union of not more than the specified number of matching tables.
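Note: to make the two new settings concrete, here is a minimal standalone sketch (not ClickHouse code; packRowGroups and its parameters are invented names) of the batching behaviour that cluster_table_function_buckets_batch_size controls, mirroring the greedy packing loop in ParquetBucketSplitter::splitToBuckets later in this patch. A batch keeps growing while its total byte size stays within the threshold; the row group that would overflow it starts a new batch, so a threshold of 0 (the default) puts every row group into its own batch.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

/// Greedy packing of consecutive row-group sizes (in bytes) into batches:
/// a batch keeps growing while its total stays within `batch_size`;
/// the row group that would overflow it starts a new batch.
std::vector<std::vector<size_t>> packRowGroups(const std::vector<uint64_t> & sizes, uint64_t batch_size)
{
    std::vector<std::vector<size_t>> batches;
    uint64_t current = 0;
    for (size_t i = 0; i < sizes.size(); ++i)
    {
        if (batches.empty() || current + sizes[i] > batch_size)
        {
            batches.emplace_back();
            current = 0;
        }
        batches.back().push_back(i);
        current += sizes[i];
    }
    return batches;
}

int main()
{
    /// Four row groups of ~100 KB each with a 250 KB threshold -> batches {0, 1} and {2, 3}.
    for (const auto & batch : packRowGroups({100000, 100000, 100000, 100000}, 250000))
    {
        for (size_t row_group : batch)
            std::cout << row_group << ' ';
        std::cout << '\n';
    }
}

In queries the two settings are applied together, as the integration test at the end of this patch does with SETTINGS cluster_table_function_split_granularity='bucket', cluster_table_function_buckets_batch_size=N.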
diff --git a/src/Core/Settings.h b/src/Core/Settings.h index c70767e21aca..49d50d9ce3b5 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -109,7 +109,8 @@ class WriteBuffer; M(CLASS_NAME, UInt64Auto) \ M(CLASS_NAME, URI) \ M(CLASS_NAME, VectorSearchFilterStrategy) \ - M(CLASS_NAME, GeoToH3ArgumentOrder) + M(CLASS_NAME, GeoToH3ArgumentOrder) \ + M(CLASS_NAME, ObjectStorageGranularityLevel) COMMON_SETTINGS_SUPPORTED_TYPES(Settings, DECLARE_SETTING_TRAIT) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 09bdf3a3bcb7..238dc1e3a7e2 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -58,6 +58,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}, {"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."}, {"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."}, + {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}, + {"cluster_table_function_split_granularity", "file", "file", "New setting."}, + {"cluster_table_function_buckets_batch_size", 0, 0, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.8", { diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index 2fb4f1668ed4..6dd69453d703 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -372,4 +372,10 @@ IMPLEMENT_SETTING_ENUM( {"manifest_file_entry", IcebergMetadataLogLevel::ManifestFileEntry}}) IMPLEMENT_SETTING_AUTO_ENUM(MergeTreePartExportFileAlreadyExistsPolicy, ErrorCodes::BAD_ARGUMENTS); + +IMPLEMENT_SETTING_ENUM( + ObjectStorageGranularityLevel, + ErrorCodes::BAD_ARGUMENTS, + {{"file", ObjectStorageGranularityLevel::FILE}, + {"bucket", ObjectStorageGranularityLevel::BUCKET}}) } diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index 935e0ee8b615..225358d7541d 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -481,6 +481,14 @@ enum class IcebergMetadataLogLevel : uint8_t DECLARE_SETTING_ENUM(IcebergMetadataLogLevel) +enum class ObjectStorageGranularityLevel : uint8_t +{ + FILE = 0, + BUCKET = 1, +}; + +DECLARE_SETTING_ENUM(ObjectStorageGranularityLevel) + enum class MergeTreePartExportFileAlreadyExistsPolicy : uint8_t { skip, diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index 292a8de13537..a6093d4d92f9 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -29,6 +29,7 @@ #include #include +#include #include #include @@ -149,6 +150,8 @@ struct PathWithMetadata std::optional absolute_path; ObjectStoragePtr object_storage_to_use = nullptr; + FileBucketInfoPtr file_bucket_info; + PathWithMetadata() = default; explicit PathWithMetadata( @@ -188,6 +191,14 @@ struct PathWithMetadata void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file = true); ObjectStoragePtr getObjectStorage() const { return object_storage_to_use; } + + String getIdentifier() const + { + String result = getAbsolutePath().value_or(getPath()); + if (file_bucket_info) + result += file_bucket_info->getIdentifier(); + return result; + } }; struct ObjectKeyWithMetadata diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 4b955510a608..2e89173e05b8 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -374,6 +374,20 
@@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se return format_settings; } +FileBucketInfoPtr FormatFactory::getFileBucketInfo(const String & format) +{ + auto creator = getCreators(format); + return creator.file_bucket_info_creator(); +} + +void FormatFactory::registerFileBucketInfo(const String & format, FileBucketInfoCreator bucket_info) +{ + chassert(bucket_info); + auto & creators = getOrCreateCreators(format); + if (creators.file_bucket_info_creator) + throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Bucket splitter for format {} is already registered", format); + creators.file_bucket_info_creator = std::move(bucket_info); +} InputFormatPtr FormatFactory::getInput( const String & name, @@ -694,6 +708,21 @@ void FormatFactory::registerInputFormat(const String & name, InputCreator input_ KnownFormatNames::instance().add(name, /* case_insensitive = */ true); } +void FormatFactory::registerSplitter(const String & format, BucketSplitterCreator splitter) +{ + chassert(splitter); + auto & creators = getOrCreateCreators(format); + if (creators.bucket_splitter_creator) + throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Bucket splitter for format {} is already registered", format); + creators.bucket_splitter_creator = std::move(splitter); +} + +BucketSplitter FormatFactory::getSplitter(const String & format) +{ + auto creator = getCreators(format); + return creator.bucket_splitter_creator(); +} + void FormatFactory::registerRandomAccessInputFormat(const String & name, RandomAccessInputCreator input_creator) { chassert(input_creator); diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index fb8204b7fcb3..1aab5426a319 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -10,6 +10,8 @@ #include #include +#include + #include #include @@ -94,6 +96,10 @@ class FormatFactory final : private boost::noncopyable const RowInputFormatParams & params, const FormatSettings & settings)>; + using FileBucketInfoCreator = std::function; + + using BucketSplitterCreator = std::function; + // Incompatible with FileSegmentationEngine. using RandomAccessInputCreator = std::function #include #include +#include #include +#include +#include namespace DB { @@ -40,6 +43,7 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes]; path = send_over_whole_archive ? object->getPathOrPathToArchiveIfArchive() : object->getPath(); absolute_path = object->getAbsolutePath(); + file_bucket_info = object->file_bucket_info; } } @@ -58,7 +62,9 @@ ObjectInfoPtr ClusterFunctionReadTaskResponse::getObjectInfo() const object->file_meta_info = file_meta_info; if (absolute_path.has_value() && !absolute_path.value().empty()) object->absolute_path = absolute_path; - + + object->file_bucket_info = file_bucket_info; + return object; } @@ -76,6 +82,21 @@ void ClusterFunctionReadTaskResponse::serialize(WriteBuffer & out, size_t protoc ActionsDAG().serialize(out, registry); } + if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO) + { + if (file_bucket_info) + { + /// Write format name so we can create appropriate file bucket info during deserialization. + writeStringBinary(file_bucket_info->getFormatName(), out); + file_bucket_info->serialize(out); + } + else + { + /// Write empty string as format name if file_bucket_info is not set. 
+ writeStringBinary("", out); + } + } + if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA) { /// This info is not used when optimization is disabled, so there is no need to send it. @@ -111,6 +132,17 @@ void ClusterFunctionReadTaskResponse::deserialize(ReadBuffer & in) } } + if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO) + { + String format; + readStringBinary(format, in); + if (!format.empty()) + { + file_bucket_info = FormatFactory::instance().getFileBucketInfo(format); + file_bucket_info->deserialize(in); + } + } + if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA) { auto info = std::make_shared(DataFileMetaInfo::deserialize(in)); diff --git a/src/Interpreters/ClusterFunctionReadTask.h b/src/Interpreters/ClusterFunctionReadTask.h index b5e2123cbc4f..7e6af60424f8 100644 --- a/src/Interpreters/ClusterFunctionReadTask.h +++ b/src/Interpreters/ClusterFunctionReadTask.h @@ -1,5 +1,8 @@ #pragma once #include +#include +#include +#include #include #include @@ -18,8 +21,12 @@ struct ClusterFunctionReadTaskResponse /// Data path (object path, in case of object storage). String path; + /// Absolute path (including storage type prefix). std::optional absolute_path; + + FileBucketInfoPtr file_bucket_info; + /// Object metadata path, in case of data lake object. DataLakeObjectMetadata data_lake_metadata; /// File's columns info diff --git a/src/Processors/Formats/IInputFormat.cpp b/src/Processors/Formats/IInputFormat.cpp index 4311e8bcaf97..3ed63fdbbaa2 100644 --- a/src/Processors/Formats/IInputFormat.cpp +++ b/src/Processors/Formats/IInputFormat.cpp @@ -1,7 +1,11 @@ +#include #include #include #include #include +#include +#include +#include namespace DB { @@ -70,4 +74,9 @@ void IInputFormat::onFinish() { resetReadBuffer(); } + +void IInputFormat::setBucketsToRead(const FileBucketInfoPtr & /*buckets_to_read*/) +{ +} + } diff --git a/src/Processors/Formats/IInputFormat.h b/src/Processors/Formats/IInputFormat.h index cb5feea18646..bf0eccf2fa88 100644 --- a/src/Processors/Formats/IInputFormat.h +++ b/src/Processors/Formats/IInputFormat.h @@ -4,6 +4,8 @@ #include #include #include +#include +#include #include #include #include @@ -46,6 +48,29 @@ struct ChunkInfoRowNumbers : public ChunkInfo std::optional applied_filter; }; +/// Structure for storing information about buckets that IInputFormat needs to read. +struct FileBucketInfo +{ + virtual void serialize(WriteBuffer & buffer) = 0; + virtual void deserialize(ReadBuffer & buffer) = 0; + virtual String getIdentifier() const = 0; + virtual String getFormatName() const = 0; + + virtual ~FileBucketInfo() = default; +}; +using FileBucketInfoPtr = std::shared_ptr; + +/// Interface for splitting a file into buckets. +struct IBucketSplitter +{ + /// Splits a file into buckets using the given read buffer and format settings. + /// Returns information about the resulting buckets (see the structure above for details). + virtual std::vector splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) = 0; + + virtual ~IBucketSplitter() = default; +}; +using BucketSplitter = std::shared_ptr; + /** Input format is a source, that reads data from ReadBuffer. */ class IInputFormat : public ISource @@ -69,6 +94,7 @@ class IInputFormat : public ISource /// All data reading from the read buffer must be performed by this method. 
virtual Chunk read() = 0; + virtual void setBucketsToRead(const FileBucketInfoPtr & buckets_to_read); /** In some usecase (hello Kafka) we need to read a lot of tiny streams in exactly the same format. * The recreating of parser for each small stream takes too long, so we introduce a method * resetParser() which allow to reset the state of parser to continue reading of diff --git a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp index 0cea3ce8e407..fc6a8c3d75c5 100644 --- a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp +++ b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp @@ -6,7 +6,9 @@ #include #include +#include #include +#include namespace DB::ErrorCodes { @@ -41,11 +43,18 @@ std::optional AtomicBitSet::findFirst() return std::nullopt; } -void ReadManager::init(FormatParserSharedResourcesPtr parser_shared_resources_) +void ReadManager::init(FormatParserSharedResourcesPtr parser_shared_resources_, const std::optional> & buckets_to_read_) { parser_shared_resources = parser_shared_resources_; reader.file_metadata = Reader::readFileMetaData(reader.prefetcher); - reader.prefilterAndInitRowGroups(); + + if (buckets_to_read_) + { + row_groups_to_read = std::unordered_set{}; + for (auto rg : *buckets_to_read_) + row_groups_to_read->insert(rg); + } + reader.prefilterAndInitRowGroups(row_groups_to_read); reader.preparePrewhere(); ProfileEvents::increment(ProfileEvents::ParquetReadRowGroups, reader.row_groups.size()); @@ -337,8 +346,8 @@ void ReadManager::finishRowSubgroupStage(size_t row_group_idx, size_t row_subgro case ReadStage::PrewhereData: { chassert(!reader.prewhere_steps.empty()); - reader.applyPrewhere(row_subgroup); - size_t prev = row_group.prewhere_ptr.exchange(row_subgroup_idx + 1); + reader.applyPrewhere(row_subgroup, row_group); + size_t prev = row_group.prewhere_ptr.exchange(row_subgroup_idx + 1); /// NOLINT(clang-analyzer-deadcode.DeadStores) chassert(prev == row_subgroup_idx); if (row_subgroup_idx + 1 < row_group.subgroups.size()) { diff --git a/src/Processors/Formats/Impl/Parquet/ReadManager.h b/src/Processors/Formats/Impl/Parquet/ReadManager.h index 291f81a2cb4a..113d4430ee2a 100644 --- a/src/Processors/Formats/Impl/Parquet/ReadManager.h +++ b/src/Processors/Formats/Impl/Parquet/ReadManager.h @@ -40,7 +40,7 @@ class ReadManager /// (I'm trying this style because the usual pattern of passing-through lots of arguments through /// layers of constructors seems bad. This seems better but still not great, hopefully there's an /// even better way.) 
- void init(FormatParserSharedResourcesPtr parser_shared_resources_); + void init(FormatParserSharedResourcesPtr parser_shared_resources_, const std::optional> & buckets_to_read_); ~ReadManager(); @@ -109,6 +109,8 @@ class ReadManager std::priority_queue, Task::Comparator> delivery_queue; std::condition_variable delivery_cv; std::exception_ptr exception; + /// Nullopt means that ReadManager reads all row groups + std::optional> row_groups_to_read; void scheduleTask(Task task, bool is_first_in_group, MemoryUsageDiff & diff, std::vector & out_tasks); void runTask(Task task, bool last_in_batch, MemoryUsageDiff & diff); diff --git a/src/Processors/Formats/Impl/Parquet/Reader.cpp b/src/Processors/Formats/Impl/Parquet/Reader.cpp index fdd0a433d789..5f2ecbbb1140 100644 --- a/src/Processors/Formats/Impl/Parquet/Reader.cpp +++ b/src/Processors/Formats/Impl/Parquet/Reader.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #if USE_SNAPPY @@ -262,7 +263,7 @@ void Reader::getHyperrectangleForRowGroup(const parq::RowGroup * meta, Hyperrect } } -void Reader::prefilterAndInitRowGroups() +void Reader::prefilterAndInitRowGroups(const std::optional> & row_groups_to_read) { extended_sample_block = *sample_block; for (const auto & col : format_filter_info->additional_columns) @@ -330,6 +331,7 @@ void Reader::prefilterAndInitRowGroups() RowGroup & row_group = row_groups.emplace_back(); row_group.meta = meta; + row_group.need_to_process = !row_groups_to_read.has_value() || row_groups_to_read->contains(row_group_idx); row_group.row_group_idx = row_group_idx; row_group.start_global_row_idx = total_rows - size_t(meta->num_rows); row_group.columns.resize(primitive_columns.size()); @@ -1016,8 +1018,8 @@ void Reader::intersectColumnIndexResultsAndInitSubgroups(RowGroup & row_group) RowSubgroup & row_subgroup = row_group.subgroups.emplace_back(); row_subgroup.start_row_idx = substart; - row_subgroup.filter.rows_pass = subend - substart; - row_subgroup.filter.rows_total = row_subgroup.filter.rows_pass; + row_subgroup.filter.rows_pass = row_group.need_to_process ? subend - substart : 0; + row_subgroup.filter.rows_total = subend - substart; row_subgroup.columns.resize(primitive_columns.size()); row_subgroup.output.resize(extended_sample_block.columns()); @@ -1025,7 +1027,6 @@ void Reader::intersectColumnIndexResultsAndInitSubgroups(RowGroup & row_group) row_subgroup.block_missing_values.init(sample_block->columns()); } } - row_group.intersected_row_ranges_after_column_index = std::move(row_ranges); } @@ -1939,7 +1940,7 @@ MutableColumnPtr Reader::formOutputColumn(RowSubgroup & row_subgroup, size_t out return res; } -void Reader::applyPrewhere(RowSubgroup & row_subgroup) +void Reader::applyPrewhere(RowSubgroup & row_subgroup, const RowGroup & row_group) { for (size_t step_idx = 0; step_idx < prewhere_steps.size(); ++step_idx) { @@ -1990,7 +1991,7 @@ void Reader::applyPrewhere(RowSubgroup & row_subgroup) chassert(filter.size() == row_subgroup.filter.rows_pass); size_t rows_pass = countBytesInFilter(filter.data(), 0, filter.size()); - if (rows_pass == 0) + if (rows_pass == 0 || !row_group.need_to_process) { /// Whole row group was filtered out. 
row_subgroup.filter.rows_pass = 0; diff --git a/src/Processors/Formats/Impl/Parquet/Reader.h b/src/Processors/Formats/Impl/Parquet/Reader.h index 899d4371b0c4..b132ecc39046 100644 --- a/src/Processors/Formats/Impl/Parquet/Reader.h +++ b/src/Processors/Formats/Impl/Parquet/Reader.h @@ -387,6 +387,7 @@ struct Reader size_t row_group_idx; // in parquet file size_t start_global_row_idx = 0; // total number of rows in preceding row groups in the file + bool need_to_process = false; /// Parallel to Reader::primitive_columns. /// NOT parallel to `meta.columns` (it's a subset of parquet columns). std::vector columns; @@ -470,7 +471,7 @@ struct Reader void init(const ReadOptions & options_, const Block & sample_block_, FormatFilterInfoPtr format_filter_info_); static parq::FileMetaData readFileMetaData(Prefetcher & prefetcher); - void prefilterAndInitRowGroups(); + void prefilterAndInitRowGroups(const std::optional> & row_groups_to_read); void preparePrewhere(); /// Deserialize bf header and determine which bf blocks to read. @@ -502,7 +503,7 @@ struct Reader /// is not called again for the moved-out columns. MutableColumnPtr formOutputColumn(RowSubgroup & row_subgroup, size_t output_column_idx, size_t num_rows); - void applyPrewhere(RowSubgroup & row_subgroup); + void applyPrewhere(RowSubgroup & row_subgroup, const RowGroup & row_group); private: struct BloomFilterLookup : public KeyCondition::BloomFilter diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index e8a4c8fdce91..b935b2259823 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -1,5 +1,10 @@ +#include #include +#include +#include #include +#include +#include #if USE_PARQUET @@ -657,6 +662,45 @@ const parquet::ColumnDescriptor * getColumnDescriptorIfBloomFilterIsPresent( return parquet_column_descriptor; } +void ParquetFileBucketInfo::serialize(WriteBuffer & buffer) +{ + writeVarUInt(row_group_ids.size(), buffer); + for (auto chunk : row_group_ids) + writeVarUInt(chunk, buffer); +} + +void ParquetFileBucketInfo::deserialize(ReadBuffer & buffer) +{ + size_t size_chunks; + readVarUInt(size_chunks, buffer); + row_group_ids = std::vector{}; + row_group_ids.resize(size_chunks); + size_t bucket; + for (size_t i = 0; i < size_chunks; ++i) + { + readVarUInt(bucket, buffer); + row_group_ids[i] = bucket; + } +} + +String ParquetFileBucketInfo::getIdentifier() const +{ + String result; + for (auto chunk : row_group_ids) + result += "_" + std::to_string(chunk); + return result; +} + +ParquetFileBucketInfo::ParquetFileBucketInfo(const std::vector & row_group_ids_) + : row_group_ids(row_group_ids_) +{ +} + +void registerParquetFileBucketInfo(std::unordered_map & instances) +{ + instances.emplace("Parquet", std::make_shared()); +} + ParquetBlockInputFormat::ParquetBlockInputFormat( ReadBuffer & buf, SharedHeader header_, @@ -716,6 +760,17 @@ void ParquetBlockInputFormat::initializeIfNeeded() return; metadata = getFileMetaData(); + + if (buckets_to_read) + { + std::unordered_set set_to_read(buckets_to_read->row_group_ids.begin(), buckets_to_read->row_group_ids.end()); + for (int i = 0; i < metadata->num_row_groups(); ++i) + { + if (!set_to_read.contains(i)) + skip_row_groups.insert(i); + } + } + const bool prefetch_group = io_pool != nullptr; std::shared_ptr schema; @@ -1199,9 +1254,8 @@ void ParquetBlockInputFormat::scheduleMoreWorkIfNeeded(std::optional row { size_t max_decoding_threads = 
parser_shared_resources->getParsingThreadsPerReader(); while (row_group_batches_started - row_group_batches_completed < max_decoding_threads && - row_group_batches_started < row_group_batches.size()) + row_group_batches_started < row_group_batches.size()) scheduleRowGroup(row_group_batches_started++); - if (row_group_batch_touched) { auto & row_group = row_group_batches[*row_group_batch_touched]; @@ -1297,6 +1351,11 @@ Chunk ParquetBlockInputFormat::read() } } +void ParquetBlockInputFormat::setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_) +{ + buckets_to_read = std::static_pointer_cast(buckets_to_read_); +} + void ParquetBlockInputFormat::resetParser() { is_stopped = true; @@ -1393,8 +1452,53 @@ std::optional ArrowParquetSchemaReader::readNumberOrRows() return metadata->num_rows(); } +std::vector ParquetBucketSplitter::splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) +{ + std::atomic is_stopped = false; + auto arrow_file = asArrowFile(buf, format_settings_, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true, nullptr); + auto metadata = parquet::ReadMetaData(arrow_file); + std::vector bucket_sizes; + for (int i = 0; i < metadata->num_row_groups(); ++i) + bucket_sizes.push_back(metadata->RowGroup(i)->total_byte_size()); + + std::vector> buckets; + size_t current_weight = 0; + for (size_t i = 0; i < bucket_sizes.size(); ++i) + { + if (current_weight + bucket_sizes[i] <= bucket_size) + { + if (buckets.empty()) + buckets.emplace_back(); + buckets.back().push_back(i); + current_weight += bucket_sizes[i]; + } + else + { + current_weight = 0; + buckets.push_back({}); + buckets.back().push_back(i); + current_weight += bucket_sizes[i]; + } + } + + std::vector result; + for (const auto & bucket : buckets) + { + result.push_back(std::make_shared(bucket)); + } + return result; +} + void registerInputFormatParquet(FormatFactory & factory) { + factory.registerFileBucketInfo( + "Parquet", + [] + { + return std::make_shared(); + } + ); + factory.registerRandomAccessInputFormat( "Parquet", [](ReadBuffer & buf, @@ -1437,6 +1541,10 @@ void registerInputFormatParquet(FormatFactory & factory) void registerParquetSchemaReader(FormatFactory & factory) { + factory.registerSplitter("Parquet", [] + { + return std::make_shared(); + }); factory.registerSchemaReader( "Parquet", [](ReadBuffer & buf, const FormatSettings & settings) -> SchemaReaderPtr diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index 1038e1d35037..5c2bf96325a1 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -51,6 +51,28 @@ class ParquetRecordReader; // parallel reading+decoding, instead of using ParallelReadBuffer and ParallelParsingInputFormat. // That's what RandomAccessInputCreator in FormatFactory is about. 
+struct ParquetFileBucketInfo : public FileBucketInfo +{ + std::vector row_group_ids; + + ParquetFileBucketInfo() = default; + explicit ParquetFileBucketInfo(const std::vector & row_group_ids_); + void serialize(WriteBuffer & buffer) override; + void deserialize(ReadBuffer & buffer) override; + String getIdentifier() const override; + String getFormatName() const override + { + return "Parquet"; + } +}; +using ParquetFileBucketInfoPtr = std::shared_ptr; + +struct ParquetBucketSplitter : public IBucketSplitter +{ + ParquetBucketSplitter() = default; + std::vector splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) override; +}; + class ParquetBlockInputFormat : public IInputFormat { public: @@ -74,6 +96,8 @@ class ParquetBlockInputFormat : public IInputFormat void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override; + void setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_) override; + private: Chunk read() override; @@ -304,7 +328,8 @@ class ParquetBlockInputFormat : public IInputFormat }; const FormatSettings format_settings; - const std::unordered_set & skip_row_groups; + std::unordered_set skip_row_groups; + ParquetFileBucketInfoPtr buckets_to_read; FormatParserSharedResourcesPtr parser_shared_resources; FormatFilterInfoPtr format_filter_info; size_t min_bytes_for_seek; diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp index fca6fc38b89d..2d2bc2f318f0 100644 --- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp @@ -1,3 +1,5 @@ +#include +#include #include #if USE_PARQUET @@ -7,10 +9,16 @@ #include #include #include +#include namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + Parquet::ReadOptions convertReadOptions(const FormatSettings & format_settings) { Parquet::ReadOptions options; @@ -84,7 +92,7 @@ void ParquetV3BlockInputFormat::initializeIfNeeded() reader.emplace(); reader->reader.prefetcher.init(in, read_options, parser_shared_resources); reader->reader.init(read_options, getPort().getHeader(), format_filter_info); - reader->init(parser_shared_resources); + reader->init(parser_shared_resources, buckets_to_read ? 
std::optional(buckets_to_read->row_group_ids) : std::nullopt); } } @@ -114,6 +122,13 @@ Chunk ParquetV3BlockInputFormat::read() return std::move(res.chunk); } +void ParquetV3BlockInputFormat::setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_) +{ + if (reader) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Reader already initialized"); + buckets_to_read = std::static_pointer_cast(buckets_to_read_); +} + const BlockMissingValues * ParquetV3BlockInputFormat::getMissingValues() const { return &previous_block_missing_values; diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h index 122aaae174ac..ff7c80a6b230 100644 --- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB { @@ -32,6 +33,8 @@ class ParquetV3BlockInputFormat : public IInputFormat return previous_approx_bytes_read_for_chunk; } + void setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_) override; + private: Chunk read() override; @@ -49,6 +52,7 @@ class ParquetV3BlockInputFormat : public IInputFormat size_t previous_approx_bytes_read_for_chunk = 0; void initializeIfNeeded(); + std::shared_ptr buckets_to_read; }; class NativeParquetSchemaReader : public ISchemaReader diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index eb509c474afe..28d7faf1a765 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -6,7 +6,10 @@ #include #include #include -#include +#include +#include +#include +#include #include #include #include @@ -18,7 +21,6 @@ #include - namespace DataLake { class ICatalog; @@ -89,6 +91,12 @@ using SinkToStoragePtr = std::shared_ptr; class StorageObjectStorageConfiguration; using StorageObjectStorageConfigurationPtr = std::shared_ptr; struct StorageID; +struct IObjectIterator; +struct RelativePathWithMetadata; +class IObjectStorage; +using ObjectIterator = std::shared_ptr; +using ObjectStoragePtr = std::shared_ptr; +using ObjectInfoPtr = std::shared_ptr; class IDataLakeMetadata : boost::noncopyable { diff --git a/src/Storages/ObjectStorage/IObjectIterator.cpp b/src/Storages/ObjectStorage/IObjectIterator.cpp index cea5d709b5ca..08714909e63d 100644 --- a/src/Storages/ObjectStorage/IObjectIterator.cpp +++ b/src/Storages/ObjectStorage/IObjectIterator.cpp @@ -1,10 +1,23 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include +#include namespace DB { +namespace Setting +{ + extern const SettingsUInt64 cluster_table_function_buckets_batch_size; +} + static ExpressionActionsPtr getExpressionActions( const DB::ActionsDAG & filter_, const NamesAndTypesList & virtual_columns_, @@ -63,4 +76,52 @@ ObjectInfoPtr ObjectIteratorWithPathAndFileFilter::next(size_t id) return {}; } +ObjectIteratorSplitByBuckets::ObjectIteratorSplitByBuckets( + ObjectIterator iterator_, + const String & format_, + ObjectStoragePtr object_storage_, + const ContextPtr & context_) + : WithContext(context_) + , iterator(iterator_) + , format(format_) + , object_storage(object_storage_) + , format_settings(getFormatSettings(context_)) +{ +} + +ObjectInfoPtr ObjectIteratorSplitByBuckets::next(size_t id) +{ + if (!pending_objects_info.empty()) + { + auto result = pending_objects_info.front(); + pending_objects_info.pop(); + return result; + } + 
auto last_object_info = iterator->next(id); + if (!last_object_info) + return {}; + + auto splitter = FormatFactory::instance().getSplitter(format); + /// If there's no splitter for this format (e.g., CSV in archives), return the object as-is + if (!splitter) + return last_object_info; + + auto buffer = createReadBuffer(*last_object_info, object_storage, getContext(), log); + size_t bucket_size = getContext()->getSettingsRef()[Setting::cluster_table_function_buckets_batch_size]; + auto file_bucket_info = splitter->splitToBuckets(bucket_size, *buffer, format_settings); + for (const auto & file_bucket : file_bucket_info) + { + auto copy_object_info = *last_object_info; + copy_object_info.file_bucket_info = file_bucket; + pending_objects_info.push(std::make_shared(copy_object_info)); + } + + if (pending_objects_info.empty()) + return last_object_info; + + auto result = pending_objects_info.front(); + pending_objects_info.pop(); + return result; +} + } diff --git a/src/Storages/ObjectStorage/IObjectIterator.h b/src/Storages/ObjectStorage/IObjectIterator.h index 126abd181910..4ad74b1a76d6 100644 --- a/src/Storages/ObjectStorage/IObjectIterator.h +++ b/src/Storages/ObjectStorage/IObjectIterator.h @@ -1,12 +1,18 @@ #pragma once #include #include +#include +#include +#include +#include namespace DB { using ObjectInfo = PathWithMetadata; using ObjectInfoPtr = std::shared_ptr; +using ObjectInfos = std::vector; + class ExpressionActions; struct IObjectIterator @@ -41,4 +47,29 @@ class ObjectIteratorWithPathAndFileFilter : public IObjectIterator, private With const NamesAndTypesList hive_partition_columns; const std::shared_ptr filter_actions; }; + +class ObjectIteratorSplitByBuckets : public IObjectIterator, private WithContext +{ +public: + ObjectIteratorSplitByBuckets( + ObjectIterator iterator_, + const String & format_, + ObjectStoragePtr object_storage_, + const ContextPtr & context_); + + ObjectInfoPtr next(size_t) override; + size_t estimatedKeysCount() override { return iterator->estimatedKeysCount(); } + std::optional getSnapshotVersion() const override { return iterator->getSnapshotVersion(); } + +private: + const ObjectIterator iterator; + String format; + ObjectStoragePtr object_storage; + FormatSettings format_settings; + + std::queue pending_objects_info; + const LoggerPtr log = getLogger("GlobIterator"); +}; + + } diff --git a/src/Storages/ObjectStorage/ReadBufferIterator.cpp b/src/Storages/ObjectStorage/ReadBufferIterator.cpp index 131b73b343ae..6cef60c3d03b 100644 --- a/src/Storages/ObjectStorage/ReadBufferIterator.cpp +++ b/src/Storages/ObjectStorage/ReadBufferIterator.cpp @@ -7,7 +7,6 @@ #include #include - namespace DB { namespace Setting diff --git a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h index f45cd9725dea..959d3364f7ae 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include @@ -14,13 +13,21 @@ #include #include #include +#include +#include +#include +#include namespace DB { class NamedCollection; class SinkToStorage; +class IDataLakeMetadata; +struct IObjectIterator; using SinkToStoragePtr = std::shared_ptr; +using ObjectIterator = std::shared_ptr; +using ObjectInfoPtr = std::shared_ptr; namespace ErrorCodes { @@ -275,17 +282,16 @@ class StorageObjectStorageConfiguration return false; } + String format = "auto"; + String 
compression_method = "auto"; + String structure = "auto"; + PartitionStrategyFactory::StrategyType partition_strategy_type = PartitionStrategyFactory::StrategyType::NONE; std::shared_ptr partition_strategy; /// Whether partition column values are contained in the actual data. /// And alternative is with hive partitioning, when they are contained in file path. bool partition_columns_in_data_file = true; -private: - String format = "auto"; - String compression_method = "auto"; - String structure = "auto"; - protected: bool initialized = false; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index cfc55307dd77..1fc4860831e1 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -30,8 +30,10 @@ #include #include #include +#include #include #include +#include #if ENABLE_DISTRIBUTED_CACHE #include #include @@ -40,6 +42,8 @@ #include #include +#include +#include namespace fs = std::filesystem; namespace ProfileEvents @@ -70,6 +74,7 @@ namespace Setting extern const SettingsBool table_engine_read_through_distributed_cache; extern const SettingsBool use_object_storage_list_objects_cache; extern const SettingsBool allow_experimental_iceberg_read_optimization; + extern const SettingsObjectStorageGranularityLevel cluster_table_function_split_granularity; } namespace ErrorCodes @@ -230,7 +235,7 @@ std::shared_ptr StorageObjectStorageSource::createFileIterator( if (filter_actions_dag) { - return std::make_shared( + iter = std::make_shared( std::move(iter), *filter_actions_dag, virtual_columns, @@ -238,6 +243,15 @@ std::shared_ptr StorageObjectStorageSource::createFileIterator( configuration->getNamespace(), local_context); } + if (local_context->getSettingsRef()[Setting::cluster_table_function_split_granularity] == ObjectStorageGranularityLevel::BUCKET) + { + iter = std::make_shared( + std::move(iter), + configuration->format, + object_storage, + local_context + ); + } return iter; } else @@ -522,7 +536,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade object_info->loadMetadata(object_storage, query_settings.ignore_non_existent_file); } while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0)); - + ProfileEvents::increment(ProfileEvents::ObjectStorageClusterProcessedTasks); ObjectStoragePtr storage_to_use = object_info->getObjectStorage(); @@ -754,6 +768,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade compression_method, need_only_count); + input_format->setBucketsToRead(object_info->file_bucket_info); input_format->setSerializationHints(read_from_format_info.serialization_hints); if (need_only_count) @@ -1333,7 +1348,7 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator // We should use it directly and NOT overwrite relative_path. // Only resolve absolute_path if we need to determine which storage to use (for secondary storages). object_info->object_storage_to_use = object_storage; - + if (raw->absolute_path.has_value()) { auto [storage_to_use, key] @@ -1442,9 +1457,9 @@ StorageObjectStorageSource::ArchiveIterator::createArchiveReader(ObjectInfoPtr o return DB::createArchiveReader( /* path_to_archive */ object_info->getPath(), - /* archive_read_function */ [=, this]() { + /* archive_read_function */ [=, this]() { ObjectStoragePtr storage = object_info->getObjectStorage() ? 
object_info->getObjectStorage() : object_storage; - return createReadBuffer(*object_info, storage, getContext(), log); + return createReadBuffer(*object_info, storage, getContext(), log); }, /* archive_size */ size); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 82bc2d776b06..aec80ef8e082 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -121,8 +121,8 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t auto next_file = files.back(); files.pop_back(); - auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getAbsolutePath().value_or(next_file->getPath()); - auto it = unprocessed_files.find(file_path); + auto file_identifier = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getIdentifier(); + auto it = unprocessed_files.find(file_identifier); if (it == unprocessed_files.end()) continue; @@ -131,7 +131,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t LOG_TRACE( log, "Assigning pre-queued file {} to replica {}", - file_path, + file_identifier, number_of_current_replica ); @@ -166,25 +166,25 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter } } - String file_path; + String file_identifier; if (send_over_whole_archive && object_info->isArchive()) { - file_path = object_info->getPathOrPathToArchiveIfArchive(); + file_identifier = object_info->getPathOrPathToArchiveIfArchive(); LOG_TEST(log, "Will send over the whole archive {} to replicas. 
" "This will be suboptimal, consider turning on " - "cluster_function_process_archive_on_multiple_nodes setting", file_path); + "cluster_function_process_archive_on_multiple_nodes setting", file_identifier); } else { - file_path = object_info->getAbsolutePath().value_or(object_info->getPath()); + file_identifier = object_info->getIdentifier(); } - size_t file_replica_idx = getReplicaForFile(file_path); + size_t file_replica_idx = getReplicaForFile(file_identifier); if (file_replica_idx == number_of_current_replica) { LOG_TRACE( log, "Found file {} for replica {}", - file_path, number_of_current_replica + file_identifier, number_of_current_replica ); ProfileEvents::increment(ProfileEvents::ObjectStorageClusterSentToMatchedReplica); @@ -193,7 +193,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter LOG_TEST( log, "Found file {} for replica {} (number of current replica: {})", - file_path, + file_identifier, file_replica_idx, number_of_current_replica ); @@ -201,7 +201,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter // Queue file for its assigned replica { std::lock_guard lock(mutex); - unprocessed_files.emplace(file_path, std::make_pair(object_info, file_replica_idx)); + unprocessed_files.emplace(file_identifier, std::make_pair(object_info, file_replica_idx)); connection_to_files[file_replica_idx].push_back(object_info); } } diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index e0d7158ed44b..bd27f4cf6d4d 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -807,10 +807,10 @@ std::function IStorageURLBase::getReadPOSTDataCallback( namespace { - class ReadBufferIterator : public IReadBufferIterator, WithContext + class URLReadBufferIterator : public IReadBufferIterator, WithContext { public: - ReadBufferIterator( + URLReadBufferIterator( const std::vector & urls_to_check_, std::optional format_, const CompressionMethod & compression_method_, @@ -1054,7 +1054,7 @@ std::pair IStorageURLBase::getTableStructureAndForma else urls_to_check = {uri}; - ReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context); + URLReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context); if (format) return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, context), *format}; return detectFormatAndReadSchema(format_settings, read_buffer_iterator, context); diff --git a/tests/integration/helpers/iceberg_utils.py b/tests/integration/helpers/iceberg_utils.py index 5479fdc5a938..c10cce320927 100644 --- a/tests/integration/helpers/iceberg_utils.py +++ b/tests/integration/helpers/iceberg_utils.py @@ -153,10 +153,14 @@ def write_iceberg_from_df( if partition_by is None: df.writeTo(table_name).tableProperty( "format-version", format_version + ).tableProperty( + "write.parquet.row-group-size-bytes", "104850" # 1MB ).using("iceberg").create() else: df.writeTo(table_name).tableProperty( "format-version", format_version + ).tableProperty( + "write.parquet.row-group-size-bytes", "104850" # 1MB ).partitionedBy(partition_by).using("iceberg").create() else: df.writeTo(table_name).append() diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 6bc1963a87b2..3cfb9e9d053d 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -3925,7 +3925,7 @@ def 
check_validity_and_get_prunned_files(select_expression): ) - + def test_iceberg_write_minmax(started_cluster): instance = started_cluster.instances["node1"] TABLE_NAME = "test_iceberg_write_minmax_" + get_uuid_str() @@ -3939,3 +3939,95 @@ def test_iceberg_write_minmax(started_cluster): res = instance.query(f"SELECT x,y FROM {TABLE_NAME} WHERE y=2 ORDER BY ALL").strip() assert res == "1\t2" + + +@pytest.mark.parametrize("format_version", ["1", "2"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) +@pytest.mark.parametrize("cluster_table_function_buckets_batch_size", [0, 100, 1000]) +@pytest.mark.parametrize("input_format_parquet_use_native_reader_v3", [0, 1]) +def test_cluster_table_function_split_by_row_groups(started_cluster, format_version, storage_type, cluster_table_function_buckets_batch_size,input_format_parquet_use_native_reader_v3): + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + + TABLE_NAME = ( + "test_iceberg_cluster_" + + format_version + + "_" + + storage_type + + "_" + + get_uuid_str() + ) + + def add_df(mode): + write_iceberg_from_df( + spark, + generate_data(spark, 0, 100000), + TABLE_NAME, + mode=mode, + format_version=format_version, + ) + + files = default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", + ) + + logging.info(f"Adding another dataframe. result files: {files}") + + return files + + files = add_df(mode="overwrite") + for i in range(1, 5 * len(started_cluster.instances)): + files = add_df(mode="append") + + clusters = instance.query(f"SELECT * FROM system.clusters") + logging.info(f"Clusters setup: {clusters}") + + # Regular Query only node1 + table_function_expr = get_creation_expression( + storage_type, TABLE_NAME, started_cluster, table_function=True + ) + select_regular = ( + instance.query(f"SELECT * FROM {table_function_expr} ORDER BY ALL").strip().split() + ) + + # Cluster Query with node1 as coordinator + table_function_expr_cluster = get_creation_expression( + storage_type, + TABLE_NAME, + started_cluster, + table_function=True, + run_on_cluster=True, + ) + instance.query("SYSTEM FLUSH LOGS") + + def get_buffers_count(func): + buffers_count_before = int( + instance.query( + f"SELECT sum(ProfileEvents['EngineFileLikeReadFiles']) FROM system.query_log WHERE type = 'QueryFinish'" + ) + ) + + func() + instance.query("SYSTEM FLUSH LOGS") + buffers_count = int( + instance.query( + f"SELECT sum(ProfileEvents['EngineFileLikeReadFiles']) FROM system.query_log WHERE type = 'QueryFinish'" + ) + ) + return buffers_count - buffers_count_before + + select_cluster = ( + instance.query(f"SELECT * FROM {table_function_expr_cluster} ORDER BY ALL SETTINGS input_format_parquet_use_native_reader_v3={input_format_parquet_use_native_reader_v3},cluster_table_function_split_granularity='bucket', cluster_table_function_buckets_batch_size={cluster_table_function_buckets_batch_size}").strip().split() + ) + + # Simple size check + assert len(select_cluster) == len(select_regular) + # Actual check + assert select_cluster == select_regular + + buffers_count_with_splitted_tasks = get_buffers_count(lambda: instance.query(f"SELECT * FROM {table_function_expr_cluster} ORDER BY ALL SETTINGS input_format_parquet_use_native_reader_v3={input_format_parquet_use_native_reader_v3},cluster_table_function_split_granularity='bucket', cluster_table_function_buckets_batch_size={cluster_table_function_buckets_batch_size}").strip().split()) + 
buffers_count_default = get_buffers_count(lambda: instance.query(f"SELECT * FROM {table_function_expr_cluster} ORDER BY ALL SETTINGS input_format_parquet_use_native_reader_v3={input_format_parquet_use_native_reader_v3}, cluster_table_function_buckets_batch_size={cluster_table_function_buckets_batch_size}").strip().split())
+    assert buffers_count_with_splitted_tasks > buffers_count_default
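Notes on the mechanisms introduced above, with small standalone C++ sketches. None of the sketches is ClickHouse code: class, function and constant names are invented stand-ins, and each sketch only mirrors the logic visible in the diff.

Per-format registration. FormatFactory gains registerFileBucketInfo/registerSplitter plus the matching getters, following the factory's usual creator pattern: one creator per format name, guarded against double registration. A minimal registry sketch; its getSplitter returns a null pointer for formats without a splitter, which is the behaviour the `if (!splitter)` check in ObjectIteratorSplitByBuckets::next relies on.

#include <functional>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>

// Toy splitter interface standing in for IBucketSplitter.
struct Splitter
{
    virtual std::string name() const = 0;
    virtual ~Splitter() = default;
};

struct ParquetSplitter : Splitter
{
    std::string name() const override { return "Parquet row-group splitter"; }
};

// Per-format creator registry with an "already registered" guard.
class Registry
{
public:
    using Creator = std::function<std::shared_ptr<Splitter>()>;

    void registerSplitter(const std::string & format, Creator creator)
    {
        if (creators.count(format))
            throw std::logic_error("Bucket splitter for format " + format + " is already registered");
        creators[format] = std::move(creator);
    }

    // Returns nullptr when no splitter was registered for the format.
    std::shared_ptr<Splitter> getSplitter(const std::string & format) const
    {
        auto it = creators.find(format);
        return it == creators.end() ? nullptr : it->second();
    }

private:
    std::unordered_map<std::string, Creator> creators;
};

int main()
{
    Registry registry;
    registry.registerSplitter("Parquet", [] { return std::make_shared<ParquetSplitter>(); });

    if (auto splitter = registry.getSplitter("Parquet"))
        std::cout << splitter->name() << '\n';
    if (!registry.getSplitter("CSV"))
        std::cout << "no splitter for CSV, the file stays a single task\n";
}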
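Wire compatibility. ClusterFunctionReadTaskResponse writes the new bucket field only when the peer speaks DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO (4), and an empty format name marks "no bucket info attached"; on the receiving side the format name is used to obtain the right FileBucketInfo implementation from FormatFactory before deserializing the payload. Below is a simplified round-trip sketch of that pattern, using newline-delimited strings instead of ClickHouse's writeStringBinary/writeVarUInt; all names are invented.

#include <cstdint>
#include <iostream>
#include <optional>
#include <sstream>
#include <string>

// Hypothetical stand-ins for the real protocol constant and payload type.
constexpr uint32_t VERSION_WITH_FILE_BUCKETS_INFO = 4;

struct BucketPayload
{
    std::string format_name;   // e.g. "Parquet"
    std::string data;          // serialized row-group ids in the real code
};

// Older peers never see the field; newer peers read either the payload
// or an empty format name meaning "no bucket info for this task".
void serialize(std::ostream & out, uint32_t protocol_version, const std::optional<BucketPayload> & bucket)
{
    if (protocol_version < VERSION_WITH_FILE_BUCKETS_INFO)
        return;
    if (bucket)
        out << bucket->format_name << '\n' << bucket->data << '\n';
    else
        out << '\n';   // empty format name acts as the "absent" marker
}

std::optional<BucketPayload> deserialize(std::istream & in, uint32_t protocol_version)
{
    if (protocol_version < VERSION_WITH_FILE_BUCKETS_INFO)
        return std::nullopt;
    BucketPayload payload;
    std::getline(in, payload.format_name);
    if (payload.format_name.empty())
        return std::nullopt;
    std::getline(in, payload.data);   // simplified: assumes the payload has no newlines
    return payload;
}

int main()
{
    std::stringstream wire;
    serialize(wire, 4, BucketPayload{"Parquet", "row groups 2 and 3"});
    auto decoded = deserialize(wire, 4);
    std::cout << (decoded ? decoded->format_name : std::string("<none>")) << '\n';
}

Older replicas simply never read the field, because the whole block is gated on the negotiated protocol version.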
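Row-group selection in the readers. Both Parquet readers honour the bucket list: ParquetBlockInputFormat converts the requested row-group ids into the complementary skip_row_groups set, while the native v3 reader passes the set down to Reader::prefilterAndInitRowGroups and marks groups outside it so that their subgroups end up with rows_pass = 0. A tiny sketch of the complement computation (names invented).

#include <cstddef>
#include <iostream>
#include <unordered_set>
#include <vector>

// The bucket lists the row groups to read; the reader keeps a set of row groups
// to *skip*, so we take the complement over all row groups in the file.
std::unordered_set<size_t> rowGroupsToSkip(size_t num_row_groups, const std::vector<size_t> & to_read)
{
    std::unordered_set<size_t> wanted(to_read.begin(), to_read.end());
    std::unordered_set<size_t> skip;
    for (size_t i = 0; i < num_row_groups; ++i)
        if (wanted.count(i) == 0)
            skip.insert(i);
    return skip;
}

int main()
{
    for (size_t rg : rowGroupsToSkip(/*num_row_groups=*/5, /*to_read=*/{1, 3}))
        std::cout << "skip row group " << rg << '\n';
}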
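Splitting the object stream. ObjectIteratorSplitByBuckets decorates the underlying file iterator: it drains a queue of pending per-bucket objects first and, when the queue is empty, fetches the next file, asks the format's splitter for buckets and enqueues one copy of the object per bucket. A standalone sketch of that decorator shape; the two hard-coded buckets stand in for the real splitter.

#include <cstddef>
#include <iostream>
#include <memory>
#include <queue>
#include <string>
#include <vector>

// A toy object: a file path plus an optional bucket label.
struct Object
{
    std::string path;
    std::string bucket;   // empty when the whole file is a single task
};

struct IIterator
{
    virtual std::shared_ptr<Object> next() = 0;
    virtual ~IIterator() = default;
};

// Underlying iterator that lists whole files.
struct FileIterator : IIterator
{
    std::vector<std::string> paths;
    size_t pos = 0;
    explicit FileIterator(std::vector<std::string> paths_) : paths(std::move(paths_)) {}
    std::shared_ptr<Object> next() override
    {
        if (pos >= paths.size())
            return nullptr;
        return std::make_shared<Object>(Object{paths[pos++], ""});
    }
};

// Decorator: drain the pending queue first, otherwise fetch the next file,
// split it, and queue one object per bucket.
struct SplitByBuckets : IIterator
{
    std::shared_ptr<IIterator> inner;
    std::queue<std::shared_ptr<Object>> pending;

    explicit SplitByBuckets(std::shared_ptr<IIterator> inner_) : inner(std::move(inner_)) {}

    std::shared_ptr<Object> next() override
    {
        if (!pending.empty())
        {
            auto obj = pending.front();
            pending.pop();
            return obj;
        }
        auto file = inner->next();
        if (!file)
            return nullptr;
        for (const auto & bucket : {"rg_0", "rg_1"})   // stand-in for the format splitter
            pending.push(std::make_shared<Object>(Object{file->path, bucket}));
        auto obj = pending.front();
        pending.pop();
        return obj;
    }
};

int main()
{
    SplitByBuckets it(std::make_shared<FileIterator>(std::vector<std::string>{"a.parquet", "b.parquet"}));
    while (auto obj = it.next())
        std::cout << obj->path << " [" << obj->bucket << "]\n";
}

In the patch this decorator is attached in StorageObjectStorageSource::createFileIterator when the granularity setting is `bucket`, so downstream consumers, including the cluster task distributor, see per-bucket objects instead of whole files.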
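Stable task distribution. Previously the task distributor keyed work on the object path, so all buckets of one file would collapse into a single unit; PathWithMetadata::getIdentifier() now appends a "_<row group>..." suffix when bucket info is attached, making each bucket a distinct key. The sketch below composes identifiers the same way; the replica choice via std::hash modulo the replica count is only an illustration, since getReplicaForFile itself is not part of this diff.

#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Compose a task identifier the way PathWithMetadata::getIdentifier() does:
// the object path plus a "_<row group>..." suffix when bucket info is attached.
std::string makeIdentifier(const std::string & path, const std::vector<size_t> & row_groups)
{
    std::string id = path;
    for (size_t rg : row_groups)
        id += "_" + std::to_string(rg);
    return id;
}

// Illustrative replica assignment; a plain hash modulo the replica count
// stands in for the real getReplicaForFile().
size_t pickReplica(const std::string & identifier, size_t replica_count)
{
    return std::hash<std::string>{}(identifier) % replica_count;
}

int main()
{
    const size_t replicas = 3;
    for (const auto & rgs : std::vector<std::vector<size_t>>{{0}, {1}, {2, 3}})
    {
        auto id = makeIdentifier("s3://bucket/data/part-000.parquet", rgs);
        std::cout << id << " -> replica " << pickReplica(id, replicas) << '\n';
    }
}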