From 948b085308841c19368dbb35227b7949b2f46b47 Mon Sep 17 00:00:00 2001 From: scanhex12 <75157521+scanhex12@users.noreply.github.com> Date: Fri, 14 Nov 2025 21:51:57 +0000 Subject: [PATCH 1/7] Merge pull request #87508 from scanhex12/distributed_execution_better_spread Distributed execution: better split tasks --- src/Core/ProtocolDefines.h | 6 +- src/Core/Settings.cpp | 13 ++ src/Core/Settings.h | 4 +- src/Core/SettingsChangesHistory.cpp | 88 ++++++++++++++ src/Core/SettingsEnums.cpp | 6 + src/Core/SettingsEnums.h | 8 ++ src/Disks/ObjectStorages/IObjectStorage.h | 1 + src/Formats/FormatFactory.cpp | 29 +++++ src/Formats/FormatFactory.h | 13 ++ src/Interpreters/ClusterFunctionReadTask.cpp | 34 +++++- src/Interpreters/ClusterFunctionReadTask.h | 10 ++ src/Processors/Formats/IInputFormat.cpp | 9 ++ src/Processors/Formats/IInputFormat.h | 26 ++++ .../Formats/Impl/Parquet/ReadManager.cpp | 17 ++- .../Formats/Impl/Parquet/ReadManager.h | 4 +- .../Formats/Impl/Parquet/Reader.cpp | 13 +- src/Processors/Formats/Impl/Parquet/Reader.h | 5 +- .../Formats/Impl/ParquetBlockInputFormat.cpp | 112 +++++++++++++++++- .../Formats/Impl/ParquetBlockInputFormat.h | 27 ++++- .../Impl/ParquetV3BlockInputFormat.cpp | 17 ++- .../Formats/Impl/ParquetV3BlockInputFormat.h | 4 + .../DataLakes/IDataLakeMetadata.h | 11 +- .../ObjectStorage/IObjectIterator.cpp | 13 ++ src/Storages/ObjectStorage/IObjectIterator.h | 31 +++++ .../ObjectStorage/ReadBufferIterator.cpp | 1 - .../StorageObjectStorageConfiguration.h | 8 +- .../StorageObjectStorageSource.cpp | 25 +++- ...rageObjectStorageStableTaskDistributor.cpp | 22 ++-- tests/integration/helpers/iceberg_utils.py | 4 + 29 files changed, 521 insertions(+), 40 deletions(-) diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index 95258a2b320c..a27c99714ea7 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -35,8 +35,10 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54 static constexpr auto DBMS_CLUSTER_INITIAL_PROCESSING_PROTOCOL_VERSION = 1; static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_METADATA = 2; -static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA = 3; -static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 3; +static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_METADATA = 3; +static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO = 4; +static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA = 5; +static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO; static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; static constexpr auto DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD = 4; diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 2f0f02f63491..88550adb0dda 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6789,6 +6789,19 @@ Both database and table names have to be unquoted - only simple identifiers are )", 0) \ DECLARE(Bool, allow_general_join_planning, true, R"( Allows a more general join planning algorithm that can handle more complex conditions, but only works with hash join. If hash join is not enabled, then the usual join planning algorithm is used regardless of the value of this setting. 
+)", 0) \ + DECLARE(ObjectStorageGranularityLevel, cluster_table_function_split_granularity, ObjectStorageGranularityLevel::FILE, R"( +Controls how data is split into tasks when executing a CLUSTER TABLE FUNCTION. + +This setting defines the granularity of work distribution across the cluster: +- `file` — each task processes an entire file. +- `bucket` — tasks are created per internal data block within a file (for example, Parquet row groups). + +Choosing finer granularity (like `bucket`) can improve parallelism when working with a small number of large files. +For instance, if a Parquet file contains multiple row groups, enabling `bucket` granularity allows each group to be processed independently by different workers. +)", 0) \ + DECLARE(UInt64, cluster_table_function_buckets_batch_size, 0, R"( +Defines the approximate size of a batch (in bytes) used in distributed processing of tasks in cluster table functions with `bucket` split granularity. The system accumulates data until at least this amount is reached. The actual size may be slightly larger to align with data boundaries. )", 0) \ DECLARE(UInt64, merge_table_max_tables_to_look_for_schema_inference, 1000, R"( When creating a `Merge` table without an explicit schema or when using the `merge` table function, infer schema as a union of not more than the specified number of matching tables. diff --git a/src/Core/Settings.h b/src/Core/Settings.h index c70767e21aca..22ffb47c9402 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -109,7 +109,9 @@ class WriteBuffer; M(CLASS_NAME, UInt64Auto) \ M(CLASS_NAME, URI) \ M(CLASS_NAME, VectorSearchFilterStrategy) \ - M(CLASS_NAME, GeoToH3ArgumentOrder) + M(CLASS_NAME, GeoToH3ArgumentOrder) \ + M(CLASS_NAME, ObjectStorageGranularityLevel) \ + M(CLASS_NAME, DecorrelationJoinKind) COMMON_SETTINGS_SUPPORTED_TYPES(Settings, DECLARE_SETTING_TRAIT) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 09bdf3a3bcb7..5acca102d8f0 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -58,6 +58,94 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}, {"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."}, {"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."}, + {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."} + {"cluster_table_function_split_granularity", "file", "file", "New setting."}, + {"cluster_table_function_buckets_batch_size", 0, 0, "New setting."}, + {"arrow_flight_request_descriptor_type", "path", "path", "New setting. Type of descriptor to use for Arrow Flight requests: 'path' or 'command'. Dremio requires 'command'."}, + {"send_profile_events", true, true, "New setting. Whether to send profile events to the clients."}, + {"into_outfile_create_parent_directories", false, false, "New setting"}, + {"correlated_subqueries_default_join_kind", "left", "right", "New setting. 
Default join kind for decorrelated query plan."}, + {"use_statistics_cache", 0, 0, "New setting"}, + {"input_format_parquet_use_native_reader_v3", false, true, "Seems stable"}, + {"max_projection_rows_to_use_projection_index", 1'000'000, 1'000'000, "New setting"}, + {"min_table_rows_to_use_projection_index", 1'000'000, 1'000'000, "New setting"}, + {"use_text_index_dictionary_cache", false, false, "New setting"}, + {"use_text_index_header_cache", false, false, "New setting"}, + {"use_text_index_postings_cache", false, false, "New setting"}, + {"s3_retry_attempts", 500, 500, "Changed the value of the obsolete setting"}, + {"http_write_exception_in_output_format", true, false, "Changed for consistency across formats"}, + {"optimize_const_name_size", -1, 256, "Replace with scalar and use hash as a name for large constants (size is estimated by name length)"}, + {"enable_lazy_columns_replication", false, true, "Enable lazy columns replication in JOIN and ARRAY JOIN by default"}, + {"allow_special_serialization_kinds_in_output_formats", false, true, "Enable direct output of special columns representations like Sparse/Replicated in some output formats"}, + {"allow_experimental_alias_table_engine", false, false, "New setting"}, + {"input_format_parquet_local_time_as_utc", false, true, "Use more appropriate type DateTime64(..., 'UTC') for parquet 'local time without timezone' type."}, + {"input_format_parquet_verify_checksums", true, true, "New setting."}, + {"output_format_parquet_write_checksums", false, true, "New setting."}, + {"database_shared_drop_table_delay_seconds", 8 * 60 * 60, 8 * 60 * 60, "New setting."}, + {"filesystem_cache_allow_background_download", true, true, "New setting to control background downloads in filesystem cache per query."}, + }); + addSettingsChanges(settings_changes_history, "25.10", + { + {"allow_special_serialization_kinds_in_output_formats", false, false, "Add a setting to allow output of special columns representations like Sparse/Replicated without converting them to full columns"}, + {"enable_lazy_columns_replication", false, false, "Add a setting to enable lazy columns replication in JOIN and ARRAY JOIN"}, + {"correlated_subqueries_default_join_kind", "left", "right", "New setting. 
Default join kind for decorrelated query plan."}, + {"show_data_lake_catalogs_in_system_tables", true, false, "Disable catalogs in system tables by default"}, + {"optimize_rewrite_like_perfect_affix", false, true, "New setting"}, + {"allow_dynamic_type_in_join_keys", true, false, "Disallow using Dynamic type in JOIN keys by default"}, + {"s3queue_keeper_fault_injection_probability", 0, 0, "New setting."}, + {"enable_join_runtime_filters", false, false, "New setting"}, + {"join_runtime_filter_exact_values_limit", 10000, 10000, "New setting"}, + {"join_runtime_bloom_filter_bytes", 512_KiB, 512_KiB, "New setting"}, + {"join_runtime_bloom_filter_hash_functions", 3, 3, "New setting"}, + {"use_join_disjunctions_push_down", false, false, "New setting."}, + {"joined_block_split_single_row", false, false, "New setting"}, + {"temporary_files_buffer_size", DBMS_DEFAULT_BUFFER_SIZE, DBMS_DEFAULT_BUFFER_SIZE, "New setting"}, + {"rewrite_in_to_join", false, false, "New experimental setting"}, + {"delta_lake_log_metadata", false, false, "New setting."}, + {"distributed_cache_prefer_bigger_buffer_size", false, false, "New setting."}, + {"allow_experimental_qbit_type", false, false, "New experimental setting"}, + {"optimize_qbit_distance_function_reads", true, true, "New setting"}, + {"read_from_distributed_cache_if_exists_otherwise_bypass_cache", false, false, "New setting"}, + {"s3_slow_all_threads_after_retryable_error", false, false, "Disable the setting by default"}, + {"backup_slow_all_threads_after_retryable_s3_error", false, false, "Disable the setting by default"}, + {"enable_http_compression", false, true, "It should be beneficial in general"}, + {"inject_random_order_for_select_without_order_by", false, false, "New setting"}, + {"exclude_materialize_skip_indexes_on_insert", "", "", "New setting."}, + {"optimize_empty_string_comparisons", false, true, "A new setting."}, + {"query_plan_use_logical_join_step", true, true, "Added alias"}, + {"schema_inference_make_columns_nullable", 1, 3, "Take nullability information from Parquet/ORC/Arrow metadata by default, instead of making everything nullable."}, + {"materialized_views_squash_parallel_inserts", false, true, "Added setting to preserve old behavior if needed."}, + {"distributed_cache_connect_timeout_ms", 50, 50, "New setting"}, + {"distributed_cache_receive_timeout_ms", 3000, 3000, "New setting"}, + {"distributed_cache_send_timeout_ms", 3000, 3000, "New setting"}, + {"distributed_cache_tcp_keep_alive_timeout_ms", 2900, 2900, "New setting"}, + }); + addSettingsChanges(settings_changes_history, "25.9", + { + {"input_format_protobuf_oneof_presence", false, false, "New setting"}, + {"iceberg_delete_data_on_drop", false, false, "New setting"}, + {"use_skip_indexes_on_data_read", false, false, "New setting"}, + {"s3_slow_all_threads_after_retryable_error", false, false, "Added an alias for setting `backup_slow_all_threads_after_retryable_s3_error`"}, + {"iceberg_metadata_log_level", "none", "none", "New setting."}, + {"iceberg_insert_max_rows_in_data_file", 1000000, 1000000, "New setting."}, + {"iceberg_insert_max_bytes_in_data_file", 1_GiB, 1_GiB, "New setting."}, + {"query_plan_optimize_join_order_limit", 1, 1, "New setting"}, + {"query_plan_display_internal_aliases", false, false, "New setting"}, + {"query_plan_max_step_description_length", 1000000000, 500, "New setting"}, + {"allow_experimental_delta_lake_writes", false, false, "New setting."}, + {"query_plan_convert_any_join_to_semi_or_anti_join", true, true, "New setting."}, + 
{"text_index_use_bloom_filter", true, true, "New setting."}, + {"query_plan_direct_read_from_text_index", true, true, "New setting."}, + {"enable_producing_buckets_out_of_order_in_aggregation", false, true, "New setting"}, + {"jemalloc_enable_profiler", false, false, "New setting"}, + {"jemalloc_collect_profile_samples_in_trace_log", false, false, "New setting"}, + {"delta_lake_insert_max_bytes_in_data_file", 1_GiB, 1_GiB, "New setting."}, + {"delta_lake_insert_max_rows_in_data_file", 1000000, 1000000, "New setting."}, + {"promql_evaluation_time", Field{"auto"}, Field{"auto"}, "The setting was renamed. The previous name is `evaluation_time`."}, + {"evaluation_time", 0, 0, "Old setting which popped up here being renamed."}, + {"os_threads_nice_value_query", 0, 0, "New setting."}, + {"os_threads_nice_value_materialized_view", 0, 0, "New setting."}, + {"os_thread_priority", 0, 0, "Alias for os_threads_nice_value_query."}, }); addSettingsChanges(settings_changes_history, "25.8", { diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index 2fb4f1668ed4..6dd69453d703 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -372,4 +372,10 @@ IMPLEMENT_SETTING_ENUM( {"manifest_file_entry", IcebergMetadataLogLevel::ManifestFileEntry}}) IMPLEMENT_SETTING_AUTO_ENUM(MergeTreePartExportFileAlreadyExistsPolicy, ErrorCodes::BAD_ARGUMENTS); + +IMPLEMENT_SETTING_ENUM( + ObjectStorageGranularityLevel, + ErrorCodes::BAD_ARGUMENTS, + {{"file", ObjectStorageGranularityLevel::FILE}, + {"bucket", ObjectStorageGranularityLevel::BUCKET}}) } diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index 935e0ee8b615..225358d7541d 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -481,6 +481,14 @@ enum class IcebergMetadataLogLevel : uint8_t DECLARE_SETTING_ENUM(IcebergMetadataLogLevel) +enum class ObjectStorageGranularityLevel : uint8_t +{ + FILE = 0, + BUCKET = 1, +}; + +DECLARE_SETTING_ENUM(ObjectStorageGranularityLevel) + enum class MergeTreePartExportFileAlreadyExistsPolicy : uint8_t { skip, diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index 292a8de13537..bcb4ecaf8bbe 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -29,6 +29,7 @@ #include #include +#include #include #include diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 4b955510a608..2e89173e05b8 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -374,6 +374,20 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se return format_settings; } +FileBucketInfoPtr FormatFactory::getFileBucketInfo(const String & format) +{ + auto creator = getCreators(format); + return creator.file_bucket_info_creator(); +} + +void FormatFactory::registerFileBucketInfo(const String & format, FileBucketInfoCreator bucket_info) +{ + chassert(bucket_info); + auto & creators = getOrCreateCreators(format); + if (creators.file_bucket_info_creator) + throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Bucket splitter for format {} is already registered", format); + creators.file_bucket_info_creator = std::move(bucket_info); +} InputFormatPtr FormatFactory::getInput( const String & name, @@ -694,6 +708,21 @@ void FormatFactory::registerInputFormat(const String & name, InputCreator input_ KnownFormatNames::instance().add(name, /* case_insensitive = */ true); } +void FormatFactory::registerSplitter(const String & 
format, BucketSplitterCreator splitter) +{ + chassert(splitter); + auto & creators = getOrCreateCreators(format); + if (creators.bucket_splitter_creator) + throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Bucket splitter for format {} is already registered", format); + creators.bucket_splitter_creator = std::move(splitter); +} + +BucketSplitter FormatFactory::getSplitter(const String & format) +{ + auto creator = getCreators(format); + return creator.bucket_splitter_creator(); +} + void FormatFactory::registerRandomAccessInputFormat(const String & name, RandomAccessInputCreator input_creator) { chassert(input_creator); diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index fb8204b7fcb3..1aab5426a319 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -10,6 +10,8 @@ #include #include +#include + #include #include @@ -94,6 +96,10 @@ class FormatFactory final : private boost::noncopyable const RowInputFormatParams & params, const FormatSettings & settings)>; + using FileBucketInfoCreator = std::function; + + using BucketSplitterCreator = std::function; + // Incompatible with FileSegmentationEngine. using RandomAccessInputCreator = std::function #include #include +#include #include +#include +#include namespace DB { @@ -40,6 +43,7 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes]; path = send_over_whole_archive ? object->getPathOrPathToArchiveIfArchive() : object->getPath(); absolute_path = object->getAbsolutePath(); + file_bucket_info = object->file_bucket_info; } } @@ -58,7 +62,9 @@ ObjectInfoPtr ClusterFunctionReadTaskResponse::getObjectInfo() const object->file_meta_info = file_meta_info; if (absolute_path.has_value() && !absolute_path.value().empty()) object->absolute_path = absolute_path; - + + object->file_bucket_info = file_bucket_info; + return object; } @@ -76,6 +82,21 @@ void ClusterFunctionReadTaskResponse::serialize(WriteBuffer & out, size_t protoc ActionsDAG().serialize(out, registry); } + if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO) + { + if (file_bucket_info) + { + /// Write format name so we can create appropriate file bucket info during deserialization. + writeStringBinary(file_bucket_info->getFormatName(), out); + file_bucket_info->serialize(out); + } + else + { + /// Write empty string as format name if file_bucket_info is not set. + writeStringBinary("", out); + } + } + if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA) { /// This info is not used when optimization is disabled, so there is no need to send it. 
@@ -111,6 +132,17 @@ void ClusterFunctionReadTaskResponse::deserialize(ReadBuffer & in) } } + if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO) + { + String format; + readStringBinary(format, in); + if (!format.empty()) + { + file_bucket_info = FormatFactory::instance().getFileBucketInfo(format); + file_bucket_info->deserialize(in); + } + } + if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA) { auto info = std::make_shared(DataFileMetaInfo::deserialize(in)); diff --git a/src/Interpreters/ClusterFunctionReadTask.h b/src/Interpreters/ClusterFunctionReadTask.h index b5e2123cbc4f..8b77fbe846f8 100644 --- a/src/Interpreters/ClusterFunctionReadTask.h +++ b/src/Interpreters/ClusterFunctionReadTask.h @@ -1,5 +1,11 @@ #pragma once #include +<<<<<<< HEAD +======= +#include +#include +#include +>>>>>>> 4bed2ad0c69 (Merge pull request #87508 from scanhex12/distributed_execution_better_spread) #include #include @@ -18,8 +24,12 @@ struct ClusterFunctionReadTaskResponse /// Data path (object path, in case of object storage). String path; + /// Absolute path (including storage type prefix). std::optional absolute_path; + + FileBucketInfoPtr file_bucket_info; + /// Object metadata path, in case of data lake object. DataLakeObjectMetadata data_lake_metadata; /// File's columns info diff --git a/src/Processors/Formats/IInputFormat.cpp b/src/Processors/Formats/IInputFormat.cpp index 4311e8bcaf97..3ed63fdbbaa2 100644 --- a/src/Processors/Formats/IInputFormat.cpp +++ b/src/Processors/Formats/IInputFormat.cpp @@ -1,7 +1,11 @@ +#include #include #include #include #include +#include +#include +#include namespace DB { @@ -70,4 +74,9 @@ void IInputFormat::onFinish() { resetReadBuffer(); } + +void IInputFormat::setBucketsToRead(const FileBucketInfoPtr & /*buckets_to_read*/) +{ +} + } diff --git a/src/Processors/Formats/IInputFormat.h b/src/Processors/Formats/IInputFormat.h index cb5feea18646..bf0eccf2fa88 100644 --- a/src/Processors/Formats/IInputFormat.h +++ b/src/Processors/Formats/IInputFormat.h @@ -4,6 +4,8 @@ #include #include #include +#include +#include #include #include #include @@ -46,6 +48,29 @@ struct ChunkInfoRowNumbers : public ChunkInfo std::optional applied_filter; }; +/// Structure for storing information about buckets that IInputFormat needs to read. +struct FileBucketInfo +{ + virtual void serialize(WriteBuffer & buffer) = 0; + virtual void deserialize(ReadBuffer & buffer) = 0; + virtual String getIdentifier() const = 0; + virtual String getFormatName() const = 0; + + virtual ~FileBucketInfo() = default; +}; +using FileBucketInfoPtr = std::shared_ptr; + +/// Interface for splitting a file into buckets. +struct IBucketSplitter +{ + /// Splits a file into buckets using the given read buffer and format settings. + /// Returns information about the resulting buckets (see the structure above for details). + virtual std::vector splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) = 0; + + virtual ~IBucketSplitter() = default; +}; +using BucketSplitter = std::shared_ptr; + /** Input format is a source, that reads data from ReadBuffer. */ class IInputFormat : public ISource @@ -69,6 +94,7 @@ class IInputFormat : public ISource /// All data reading from the read buffer must be performed by this method. 
virtual Chunk read() = 0; + virtual void setBucketsToRead(const FileBucketInfoPtr & buckets_to_read); /** In some usecase (hello Kafka) we need to read a lot of tiny streams in exactly the same format. * The recreating of parser for each small stream takes too long, so we introduce a method * resetParser() which allow to reset the state of parser to continue reading of diff --git a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp index 0cea3ce8e407..fc6a8c3d75c5 100644 --- a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp +++ b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp @@ -6,7 +6,9 @@ #include #include +#include #include +#include namespace DB::ErrorCodes { @@ -41,11 +43,18 @@ std::optional AtomicBitSet::findFirst() return std::nullopt; } -void ReadManager::init(FormatParserSharedResourcesPtr parser_shared_resources_) +void ReadManager::init(FormatParserSharedResourcesPtr parser_shared_resources_, const std::optional> & buckets_to_read_) { parser_shared_resources = parser_shared_resources_; reader.file_metadata = Reader::readFileMetaData(reader.prefetcher); - reader.prefilterAndInitRowGroups(); + + if (buckets_to_read_) + { + row_groups_to_read = std::unordered_set{}; + for (auto rg : *buckets_to_read_) + row_groups_to_read->insert(rg); + } + reader.prefilterAndInitRowGroups(row_groups_to_read); reader.preparePrewhere(); ProfileEvents::increment(ProfileEvents::ParquetReadRowGroups, reader.row_groups.size()); @@ -337,8 +346,8 @@ void ReadManager::finishRowSubgroupStage(size_t row_group_idx, size_t row_subgro case ReadStage::PrewhereData: { chassert(!reader.prewhere_steps.empty()); - reader.applyPrewhere(row_subgroup); - size_t prev = row_group.prewhere_ptr.exchange(row_subgroup_idx + 1); + reader.applyPrewhere(row_subgroup, row_group); + size_t prev = row_group.prewhere_ptr.exchange(row_subgroup_idx + 1); /// NOLINT(clang-analyzer-deadcode.DeadStores) chassert(prev == row_subgroup_idx); if (row_subgroup_idx + 1 < row_group.subgroups.size()) { diff --git a/src/Processors/Formats/Impl/Parquet/ReadManager.h b/src/Processors/Formats/Impl/Parquet/ReadManager.h index 291f81a2cb4a..113d4430ee2a 100644 --- a/src/Processors/Formats/Impl/Parquet/ReadManager.h +++ b/src/Processors/Formats/Impl/Parquet/ReadManager.h @@ -40,7 +40,7 @@ class ReadManager /// (I'm trying this style because the usual pattern of passing-through lots of arguments through /// layers of constructors seems bad. This seems better but still not great, hopefully there's an /// even better way.) 
- void init(FormatParserSharedResourcesPtr parser_shared_resources_); + void init(FormatParserSharedResourcesPtr parser_shared_resources_, const std::optional> & buckets_to_read_); ~ReadManager(); @@ -109,6 +109,8 @@ class ReadManager std::priority_queue, Task::Comparator> delivery_queue; std::condition_variable delivery_cv; std::exception_ptr exception; + /// Nullopt means that ReadManager reads all row groups + std::optional> row_groups_to_read; void scheduleTask(Task task, bool is_first_in_group, MemoryUsageDiff & diff, std::vector & out_tasks); void runTask(Task task, bool last_in_batch, MemoryUsageDiff & diff); diff --git a/src/Processors/Formats/Impl/Parquet/Reader.cpp b/src/Processors/Formats/Impl/Parquet/Reader.cpp index fdd0a433d789..5f2ecbbb1140 100644 --- a/src/Processors/Formats/Impl/Parquet/Reader.cpp +++ b/src/Processors/Formats/Impl/Parquet/Reader.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #if USE_SNAPPY @@ -262,7 +263,7 @@ void Reader::getHyperrectangleForRowGroup(const parq::RowGroup * meta, Hyperrect } } -void Reader::prefilterAndInitRowGroups() +void Reader::prefilterAndInitRowGroups(const std::optional> & row_groups_to_read) { extended_sample_block = *sample_block; for (const auto & col : format_filter_info->additional_columns) @@ -330,6 +331,7 @@ void Reader::prefilterAndInitRowGroups() RowGroup & row_group = row_groups.emplace_back(); row_group.meta = meta; + row_group.need_to_process = !row_groups_to_read.has_value() || row_groups_to_read->contains(row_group_idx); row_group.row_group_idx = row_group_idx; row_group.start_global_row_idx = total_rows - size_t(meta->num_rows); row_group.columns.resize(primitive_columns.size()); @@ -1016,8 +1018,8 @@ void Reader::intersectColumnIndexResultsAndInitSubgroups(RowGroup & row_group) RowSubgroup & row_subgroup = row_group.subgroups.emplace_back(); row_subgroup.start_row_idx = substart; - row_subgroup.filter.rows_pass = subend - substart; - row_subgroup.filter.rows_total = row_subgroup.filter.rows_pass; + row_subgroup.filter.rows_pass = row_group.need_to_process ? subend - substart : 0; + row_subgroup.filter.rows_total = subend - substart; row_subgroup.columns.resize(primitive_columns.size()); row_subgroup.output.resize(extended_sample_block.columns()); @@ -1025,7 +1027,6 @@ void Reader::intersectColumnIndexResultsAndInitSubgroups(RowGroup & row_group) row_subgroup.block_missing_values.init(sample_block->columns()); } } - row_group.intersected_row_ranges_after_column_index = std::move(row_ranges); } @@ -1939,7 +1940,7 @@ MutableColumnPtr Reader::formOutputColumn(RowSubgroup & row_subgroup, size_t out return res; } -void Reader::applyPrewhere(RowSubgroup & row_subgroup) +void Reader::applyPrewhere(RowSubgroup & row_subgroup, const RowGroup & row_group) { for (size_t step_idx = 0; step_idx < prewhere_steps.size(); ++step_idx) { @@ -1990,7 +1991,7 @@ void Reader::applyPrewhere(RowSubgroup & row_subgroup) chassert(filter.size() == row_subgroup.filter.rows_pass); size_t rows_pass = countBytesInFilter(filter.data(), 0, filter.size()); - if (rows_pass == 0) + if (rows_pass == 0 || !row_group.need_to_process) { /// Whole row group was filtered out. 
row_subgroup.filter.rows_pass = 0; diff --git a/src/Processors/Formats/Impl/Parquet/Reader.h b/src/Processors/Formats/Impl/Parquet/Reader.h index 899d4371b0c4..b132ecc39046 100644 --- a/src/Processors/Formats/Impl/Parquet/Reader.h +++ b/src/Processors/Formats/Impl/Parquet/Reader.h @@ -387,6 +387,7 @@ struct Reader size_t row_group_idx; // in parquet file size_t start_global_row_idx = 0; // total number of rows in preceding row groups in the file + bool need_to_process = false; /// Parallel to Reader::primitive_columns. /// NOT parallel to `meta.columns` (it's a subset of parquet columns). std::vector columns; @@ -470,7 +471,7 @@ struct Reader void init(const ReadOptions & options_, const Block & sample_block_, FormatFilterInfoPtr format_filter_info_); static parq::FileMetaData readFileMetaData(Prefetcher & prefetcher); - void prefilterAndInitRowGroups(); + void prefilterAndInitRowGroups(const std::optional> & row_groups_to_read); void preparePrewhere(); /// Deserialize bf header and determine which bf blocks to read. @@ -502,7 +503,7 @@ struct Reader /// is not called again for the moved-out columns. MutableColumnPtr formOutputColumn(RowSubgroup & row_subgroup, size_t output_column_idx, size_t num_rows); - void applyPrewhere(RowSubgroup & row_subgroup); + void applyPrewhere(RowSubgroup & row_subgroup, const RowGroup & row_group); private: struct BloomFilterLookup : public KeyCondition::BloomFilter diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index e8a4c8fdce91..b935b2259823 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -1,5 +1,10 @@ +#include #include +#include +#include #include +#include +#include #if USE_PARQUET @@ -657,6 +662,45 @@ const parquet::ColumnDescriptor * getColumnDescriptorIfBloomFilterIsPresent( return parquet_column_descriptor; } +void ParquetFileBucketInfo::serialize(WriteBuffer & buffer) +{ + writeVarUInt(row_group_ids.size(), buffer); + for (auto chunk : row_group_ids) + writeVarUInt(chunk, buffer); +} + +void ParquetFileBucketInfo::deserialize(ReadBuffer & buffer) +{ + size_t size_chunks; + readVarUInt(size_chunks, buffer); + row_group_ids = std::vector{}; + row_group_ids.resize(size_chunks); + size_t bucket; + for (size_t i = 0; i < size_chunks; ++i) + { + readVarUInt(bucket, buffer); + row_group_ids[i] = bucket; + } +} + +String ParquetFileBucketInfo::getIdentifier() const +{ + String result; + for (auto chunk : row_group_ids) + result += "_" + std::to_string(chunk); + return result; +} + +ParquetFileBucketInfo::ParquetFileBucketInfo(const std::vector & row_group_ids_) + : row_group_ids(row_group_ids_) +{ +} + +void registerParquetFileBucketInfo(std::unordered_map & instances) +{ + instances.emplace("Parquet", std::make_shared()); +} + ParquetBlockInputFormat::ParquetBlockInputFormat( ReadBuffer & buf, SharedHeader header_, @@ -716,6 +760,17 @@ void ParquetBlockInputFormat::initializeIfNeeded() return; metadata = getFileMetaData(); + + if (buckets_to_read) + { + std::unordered_set set_to_read(buckets_to_read->row_group_ids.begin(), buckets_to_read->row_group_ids.end()); + for (int i = 0; i < metadata->num_row_groups(); ++i) + { + if (!set_to_read.contains(i)) + skip_row_groups.insert(i); + } + } + const bool prefetch_group = io_pool != nullptr; std::shared_ptr schema; @@ -1199,9 +1254,8 @@ void ParquetBlockInputFormat::scheduleMoreWorkIfNeeded(std::optional row { size_t max_decoding_threads = 
parser_shared_resources->getParsingThreadsPerReader(); while (row_group_batches_started - row_group_batches_completed < max_decoding_threads && - row_group_batches_started < row_group_batches.size()) + row_group_batches_started < row_group_batches.size()) scheduleRowGroup(row_group_batches_started++); - if (row_group_batch_touched) { auto & row_group = row_group_batches[*row_group_batch_touched]; @@ -1297,6 +1351,11 @@ Chunk ParquetBlockInputFormat::read() } } +void ParquetBlockInputFormat::setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_) +{ + buckets_to_read = std::static_pointer_cast(buckets_to_read_); +} + void ParquetBlockInputFormat::resetParser() { is_stopped = true; @@ -1393,8 +1452,53 @@ std::optional ArrowParquetSchemaReader::readNumberOrRows() return metadata->num_rows(); } +std::vector ParquetBucketSplitter::splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) +{ + std::atomic is_stopped = false; + auto arrow_file = asArrowFile(buf, format_settings_, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true, nullptr); + auto metadata = parquet::ReadMetaData(arrow_file); + std::vector bucket_sizes; + for (int i = 0; i < metadata->num_row_groups(); ++i) + bucket_sizes.push_back(metadata->RowGroup(i)->total_byte_size()); + + std::vector> buckets; + size_t current_weight = 0; + for (size_t i = 0; i < bucket_sizes.size(); ++i) + { + if (current_weight + bucket_sizes[i] <= bucket_size) + { + if (buckets.empty()) + buckets.emplace_back(); + buckets.back().push_back(i); + current_weight += bucket_sizes[i]; + } + else + { + current_weight = 0; + buckets.push_back({}); + buckets.back().push_back(i); + current_weight += bucket_sizes[i]; + } + } + + std::vector result; + for (const auto & bucket : buckets) + { + result.push_back(std::make_shared(bucket)); + } + return result; +} + void registerInputFormatParquet(FormatFactory & factory) { + factory.registerFileBucketInfo( + "Parquet", + [] + { + return std::make_shared(); + } + ); + factory.registerRandomAccessInputFormat( "Parquet", [](ReadBuffer & buf, @@ -1437,6 +1541,10 @@ void registerInputFormatParquet(FormatFactory & factory) void registerParquetSchemaReader(FormatFactory & factory) { + factory.registerSplitter("Parquet", [] + { + return std::make_shared(); + }); factory.registerSchemaReader( "Parquet", [](ReadBuffer & buf, const FormatSettings & settings) -> SchemaReaderPtr diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index 1038e1d35037..5c2bf96325a1 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -51,6 +51,28 @@ class ParquetRecordReader; // parallel reading+decoding, instead of using ParallelReadBuffer and ParallelParsingInputFormat. // That's what RandomAccessInputCreator in FormatFactory is about. 
+struct ParquetFileBucketInfo : public FileBucketInfo +{ + std::vector row_group_ids; + + ParquetFileBucketInfo() = default; + explicit ParquetFileBucketInfo(const std::vector & row_group_ids_); + void serialize(WriteBuffer & buffer) override; + void deserialize(ReadBuffer & buffer) override; + String getIdentifier() const override; + String getFormatName() const override + { + return "Parquet"; + } +}; +using ParquetFileBucketInfoPtr = std::shared_ptr; + +struct ParquetBucketSplitter : public IBucketSplitter +{ + ParquetBucketSplitter() = default; + std::vector splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) override; +}; + class ParquetBlockInputFormat : public IInputFormat { public: @@ -74,6 +96,8 @@ class ParquetBlockInputFormat : public IInputFormat void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override; + void setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_) override; + private: Chunk read() override; @@ -304,7 +328,8 @@ class ParquetBlockInputFormat : public IInputFormat }; const FormatSettings format_settings; - const std::unordered_set & skip_row_groups; + std::unordered_set skip_row_groups; + ParquetFileBucketInfoPtr buckets_to_read; FormatParserSharedResourcesPtr parser_shared_resources; FormatFilterInfoPtr format_filter_info; size_t min_bytes_for_seek; diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp index fca6fc38b89d..2d2bc2f318f0 100644 --- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp @@ -1,3 +1,5 @@ +#include +#include #include #if USE_PARQUET @@ -7,10 +9,16 @@ #include #include #include +#include namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + Parquet::ReadOptions convertReadOptions(const FormatSettings & format_settings) { Parquet::ReadOptions options; @@ -84,7 +92,7 @@ void ParquetV3BlockInputFormat::initializeIfNeeded() reader.emplace(); reader->reader.prefetcher.init(in, read_options, parser_shared_resources); reader->reader.init(read_options, getPort().getHeader(), format_filter_info); - reader->init(parser_shared_resources); + reader->init(parser_shared_resources, buckets_to_read ? 
std::optional(buckets_to_read->row_group_ids) : std::nullopt); } } @@ -114,6 +122,13 @@ Chunk ParquetV3BlockInputFormat::read() return std::move(res.chunk); } +void ParquetV3BlockInputFormat::setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_) +{ + if (reader) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Reader already initialized"); + buckets_to_read = std::static_pointer_cast(buckets_to_read_); +} + const BlockMissingValues * ParquetV3BlockInputFormat::getMissingValues() const { return &previous_block_missing_values; diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h index 122aaae174ac..ff7c80a6b230 100644 --- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB { @@ -32,6 +33,8 @@ class ParquetV3BlockInputFormat : public IInputFormat return previous_approx_bytes_read_for_chunk; } + void setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_) override; + private: Chunk read() override; @@ -49,6 +52,7 @@ class ParquetV3BlockInputFormat : public IInputFormat size_t previous_approx_bytes_read_for_chunk = 0; void initializeIfNeeded(); + std::shared_ptr buckets_to_read; }; class NativeParquetSchemaReader : public ISchemaReader diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index eb509c474afe..cc18a6dfe318 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -7,6 +7,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -18,7 +21,6 @@ #include - namespace DataLake { class ICatalog; @@ -89,6 +91,13 @@ using SinkToStoragePtr = std::shared_ptr; class StorageObjectStorageConfiguration; using StorageObjectStorageConfigurationPtr = std::shared_ptr; struct StorageID; +struct IObjectIterator; +struct RelativePathWithMetadata; +class IObjectStorage; +struct ObjectInfo; +using ObjectInfoPtr = std::shared_ptr; +using ObjectIterator = std::shared_ptr; +using ObjectStoragePtr = std::shared_ptr; class IDataLakeMetadata : boost::noncopyable { diff --git a/src/Storages/ObjectStorage/IObjectIterator.cpp b/src/Storages/ObjectStorage/IObjectIterator.cpp index cea5d709b5ca..d4de76ebe5c7 100644 --- a/src/Storages/ObjectStorage/IObjectIterator.cpp +++ b/src/Storages/ObjectStorage/IObjectIterator.cpp @@ -1,10 +1,23 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include +#include namespace DB { +namespace Setting +{ + extern const SettingsUInt64 cluster_table_function_buckets_batch_size; +} + static ExpressionActionsPtr getExpressionActions( const DB::ActionsDAG & filter_, const NamesAndTypesList & virtual_columns_, diff --git a/src/Storages/ObjectStorage/IObjectIterator.h b/src/Storages/ObjectStorage/IObjectIterator.h index 126abd181910..4ad74b1a76d6 100644 --- a/src/Storages/ObjectStorage/IObjectIterator.h +++ b/src/Storages/ObjectStorage/IObjectIterator.h @@ -1,12 +1,18 @@ #pragma once #include #include +#include +#include +#include +#include namespace DB { using ObjectInfo = PathWithMetadata; using ObjectInfoPtr = std::shared_ptr; +using ObjectInfos = std::vector; + class ExpressionActions; struct IObjectIterator @@ -41,4 +47,29 @@ class ObjectIteratorWithPathAndFileFilter : public IObjectIterator, private With const NamesAndTypesList 
hive_partition_columns; const std::shared_ptr filter_actions; }; + +class ObjectIteratorSplitByBuckets : public IObjectIterator, private WithContext +{ +public: + ObjectIteratorSplitByBuckets( + ObjectIterator iterator_, + const String & format_, + ObjectStoragePtr object_storage_, + const ContextPtr & context_); + + ObjectInfoPtr next(size_t) override; + size_t estimatedKeysCount() override { return iterator->estimatedKeysCount(); } + std::optional getSnapshotVersion() const override { return iterator->getSnapshotVersion(); } + +private: + const ObjectIterator iterator; + String format; + ObjectStoragePtr object_storage; + FormatSettings format_settings; + + std::queue pending_objects_info; + const LoggerPtr log = getLogger("GlobIterator"); +}; + + } diff --git a/src/Storages/ObjectStorage/ReadBufferIterator.cpp b/src/Storages/ObjectStorage/ReadBufferIterator.cpp index 131b73b343ae..6cef60c3d03b 100644 --- a/src/Storages/ObjectStorage/ReadBufferIterator.cpp +++ b/src/Storages/ObjectStorage/ReadBufferIterator.cpp @@ -7,7 +7,6 @@ #include #include - namespace DB { namespace Setting diff --git a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h index f45cd9725dea..471e8b5eea40 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include @@ -14,13 +13,20 @@ #include #include #include +#include +#include +#include +#include namespace DB { class NamedCollection; class SinkToStorage; +class IDataLakeMetadata; +struct IObjectIterator; using SinkToStoragePtr = std::shared_ptr; +using ObjectIterator = std::shared_ptr; namespace ErrorCodes { diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index cfc55307dd77..1fc4860831e1 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -30,8 +30,10 @@ #include #include #include +#include #include #include +#include #if ENABLE_DISTRIBUTED_CACHE #include #include @@ -40,6 +42,8 @@ #include #include +#include +#include namespace fs = std::filesystem; namespace ProfileEvents @@ -70,6 +74,7 @@ namespace Setting extern const SettingsBool table_engine_read_through_distributed_cache; extern const SettingsBool use_object_storage_list_objects_cache; extern const SettingsBool allow_experimental_iceberg_read_optimization; + extern const SettingsObjectStorageGranularityLevel cluster_table_function_split_granularity; } namespace ErrorCodes @@ -230,7 +235,7 @@ std::shared_ptr StorageObjectStorageSource::createFileIterator( if (filter_actions_dag) { - return std::make_shared( + iter = std::make_shared( std::move(iter), *filter_actions_dag, virtual_columns, @@ -238,6 +243,15 @@ std::shared_ptr StorageObjectStorageSource::createFileIterator( configuration->getNamespace(), local_context); } + if (local_context->getSettingsRef()[Setting::cluster_table_function_split_granularity] == ObjectStorageGranularityLevel::BUCKET) + { + iter = std::make_shared( + std::move(iter), + configuration->format, + object_storage, + local_context + ); + } return iter; } else @@ -522,7 +536,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade object_info->loadMetadata(object_storage, query_settings.ignore_non_existent_file); } while (not_a_path || 
(query_settings.skip_empty_files && object_info->metadata->size_bytes == 0)); - + ProfileEvents::increment(ProfileEvents::ObjectStorageClusterProcessedTasks); ObjectStoragePtr storage_to_use = object_info->getObjectStorage(); @@ -754,6 +768,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade compression_method, need_only_count); + input_format->setBucketsToRead(object_info->file_bucket_info); input_format->setSerializationHints(read_from_format_info.serialization_hints); if (need_only_count) @@ -1333,7 +1348,7 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator // We should use it directly and NOT overwrite relative_path. // Only resolve absolute_path if we need to determine which storage to use (for secondary storages). object_info->object_storage_to_use = object_storage; - + if (raw->absolute_path.has_value()) { auto [storage_to_use, key] @@ -1442,9 +1457,9 @@ StorageObjectStorageSource::ArchiveIterator::createArchiveReader(ObjectInfoPtr o return DB::createArchiveReader( /* path_to_archive */ object_info->getPath(), - /* archive_read_function */ [=, this]() { + /* archive_read_function */ [=, this]() { ObjectStoragePtr storage = object_info->getObjectStorage() ? object_info->getObjectStorage() : object_storage; - return createReadBuffer(*object_info, storage, getContext(), log); + return createReadBuffer(*object_info, storage, getContext(), log); }, /* archive_size */ size); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 82bc2d776b06..aec80ef8e082 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -121,8 +121,8 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t auto next_file = files.back(); files.pop_back(); - auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getAbsolutePath().value_or(next_file->getPath()); - auto it = unprocessed_files.find(file_path); + auto file_identifier = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getIdentifier(); + auto it = unprocessed_files.find(file_identifier); if (it == unprocessed_files.end()) continue; @@ -131,7 +131,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t LOG_TRACE( log, "Assigning pre-queued file {} to replica {}", - file_path, + file_identifier, number_of_current_replica ); @@ -166,25 +166,25 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter } } - String file_path; + String file_identifier; if (send_over_whole_archive && object_info->isArchive()) { - file_path = object_info->getPathOrPathToArchiveIfArchive(); + file_identifier = object_info->getPathOrPathToArchiveIfArchive(); LOG_TEST(log, "Will send over the whole archive {} to replicas. 
" "This will be suboptimal, consider turning on " - "cluster_function_process_archive_on_multiple_nodes setting", file_path); + "cluster_function_process_archive_on_multiple_nodes setting", file_identifier); } else { - file_path = object_info->getAbsolutePath().value_or(object_info->getPath()); + file_identifier = object_info->getIdentifier(); } - size_t file_replica_idx = getReplicaForFile(file_path); + size_t file_replica_idx = getReplicaForFile(file_identifier); if (file_replica_idx == number_of_current_replica) { LOG_TRACE( log, "Found file {} for replica {}", - file_path, number_of_current_replica + file_identifier, number_of_current_replica ); ProfileEvents::increment(ProfileEvents::ObjectStorageClusterSentToMatchedReplica); @@ -193,7 +193,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter LOG_TEST( log, "Found file {} for replica {} (number of current replica: {})", - file_path, + file_identifier, file_replica_idx, number_of_current_replica ); @@ -201,7 +201,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter // Queue file for its assigned replica { std::lock_guard lock(mutex); - unprocessed_files.emplace(file_path, std::make_pair(object_info, file_replica_idx)); + unprocessed_files.emplace(file_identifier, std::make_pair(object_info, file_replica_idx)); connection_to_files[file_replica_idx].push_back(object_info); } } diff --git a/tests/integration/helpers/iceberg_utils.py b/tests/integration/helpers/iceberg_utils.py index 5479fdc5a938..c10cce320927 100644 --- a/tests/integration/helpers/iceberg_utils.py +++ b/tests/integration/helpers/iceberg_utils.py @@ -153,10 +153,14 @@ def write_iceberg_from_df( if partition_by is None: df.writeTo(table_name).tableProperty( "format-version", format_version + ).tableProperty( + "write.parquet.row-group-size-bytes", "104850" # 1MB ).using("iceberg").create() else: df.writeTo(table_name).tableProperty( "format-version", format_version + ).tableProperty( + "write.parquet.row-group-size-bytes", "104850" # 1MB ).partitionedBy(partition_by).using("iceberg").create() else: df.writeTo(table_name).append() From 3303aa4283510b38a8bfd77a3386704f5cd153bc Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Wed, 17 Dec 2025 00:01:00 +0100 Subject: [PATCH 2/7] fix buld squashed --- src/Core/Settings.h | 3 +- src/Disks/ObjectStorages/IObjectStorage.h | 10 ++ src/Interpreters/ClusterFunctionReadTask.h | 3 - .../DataLakes/IDataLakeMetadata.h | 5 +- .../ObjectStorage/IObjectIterator.cpp | 45 +++++++++ .../StorageObjectStorageConfiguration.h | 10 +- src/Storages/StorageURL.cpp | 6 +- .../integration/test_storage_iceberg/test.py | 94 ++++++++++++++++++- 8 files changed, 159 insertions(+), 17 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 22ffb47c9402..49d50d9ce3b5 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -110,8 +110,7 @@ class WriteBuffer; M(CLASS_NAME, URI) \ M(CLASS_NAME, VectorSearchFilterStrategy) \ M(CLASS_NAME, GeoToH3ArgumentOrder) \ - M(CLASS_NAME, ObjectStorageGranularityLevel) \ - M(CLASS_NAME, DecorrelationJoinKind) + M(CLASS_NAME, ObjectStorageGranularityLevel) COMMON_SETTINGS_SUPPORTED_TYPES(Settings, DECLARE_SETTING_TRAIT) diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index bcb4ecaf8bbe..0a5f20e2b8a9 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -150,6 +150,8 @@ struct PathWithMetadata std::optional absolute_path; 
ObjectStoragePtr object_storage_to_use = nullptr; + FileBucketInfoPtr file_bucket_info; + PathWithMetadata() = default; explicit PathWithMetadata( @@ -189,6 +191,14 @@ struct PathWithMetadata void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file = true); ObjectStoragePtr getObjectStorage() const { return object_storage_to_use; } + + String getIdentifier() const + { + String result = absolute_path.value_or(relative_path); + if (file_bucket_info) + result += file_bucket_info->getIdentifier(); + return result; + } }; struct ObjectKeyWithMetadata diff --git a/src/Interpreters/ClusterFunctionReadTask.h b/src/Interpreters/ClusterFunctionReadTask.h index 8b77fbe846f8..7e6af60424f8 100644 --- a/src/Interpreters/ClusterFunctionReadTask.h +++ b/src/Interpreters/ClusterFunctionReadTask.h @@ -1,11 +1,8 @@ #pragma once #include -<<<<<<< HEAD -======= #include #include #include ->>>>>>> 4bed2ad0c69 (Merge pull request #87508 from scanhex12/distributed_execution_better_spread) #include #include diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index cc18a6dfe318..28d7faf1a765 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -6,7 +6,7 @@ #include #include #include -#include +#include #include #include #include @@ -94,10 +94,9 @@ struct StorageID; struct IObjectIterator; struct RelativePathWithMetadata; class IObjectStorage; -struct ObjectInfo; -using ObjectInfoPtr = std::shared_ptr; using ObjectIterator = std::shared_ptr; using ObjectStoragePtr = std::shared_ptr; +using ObjectInfoPtr = std::shared_ptr; class IDataLakeMetadata : boost::noncopyable { diff --git a/src/Storages/ObjectStorage/IObjectIterator.cpp b/src/Storages/ObjectStorage/IObjectIterator.cpp index d4de76ebe5c7..a1da8033147d 100644 --- a/src/Storages/ObjectStorage/IObjectIterator.cpp +++ b/src/Storages/ObjectStorage/IObjectIterator.cpp @@ -76,4 +76,49 @@ ObjectInfoPtr ObjectIteratorWithPathAndFileFilter::next(size_t id) return {}; } +ObjectIteratorSplitByBuckets::ObjectIteratorSplitByBuckets( + ObjectIterator iterator_, + const String & format_, + ObjectStoragePtr object_storage_, + const ContextPtr & context_) + : WithContext(context_) + , iterator(iterator_) + , format(format_) + , object_storage(object_storage_) + , format_settings(getFormatSettings(context_)) +{ +} + +ObjectInfoPtr ObjectIteratorSplitByBuckets::next(size_t id) +{ + if (!pending_objects_info.empty()) + { + auto result = pending_objects_info.front(); + pending_objects_info.pop(); + return result; + } + auto last_object_info = iterator->next(id); + if (!last_object_info) + return {}; + + auto buffer = createReadBuffer(*last_object_info, object_storage, getContext(), log); + + auto splitter = FormatFactory::instance().getSplitter(format); + if (splitter) + { + size_t bucket_size = getContext()->getSettingsRef()[Setting::cluster_table_function_buckets_batch_size]; + auto file_bucket_info = splitter->splitToBuckets(bucket_size, *buffer, format_settings); + for (const auto & file_bucket : file_bucket_info) + { + auto copy_object_info = *last_object_info; + copy_object_info.file_bucket_info = file_bucket; + pending_objects_info.push(std::make_shared(copy_object_info)); + } + } + + auto result = pending_objects_info.front(); + pending_objects_info.pop(); + return result; +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h 
b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h index 471e8b5eea40..959d3364f7ae 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h @@ -27,6 +27,7 @@ class IDataLakeMetadata; struct IObjectIterator; using SinkToStoragePtr = std::shared_ptr; using ObjectIterator = std::shared_ptr; +using ObjectInfoPtr = std::shared_ptr; namespace ErrorCodes { @@ -281,17 +282,16 @@ class StorageObjectStorageConfiguration return false; } + String format = "auto"; + String compression_method = "auto"; + String structure = "auto"; + PartitionStrategyFactory::StrategyType partition_strategy_type = PartitionStrategyFactory::StrategyType::NONE; std::shared_ptr partition_strategy; /// Whether partition column values are contained in the actual data. /// And alternative is with hive partitioning, when they are contained in file path. bool partition_columns_in_data_file = true; -private: - String format = "auto"; - String compression_method = "auto"; - String structure = "auto"; - protected: bool initialized = false; diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index e0d7158ed44b..bd27f4cf6d4d 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -807,10 +807,10 @@ std::function IStorageURLBase::getReadPOSTDataCallback( namespace { - class ReadBufferIterator : public IReadBufferIterator, WithContext + class URLReadBufferIterator : public IReadBufferIterator, WithContext { public: - ReadBufferIterator( + URLReadBufferIterator( const std::vector & urls_to_check_, std::optional format_, const CompressionMethod & compression_method_, @@ -1054,7 +1054,7 @@ std::pair IStorageURLBase::getTableStructureAndForma else urls_to_check = {uri}; - ReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context); + URLReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context); if (format) return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, context), *format}; return detectFormatAndReadSchema(format_settings, read_buffer_iterator, context); diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 6bc1963a87b2..cfffc4304bad 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -3925,7 +3925,7 @@ def check_validity_and_get_prunned_files(select_expression): ) - + def test_iceberg_write_minmax(started_cluster): instance = started_cluster.instances["node1"] TABLE_NAME = "test_iceberg_write_minmax_" + get_uuid_str() @@ -3939,3 +3939,95 @@ def test_iceberg_write_minmax(started_cluster): res = instance.query(f"SELECT x,y FROM {TABLE_NAME} WHERE y=2 ORDER BY ALL").strip() assert res == "1\t2" + + +@pytest.mark.parametrize("format_version", ["1", "2"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) +@pytest.mark.parametrize("cluster_table_function_buckets_batch_size", [0, 100, 1000]) +@pytest.mark.parametrize("input_format_parquet_use_native_reader_v3", [0, 1]) +def test_cluster_table_function_split_by_row_groups(started_cluster_iceberg_with_spark, format_version, storage_type, cluster_table_function_buckets_batch_size,input_format_parquet_use_native_reader_v3): + instance = started_cluster_iceberg_with_spark.instances["node1"] + spark = started_cluster_iceberg_with_spark.spark_session + + TABLE_NAME = ( + 
"test_iceberg_cluster_" + + format_version + + "_" + + storage_type + + "_" + + get_uuid_str() + ) + + def add_df(mode): + write_iceberg_from_df( + spark, + generate_data(spark, 0, 100000), + TABLE_NAME, + mode=mode, + format_version=format_version, + ) + + files = default_upload_directory( + started_cluster_iceberg_with_spark, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", + ) + + logging.info(f"Adding another dataframe. result files: {files}") + + return files + + files = add_df(mode="overwrite") + for i in range(1, 5 * len(started_cluster_iceberg_with_spark.instances)): + files = add_df(mode="append") + + clusters = instance.query(f"SELECT * FROM system.clusters") + logging.info(f"Clusters setup: {clusters}") + + # Regular Query only node1 + table_function_expr = get_creation_expression( + storage_type, TABLE_NAME, started_cluster_iceberg_with_spark, table_function=True + ) + select_regular = ( + instance.query(f"SELECT * FROM {table_function_expr} ORDER BY ALL").strip().split() + ) + + # Cluster Query with node1 as coordinator + table_function_expr_cluster = get_creation_expression( + storage_type, + TABLE_NAME, + started_cluster_iceberg_with_spark, + table_function=True, + run_on_cluster=True, + ) + instance.query("SYSTEM FLUSH LOGS") + + def get_buffers_count(func): + buffers_count_before = int( + instance.query( + f"SELECT sum(ProfileEvents['EngineFileLikeReadFiles']) FROM system.query_log WHERE type = 'QueryFinish'" + ) + ) + + func() + instance.query("SYSTEM FLUSH LOGS") + buffers_count = int( + instance.query( + f"SELECT sum(ProfileEvents['EngineFileLikeReadFiles']) FROM system.query_log WHERE type = 'QueryFinish'" + ) + ) + return buffers_count - buffers_count_before + + select_cluster = ( + instance.query(f"SELECT * FROM {table_function_expr_cluster} ORDER BY ALL SETTINGS input_format_parquet_use_native_reader_v3={input_format_parquet_use_native_reader_v3},cluster_table_function_split_granularity='bucket', cluster_table_function_buckets_batch_size={cluster_table_function_buckets_batch_size}").strip().split() + ) + + # Simple size check + assert len(select_cluster) == len(select_regular) + # Actual check + assert select_cluster == select_regular + + buffers_count_with_splitted_tasks = get_buffers_count(lambda: instance.query(f"SELECT * FROM {table_function_expr_cluster} ORDER BY ALL SETTINGS input_format_parquet_use_native_reader_v3={input_format_parquet_use_native_reader_v3},cluster_table_function_split_granularity='bucket', cluster_table_function_buckets_batch_size={cluster_table_function_buckets_batch_size}").strip().split()) + buffers_count_default = get_buffers_count(lambda: instance.query(f"SELECT * FROM {table_function_expr_cluster} ORDER BY ALL SETTINGS input_format_parquet_use_native_reader_v3={input_format_parquet_use_native_reader_v3}, cluster_table_function_buckets_batch_size={cluster_table_function_buckets_batch_size}").strip().split()) + assert buffers_count_with_splitted_tasks > buffers_count_default From 95525d8ebb67f278b2c63d1d67fd26f19da294d3 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Thu, 18 Dec 2025 10:01:12 +0100 Subject: [PATCH 3/7] fix settingschangeshistory --- src/Core/SettingsChangesHistory.cpp | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 5acca102d8f0..8b70b8a6913d 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -61,28 +61,6 @@ const 
VersionToSettingsChangesMap & getSettingsChangesHistory() {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."} {"cluster_table_function_split_granularity", "file", "file", "New setting."}, {"cluster_table_function_buckets_batch_size", 0, 0, "New setting."}, - {"arrow_flight_request_descriptor_type", "path", "path", "New setting. Type of descriptor to use for Arrow Flight requests: 'path' or 'command'. Dremio requires 'command'."}, - {"send_profile_events", true, true, "New setting. Whether to send profile events to the clients."}, - {"into_outfile_create_parent_directories", false, false, "New setting"}, - {"correlated_subqueries_default_join_kind", "left", "right", "New setting. Default join kind for decorrelated query plan."}, - {"use_statistics_cache", 0, 0, "New setting"}, - {"input_format_parquet_use_native_reader_v3", false, true, "Seems stable"}, - {"max_projection_rows_to_use_projection_index", 1'000'000, 1'000'000, "New setting"}, - {"min_table_rows_to_use_projection_index", 1'000'000, 1'000'000, "New setting"}, - {"use_text_index_dictionary_cache", false, false, "New setting"}, - {"use_text_index_header_cache", false, false, "New setting"}, - {"use_text_index_postings_cache", false, false, "New setting"}, - {"s3_retry_attempts", 500, 500, "Changed the value of the obsolete setting"}, - {"http_write_exception_in_output_format", true, false, "Changed for consistency across formats"}, - {"optimize_const_name_size", -1, 256, "Replace with scalar and use hash as a name for large constants (size is estimated by name length)"}, - {"enable_lazy_columns_replication", false, true, "Enable lazy columns replication in JOIN and ARRAY JOIN by default"}, - {"allow_special_serialization_kinds_in_output_formats", false, true, "Enable direct output of special columns representations like Sparse/Replicated in some output formats"}, - {"allow_experimental_alias_table_engine", false, false, "New setting"}, - {"input_format_parquet_local_time_as_utc", false, true, "Use more appropriate type DateTime64(..., 'UTC') for parquet 'local time without timezone' type."}, - {"input_format_parquet_verify_checksums", true, true, "New setting."}, - {"output_format_parquet_write_checksums", false, true, "New setting."}, - {"database_shared_drop_table_delay_seconds", 8 * 60 * 60, 8 * 60 * 60, "New setting."}, - {"filesystem_cache_allow_background_download", true, true, "New setting to control background downloads in filesystem cache per query."}, }); addSettingsChanges(settings_changes_history, "25.10", { From 062b835e12043552b4ba9c453bd8d3c3b64de7e4 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Thu, 18 Dec 2025 16:11:20 +0100 Subject: [PATCH 4/7] fix settingschangeshistory again --- src/Core/SettingsChangesHistory.cpp | 63 ----------------------------- 1 file changed, 63 deletions(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 8b70b8a6913d..ff517e556e6b 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -62,69 +62,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"cluster_table_function_split_granularity", "file", "file", "New setting."}, {"cluster_table_function_buckets_batch_size", 0, 0, "New setting."}, }); - addSettingsChanges(settings_changes_history, "25.10", - { - {"allow_special_serialization_kinds_in_output_formats", false, false, "Add a setting to allow output of special columns representations like Sparse/Replicated without converting 
them to full columns"}, - {"enable_lazy_columns_replication", false, false, "Add a setting to enable lazy columns replication in JOIN and ARRAY JOIN"}, - {"correlated_subqueries_default_join_kind", "left", "right", "New setting. Default join kind for decorrelated query plan."}, - {"show_data_lake_catalogs_in_system_tables", true, false, "Disable catalogs in system tables by default"}, - {"optimize_rewrite_like_perfect_affix", false, true, "New setting"}, - {"allow_dynamic_type_in_join_keys", true, false, "Disallow using Dynamic type in JOIN keys by default"}, - {"s3queue_keeper_fault_injection_probability", 0, 0, "New setting."}, - {"enable_join_runtime_filters", false, false, "New setting"}, - {"join_runtime_filter_exact_values_limit", 10000, 10000, "New setting"}, - {"join_runtime_bloom_filter_bytes", 512_KiB, 512_KiB, "New setting"}, - {"join_runtime_bloom_filter_hash_functions", 3, 3, "New setting"}, - {"use_join_disjunctions_push_down", false, false, "New setting."}, - {"joined_block_split_single_row", false, false, "New setting"}, - {"temporary_files_buffer_size", DBMS_DEFAULT_BUFFER_SIZE, DBMS_DEFAULT_BUFFER_SIZE, "New setting"}, - {"rewrite_in_to_join", false, false, "New experimental setting"}, - {"delta_lake_log_metadata", false, false, "New setting."}, - {"distributed_cache_prefer_bigger_buffer_size", false, false, "New setting."}, - {"allow_experimental_qbit_type", false, false, "New experimental setting"}, - {"optimize_qbit_distance_function_reads", true, true, "New setting"}, - {"read_from_distributed_cache_if_exists_otherwise_bypass_cache", false, false, "New setting"}, - {"s3_slow_all_threads_after_retryable_error", false, false, "Disable the setting by default"}, - {"backup_slow_all_threads_after_retryable_s3_error", false, false, "Disable the setting by default"}, - {"enable_http_compression", false, true, "It should be beneficial in general"}, - {"inject_random_order_for_select_without_order_by", false, false, "New setting"}, - {"exclude_materialize_skip_indexes_on_insert", "", "", "New setting."}, - {"optimize_empty_string_comparisons", false, true, "A new setting."}, - {"query_plan_use_logical_join_step", true, true, "Added alias"}, - {"schema_inference_make_columns_nullable", 1, 3, "Take nullability information from Parquet/ORC/Arrow metadata by default, instead of making everything nullable."}, - {"materialized_views_squash_parallel_inserts", false, true, "Added setting to preserve old behavior if needed."}, - {"distributed_cache_connect_timeout_ms", 50, 50, "New setting"}, - {"distributed_cache_receive_timeout_ms", 3000, 3000, "New setting"}, - {"distributed_cache_send_timeout_ms", 3000, 3000, "New setting"}, - {"distributed_cache_tcp_keep_alive_timeout_ms", 2900, 2900, "New setting"}, - }); - addSettingsChanges(settings_changes_history, "25.9", - { - {"input_format_protobuf_oneof_presence", false, false, "New setting"}, - {"iceberg_delete_data_on_drop", false, false, "New setting"}, - {"use_skip_indexes_on_data_read", false, false, "New setting"}, - {"s3_slow_all_threads_after_retryable_error", false, false, "Added an alias for setting `backup_slow_all_threads_after_retryable_s3_error`"}, - {"iceberg_metadata_log_level", "none", "none", "New setting."}, - {"iceberg_insert_max_rows_in_data_file", 1000000, 1000000, "New setting."}, - {"iceberg_insert_max_bytes_in_data_file", 1_GiB, 1_GiB, "New setting."}, - {"query_plan_optimize_join_order_limit", 1, 1, "New setting"}, - {"query_plan_display_internal_aliases", false, false, "New setting"}, - 
{"query_plan_max_step_description_length", 1000000000, 500, "New setting"}, - {"allow_experimental_delta_lake_writes", false, false, "New setting."}, - {"query_plan_convert_any_join_to_semi_or_anti_join", true, true, "New setting."}, - {"text_index_use_bloom_filter", true, true, "New setting."}, - {"query_plan_direct_read_from_text_index", true, true, "New setting."}, - {"enable_producing_buckets_out_of_order_in_aggregation", false, true, "New setting"}, - {"jemalloc_enable_profiler", false, false, "New setting"}, - {"jemalloc_collect_profile_samples_in_trace_log", false, false, "New setting"}, - {"delta_lake_insert_max_bytes_in_data_file", 1_GiB, 1_GiB, "New setting."}, - {"delta_lake_insert_max_rows_in_data_file", 1000000, 1000000, "New setting."}, - {"promql_evaluation_time", Field{"auto"}, Field{"auto"}, "The setting was renamed. The previous name is `evaluation_time`."}, - {"evaluation_time", 0, 0, "Old setting which popped up here being renamed."}, - {"os_threads_nice_value_query", 0, 0, "New setting."}, - {"os_threads_nice_value_materialized_view", 0, 0, "New setting."}, - {"os_thread_priority", 0, 0, "Alias for os_threads_nice_value_query."}, - }); addSettingsChanges(settings_changes_history, "25.8", { {"output_format_json_quote_64bit_integers", true, false, "Disable quoting of the 64 bit integers in JSON by default"}, From 09a42664f414aec98f316c6c27d17b9521b4fd97 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Fri, 19 Dec 2025 18:28:53 +0100 Subject: [PATCH 5/7] fix test --- tests/integration/test_storage_iceberg/test.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index cfffc4304bad..3cfb9e9d053d 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -3945,9 +3945,9 @@ def test_iceberg_write_minmax(started_cluster): @pytest.mark.parametrize("storage_type", ["s3", "azure"]) @pytest.mark.parametrize("cluster_table_function_buckets_batch_size", [0, 100, 1000]) @pytest.mark.parametrize("input_format_parquet_use_native_reader_v3", [0, 1]) -def test_cluster_table_function_split_by_row_groups(started_cluster_iceberg_with_spark, format_version, storage_type, cluster_table_function_buckets_batch_size,input_format_parquet_use_native_reader_v3): - instance = started_cluster_iceberg_with_spark.instances["node1"] - spark = started_cluster_iceberg_with_spark.spark_session +def test_cluster_table_function_split_by_row_groups(started_cluster, format_version, storage_type, cluster_table_function_buckets_batch_size,input_format_parquet_use_native_reader_v3): + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session TABLE_NAME = ( "test_iceberg_cluster_" @@ -3968,7 +3968,7 @@ def add_df(mode): ) files = default_upload_directory( - started_cluster_iceberg_with_spark, + started_cluster, storage_type, f"/iceberg_data/default/{TABLE_NAME}/", f"/iceberg_data/default/{TABLE_NAME}/", @@ -3979,7 +3979,7 @@ def add_df(mode): return files files = add_df(mode="overwrite") - for i in range(1, 5 * len(started_cluster_iceberg_with_spark.instances)): + for i in range(1, 5 * len(started_cluster.instances)): files = add_df(mode="append") clusters = instance.query(f"SELECT * FROM system.clusters") @@ -3987,7 +3987,7 @@ def add_df(mode): # Regular Query only node1 table_function_expr = get_creation_expression( - storage_type, TABLE_NAME, started_cluster_iceberg_with_spark, table_function=True + 
storage_type, TABLE_NAME, started_cluster, table_function=True ) select_regular = ( instance.query(f"SELECT * FROM {table_function_expr} ORDER BY ALL").strip().split() @@ -3997,7 +3997,7 @@ def add_df(mode): table_function_expr_cluster = get_creation_expression( storage_type, TABLE_NAME, - started_cluster_iceberg_with_spark, + started_cluster, table_function=True, run_on_cluster=True, ) From 2b7210436016404896f00419fc67b1f983d4550e Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Fri, 19 Dec 2025 21:54:36 +0100 Subject: [PATCH 6/7] fix fix build --- src/Core/SettingsChangesHistory.cpp | 2 +- src/Disks/ObjectStorages/IObjectStorage.h | 2 +- .../ObjectStorage/IObjectIterator.cpp | 25 +++++++++++-------- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index ff517e556e6b..238dc1e3a7e2 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -58,7 +58,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}, {"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."}, {"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."}, - {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."} + {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}, {"cluster_table_function_split_granularity", "file", "file", "New setting."}, {"cluster_table_function_buckets_batch_size", 0, 0, "New setting."}, }); diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index 0a5f20e2b8a9..a6093d4d92f9 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -194,7 +194,7 @@ struct PathWithMetadata String getIdentifier() const { - String result = absolute_path.value_or(relative_path); + String result = getAbsolutePath().value_or(getPath()); if (file_bucket_info) result += file_bucket_info->getIdentifier(); return result; diff --git a/src/Storages/ObjectStorage/IObjectIterator.cpp b/src/Storages/ObjectStorage/IObjectIterator.cpp index a1da8033147d..08714909e63d 100644 --- a/src/Storages/ObjectStorage/IObjectIterator.cpp +++ b/src/Storages/ObjectStorage/IObjectIterator.cpp @@ -101,21 +101,24 @@ ObjectInfoPtr ObjectIteratorSplitByBuckets::next(size_t id) if (!last_object_info) return {}; - auto buffer = createReadBuffer(*last_object_info, object_storage, getContext(), log); - auto splitter = FormatFactory::instance().getSplitter(format); - if (splitter) + /// If there's no splitter for this format (e.g., CSV in archives), return the object as-is + if (!splitter) + return last_object_info; + + auto buffer = createReadBuffer(*last_object_info, object_storage, getContext(), log); + size_t bucket_size = getContext()->getSettingsRef()[Setting::cluster_table_function_buckets_batch_size]; + auto file_bucket_info = splitter->splitToBuckets(bucket_size, *buffer, format_settings); + for (const auto & file_bucket : file_bucket_info) { - size_t bucket_size = getContext()->getSettingsRef()[Setting::cluster_table_function_buckets_batch_size]; - auto file_bucket_info = splitter->splitToBuckets(bucket_size, *buffer, format_settings); - for (const auto & file_bucket : file_bucket_info) - { - auto copy_object_info = *last_object_info; - copy_object_info.file_bucket_info = file_bucket; - 
pending_objects_info.push(std::make_shared<ObjectInfo>(copy_object_info)); - } + auto copy_object_info = *last_object_info; + copy_object_info.file_bucket_info = file_bucket; + pending_objects_info.push(std::make_shared<ObjectInfo>(copy_object_info)); } + if (pending_objects_info.empty()) + return last_object_info; + auto result = pending_objects_info.front(); pending_objects_info.pop(); return result; From 98c50763bba870409f4f9d7078a3bfced1b7907c Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Mon, 22 Dec 2025 13:41:52 +0100 Subject: [PATCH 7/7] fix protocol version --- src/Core/ProtocolDefines.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index a27c99714ea7..b8bbf2c9e945 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -38,7 +38,7 @@ static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_ME static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_METADATA = 3; static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO = 4; static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA = 5; -static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO; +static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA; static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; static constexpr auto DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD = 4;
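
Note on the bucket batching rule: a minimal standalone sketch of the accumulation behaviour behind `cluster_table_function_split_granularity = 'bucket'` and `cluster_table_function_buckets_batch_size`, assuming greedy packing of Parquet row groups — row groups are added to a bucket until at least the requested number of bytes is reached, so a bucket may overshoot to align with row-group boundaries. The `Bucket` struct, the `splitRowGroupsIntoBuckets` function and the sizes below are illustrative only and do not appear in the patches; the real splitting is done by the format-specific splitter returned from FormatFactory::instance().getSplitter() inside ObjectIteratorSplitByBuckets::next().

    // Illustrative sketch only (not ClickHouse code): greedy packing of row groups
    // into buckets of at least `batch_size` bytes each.
    #include <cstddef>
    #include <iostream>
    #include <vector>

    struct Bucket
    {
        size_t first_group = 0;   // index of the first row group in the bucket
        size_t group_count = 0;   // how many consecutive row groups it covers
        size_t total_bytes = 0;   // accumulated (compressed) size of those groups
    };

    std::vector<Bucket> splitRowGroupsIntoBuckets(const std::vector<size_t> & row_group_bytes, size_t batch_size)
    {
        std::vector<Bucket> buckets;
        Bucket current;
        for (size_t i = 0; i < row_group_bytes.size(); ++i)
        {
            if (current.group_count == 0)
                current.first_group = i;
            current.group_count += 1;
            current.total_bytes += row_group_bytes[i];

            // Close the bucket once it has accumulated at least batch_size bytes.
            // With batch_size == 0 this closes after every row group, i.e. one bucket per group.
            if (current.total_bytes >= batch_size)
            {
                buckets.push_back(current);
                current = {};
            }
        }
        if (current.group_count > 0)
            buckets.push_back(current);
        return buckets;
    }

    int main()
    {
        // Hypothetical Parquet file with five row groups of these compressed sizes (bytes).
        std::vector<size_t> row_group_bytes = {400, 300, 900, 200, 250};
        for (const auto & bucket : splitRowGroupsIntoBuckets(row_group_bytes, /*batch_size=*/1000))
            std::cout << "groups [" << bucket.first_group << ", " << bucket.first_group + bucket.group_count
                      << "), bytes = " << bucket.total_bytes << '\n';
        return 0;
    }

In this sketch a batch size of 0 yields one bucket per row group, which is the finest granularity; the integration test above exercises batch sizes of 0, 100 and 1000 bytes through `SETTINGS cluster_table_function_split_granularity='bucket', cluster_table_function_buckets_batch_size=...` and checks that bucket-level splitting opens more read buffers than whole-file distribution.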