6 changes: 4 additions & 2 deletions src/Core/ProtocolDefines.h
@@ -35,8 +35,10 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54

static constexpr auto DBMS_CLUSTER_INITIAL_PROCESSING_PROTOCOL_VERSION = 1;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_METADATA = 2;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA = 3;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 3;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_METADATA = 3;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO = 4;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA = 5;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA;

static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
static constexpr auto DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD = 4;
13 changes: 13 additions & 0 deletions src/Core/Settings.cpp
@@ -6789,6 +6789,19 @@ Both database and table names have to be unquoted - only simple identifiers are
)", 0) \
DECLARE(Bool, allow_general_join_planning, true, R"(
Allows a more general join planning algorithm that can handle more complex conditions, but only works with hash join. If hash join is not enabled, then the usual join planning algorithm is used regardless of the value of this setting.
)", 0) \
DECLARE(ObjectStorageGranularityLevel, cluster_table_function_split_granularity, ObjectStorageGranularityLevel::FILE, R"(
Controls how input data is split into tasks when executing a cluster table function (for example, `s3Cluster`).

This setting defines the granularity of work distribution across the cluster:
- `file` — each task processes an entire file.
- `bucket` — tasks are created per internal data block within a file (for example, Parquet row groups).

Choosing finer granularity (like `bucket`) can improve parallelism when working with a small number of large files.
For instance, if a Parquet file contains multiple row groups, enabling `bucket` granularity allows each group to be processed independently by different workers.
)", 0) \
DECLARE(UInt64, cluster_table_function_buckets_batch_size, 0, R"(
Defines the approximate batch size (in bytes) used for distributed task processing in cluster table functions with `bucket` split granularity. Data is accumulated until at least this amount is reached; the actual batch may be slightly larger so that it ends on a data-block boundary (for example, a Parquet row group).
)", 0) \
DECLARE(UInt64, merge_table_max_tables_to_look_for_schema_inference, 1000, R"(
When creating a `Merge` table without an explicit schema or when using the `merge` table function, infer the schema as a union of at most the specified number of matching tables.
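The accumulation rule behind `cluster_table_function_buckets_batch_size` can be pictured with a short sketch. This is illustrative only, not code from this PR; `RowGroupDesc`, `groupIntoBuckets`, and the byte counts are assumed names:

#include <cstddef>
#include <vector>

/// Illustrative sketch: close a bucket once at least `batch_size` bytes have
/// been accumulated, never splitting an internal data block (e.g. a Parquet
/// row group), so a bucket may end up slightly larger than `batch_size`.
struct RowGroupDesc
{
    size_t index;
    size_t bytes;
};

std::vector<std::vector<size_t>> groupIntoBuckets(const std::vector<RowGroupDesc> & row_groups, size_t batch_size)
{
    std::vector<std::vector<size_t>> buckets;
    std::vector<size_t> current;
    size_t accumulated = 0;

    for (const auto & rg : row_groups)
    {
        current.push_back(rg.index);
        accumulated += rg.bytes;
        if (accumulated >= batch_size)
        {
            buckets.push_back(std::move(current));
            current.clear();
            accumulated = 0;
        }
    }
    if (!current.empty())
        buckets.push_back(std::move(current));
    return buckets;
}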
3 changes: 2 additions & 1 deletion src/Core/Settings.h
@@ -109,7 +109,8 @@ class WriteBuffer;
M(CLASS_NAME, UInt64Auto) \
M(CLASS_NAME, URI) \
M(CLASS_NAME, VectorSearchFilterStrategy) \
M(CLASS_NAME, GeoToH3ArgumentOrder)
M(CLASS_NAME, GeoToH3ArgumentOrder) \
M(CLASS_NAME, ObjectStorageGranularityLevel)


COMMON_SETTINGS_SUPPORTED_TYPES(Settings, DECLARE_SETTING_TRAIT)
3 changes: 3 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -58,6 +58,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
{"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."},
{"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."},
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
{"cluster_table_function_split_granularity", "file", "file", "New setting."},
{"cluster_table_function_buckets_batch_size", 0, 0, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.8",
{
6 changes: 6 additions & 0 deletions src/Core/SettingsEnums.cpp
@@ -372,4 +372,10 @@ IMPLEMENT_SETTING_ENUM(
{"manifest_file_entry", IcebergMetadataLogLevel::ManifestFileEntry}})

IMPLEMENT_SETTING_AUTO_ENUM(MergeTreePartExportFileAlreadyExistsPolicy, ErrorCodes::BAD_ARGUMENTS);

IMPLEMENT_SETTING_ENUM(
ObjectStorageGranularityLevel,
ErrorCodes::BAD_ARGUMENTS,
{{"file", ObjectStorageGranularityLevel::FILE},
{"bucket", ObjectStorageGranularityLevel::BUCKET}})
}
8 changes: 8 additions & 0 deletions src/Core/SettingsEnums.h
@@ -481,6 +481,14 @@ enum class IcebergMetadataLogLevel : uint8_t

DECLARE_SETTING_ENUM(IcebergMetadataLogLevel)

enum class ObjectStorageGranularityLevel : uint8_t
{
FILE = 0,
BUCKET = 1,
};

DECLARE_SETTING_ENUM(ObjectStorageGranularityLevel)

enum class MergeTreePartExportFileAlreadyExistsPolicy : uint8_t
{
skip,
11 changes: 11 additions & 0 deletions src/Disks/ObjectStorages/IObjectStorage.h
@@ -29,6 +29,7 @@
#include <Disks/WriteMode.h>

#include <Processors/ISimpleTransform.h>
#include <Processors/Formats/IInputFormat.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeObjectMetadata.h>

#include <Interpreters/Context_fwd.h>
@@ -149,6 +150,8 @@ struct PathWithMetadata
std::optional<String> absolute_path;
ObjectStoragePtr object_storage_to_use = nullptr;

FileBucketInfoPtr file_bucket_info;

PathWithMetadata() = default;

explicit PathWithMetadata(
@@ -188,6 +191,14 @@ struct PathWithMetadata
void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file = true);

ObjectStoragePtr getObjectStorage() const { return object_storage_to_use; }

String getIdentifier() const
{
String result = getAbsolutePath().value_or(getPath());
if (file_bucket_info)
result += file_bucket_info->getIdentifier();
return result;
}
};

struct ObjectKeyWithMetadata
29 changes: 29 additions & 0 deletions src/Formats/FormatFactory.cpp
@@ -374,6 +374,20 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
return format_settings;
}

FileBucketInfoPtr FormatFactory::getFileBucketInfo(const String & format)
{
auto creator = getCreators(format);
return creator.file_bucket_info_creator();
}

void FormatFactory::registerFileBucketInfo(const String & format, FileBucketInfoCreator bucket_info)
{
chassert(bucket_info);
auto & creators = getOrCreateCreators(format);
if (creators.file_bucket_info_creator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Bucket splitter for format {} is already registered", format);
creators.file_bucket_info_creator = std::move(bucket_info);
}

InputFormatPtr FormatFactory::getInput(
const String & name,
@@ -694,6 +708,21 @@ void FormatFactory::registerInputFormat(const String & name, InputCreator input_
KnownFormatNames::instance().add(name, /* case_insensitive = */ true);
}

void FormatFactory::registerSplitter(const String & format, BucketSplitterCreator splitter)
{
chassert(splitter);
auto & creators = getOrCreateCreators(format);
if (creators.bucket_splitter_creator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Bucket splitter for format {} is already registered", format);
creators.bucket_splitter_creator = std::move(splitter);
}

BucketSplitter FormatFactory::getSplitter(const String & format)
{
auto creator = getCreators(format);
return creator.bucket_splitter_creator();
}

void FormatFactory::registerRandomAccessInputFormat(const String & name, RandomAccessInputCreator input_creator)
{
chassert(input_creator);
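For orientation, this is roughly how a format would hook into the two new registration points. A sketch only, since this hunk shows no call sites; `registerParquetBucketSupport`, `ParquetFileBucketInfo`, and `ParquetBucketSplitter` are assumed names:

/// Hypothetical registration sketch; only registerFileBucketInfo and
/// registerSplitter are taken from this diff, the rest is assumed.
void registerParquetBucketSupport(FormatFactory & factory)
{
    factory.registerFileBucketInfo("Parquet", []
    {
        return std::make_shared<ParquetFileBucketInfo>();
    });
    factory.registerSplitter("Parquet", []
    {
        return std::make_shared<ParquetBucketSplitter>();
    });
}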
13 changes: 13 additions & 0 deletions src/Formats/FormatFactory.h
@@ -10,6 +10,8 @@
#include <base/types.h>
#include <Common/Allocator.h>

#include <Processors/Formats/IInputFormat.h>

#include <boost/noncopyable.hpp>

#include <functional>
@@ -94,6 +96,10 @@ class FormatFactory final : private boost::noncopyable
const RowInputFormatParams & params,
const FormatSettings & settings)>;

using FileBucketInfoCreator = std::function<FileBucketInfoPtr()>;

using BucketSplitterCreator = std::function<BucketSplitter()>;

// Incompatible with FileSegmentationEngine.
using RandomAccessInputCreator = std::function<InputFormatPtr(
ReadBuffer & buf,
@@ -142,6 +148,8 @@
{
String name;
InputCreator input_creator;
FileBucketInfoCreator file_bucket_info_creator;
BucketSplitterCreator bucket_splitter_creator;
RandomAccessInputCreator random_access_input_creator;
OutputCreator output_creator;
FileSegmentationEngineCreator file_segmentation_engine_creator;
@@ -286,6 +294,11 @@ class FormatFactory final : private boost::noncopyable
void checkFormatName(const String & name) const;
bool exists(const String & name) const;

FileBucketInfoPtr getFileBucketInfo(const String & format);
void registerFileBucketInfo(const String & format, FileBucketInfoCreator bucket_info);
void registerSplitter(const String & format, BucketSplitterCreator splitter);
BucketSplitter getSplitter(const String & format);

private:
FormatsDictionary dict;
FileExtensionFormats file_extension_formats;
34 changes: 33 additions & 1 deletion src/Interpreters/ClusterFunctionReadTask.cpp
@@ -7,7 +7,10 @@
#include <IO/ReadHelpers.h>
#include <Interpreters/ActionsDAG.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>

namespace DB
{
@@ -40,6 +43,7 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o
const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes];
path = send_over_whole_archive ? object->getPathOrPathToArchiveIfArchive() : object->getPath();
absolute_path = object->getAbsolutePath();
file_bucket_info = object->file_bucket_info;
}
}

@@ -58,7 +62,9 @@ ObjectInfoPtr ClusterFunctionReadTaskResponse::getObjectInfo() const
object->file_meta_info = file_meta_info;
if (absolute_path.has_value() && !absolute_path.value().empty())
object->absolute_path = absolute_path;

object->file_bucket_info = file_bucket_info;

return object;
}

@@ -76,6 +82,21 @@ void ClusterFunctionReadTaskResponse::serialize(WriteBuffer & out, size_t protoc
ActionsDAG().serialize(out, registry);
}

if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO)
{
if (file_bucket_info)
{
/// Write the format name so we can create the appropriate file bucket info during deserialization.
writeStringBinary(file_bucket_info->getFormatName(), out);
file_bucket_info->serialize(out);
}
else
{
/// Write an empty string as the format name if file_bucket_info is not set.
writeStringBinary("", out);
}
}

if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA)
{
/// This info is not used when optimization is disabled, so there is no need to send it.
@@ -111,6 +132,17 @@ void ClusterFunctionReadTaskResponse::deserialize(ReadBuffer & in)
}
}

if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO)
{
String format;
readStringBinary(format, in);
if (!format.empty())
{
file_bucket_info = FormatFactory::instance().getFileBucketInfo(format);
file_bucket_info->deserialize(in);
}
}

if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA)
{
auto info = std::make_shared<DataFileMetaInfo>(DataFileMetaInfo::deserialize(in));
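The branches above give the task payload a small self-describing envelope. For protocol version 4 and later the field order looks like this (a sketch of the layout, not authoritative):

/// Sender (serialize):
///   writeStringBinary(format_name)    -- empty string when no bucket info is set
///   file_bucket_info->serialize(out)  -- format-specific payload, written only
///                                        when a bucket info is attached
///
/// Receiver (deserialize): reads the format name and, if it is non-empty, asks
/// FormatFactory::getFileBucketInfo(format) for an empty instance of the right
/// concrete type, then lets that instance read its own payload. An empty name
/// means the task covers the whole file and file_bucket_info stays null.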
7 changes: 7 additions & 0 deletions src/Interpreters/ClusterFunctionReadTask.h
@@ -1,5 +1,8 @@
#pragma once
#include <Core/Types.h>
#include <Processors/Formats/IInputFormat.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h>
#include <Storages/ObjectStorage/IObjectIterator.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeObjectMetadata.h>

@@ -18,8 +21,12 @@ struct ClusterFunctionReadTaskResponse

/// Data path (object path, in case of object storage).
String path;

/// Absolute path (including storage type prefix).
std::optional<String> absolute_path;

FileBucketInfoPtr file_bucket_info;

/// Object metadata path, in case of data lake object.
DataLakeObjectMetadata data_lake_metadata;
/// File's columns info
9 changes: 9 additions & 0 deletions src/Processors/Formats/IInputFormat.cpp
@@ -1,7 +1,11 @@
#include <optional>
#include <Processors/Formats/IInputFormat.h>
#include <IO/ReadBuffer.h>
#include <IO/WithFileName.h>
#include <Common/Exception.h>
#include <IO/VarInt.h>
#include <Interpreters/Context_fwd.h>
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>

namespace DB
{
@@ -70,4 +74,9 @@ void IInputFormat::onFinish()
{
resetReadBuffer();
}

void IInputFormat::setBucketsToRead(const FileBucketInfoPtr & /*buckets_to_read*/)
{
}

}
26 changes: 26 additions & 0 deletions src/Processors/Formats/IInputFormat.h
@@ -4,6 +4,8 @@
#include <IO/ReadBuffer.h>
#include <Processors/Formats/InputFormatErrorsLogger.h>
#include <Common/PODArray.h>
#include <IO/WriteBuffer.h>
#include <base/types.h>
#include <Core/BlockMissingValues.h>
#include <Processors/ISource.h>
#include <Core/Settings.h>
@@ -46,6 +48,29 @@ struct ChunkInfoRowNumbers : public ChunkInfo
std::optional<IColumnFilter> applied_filter;
};

/// Structure for storing information about buckets that IInputFormat needs to read.
struct FileBucketInfo
{
virtual void serialize(WriteBuffer & buffer) = 0;
virtual void deserialize(ReadBuffer & buffer) = 0;
virtual String getIdentifier() const = 0;
virtual String getFormatName() const = 0;

virtual ~FileBucketInfo() = default;
};
using FileBucketInfoPtr = std::shared_ptr<FileBucketInfo>;

/// Interface for splitting a file into buckets.
struct IBucketSplitter
{
/// Splits a file into buckets using the given read buffer and format settings.
/// Returns information about the resulting buckets (see the structure above for details).
virtual std::vector<FileBucketInfoPtr> splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) = 0;

virtual ~IBucketSplitter() = default;
};
using BucketSplitter = std::shared_ptr<IBucketSplitter>;

/** Input format is a source, that reads data from ReadBuffer.
*/
class IInputFormat : public ISource
@@ -69,6 +94,7 @@ class IInputFormat : public ISource
/// All data reading from the read buffer must be performed by this method.
virtual Chunk read() = 0;

virtual void setBucketsToRead(const FileBucketInfoPtr & buckets_to_read);
/** In some usecase (hello Kafka) we need to read a lot of tiny streams in exactly the same format.
* The recreating of parser for each small stream takes too long, so we introduce a method
* resetParser() which allow to reset the state of parser to continue reading of
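A concrete FileBucketInfo can be very small. The following is an illustrative sketch, not the implementation this PR ships; it assumes Parquet row-group indices as the bucket payload and uses the VarInt helpers from IO/VarInt.h:

/// Illustrative only: a minimal FileBucketInfo carrying Parquet row-group indices.
struct RowGroupBucketInfo : public FileBucketInfo
{
    std::vector<UInt64> row_groups;

    void serialize(WriteBuffer & buffer) override
    {
        writeVarUInt(row_groups.size(), buffer);
        for (UInt64 rg : row_groups)
            writeVarUInt(rg, buffer);
    }

    void deserialize(ReadBuffer & buffer) override
    {
        UInt64 count = 0;
        readVarUInt(count, buffer);
        row_groups.resize(count);
        for (auto & rg : row_groups)
            readVarUInt(rg, buffer);
    }

    /// Appended to the object path (see PathWithMetadata::getIdentifier above),
    /// so two tasks over the same file get distinct identifiers.
    String getIdentifier() const override
    {
        String result;
        for (UInt64 rg : row_groups)
            result += "_" + std::to_string(rg);
        return result;
    }

    String getFormatName() const override { return "Parquet"; }
};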
17 changes: 13 additions & 4 deletions src/Processors/Formats/Impl/Parquet/ReadManager.cpp
@@ -6,7 +6,9 @@
#include <Formats/FormatParserSharedResources.h>
#include <Processors/Formats/IInputFormat.h>

#include <mutex>
#include <shared_mutex>
#include <unordered_set>

namespace DB::ErrorCodes
{
@@ -41,11 +43,18 @@ std::optional<size_t> AtomicBitSet::findFirst()
return std::nullopt;
}

void ReadManager::init(FormatParserSharedResourcesPtr parser_shared_resources_)
void ReadManager::init(FormatParserSharedResourcesPtr parser_shared_resources_, const std::optional<std::vector<size_t>> & buckets_to_read_)
{
parser_shared_resources = parser_shared_resources_;
reader.file_metadata = Reader::readFileMetaData(reader.prefetcher);
reader.prefilterAndInitRowGroups();

if (buckets_to_read_)
{
row_groups_to_read = std::unordered_set<UInt64>{};
for (auto rg : *buckets_to_read_)
row_groups_to_read->insert(rg);
}
reader.prefilterAndInitRowGroups(row_groups_to_read);
reader.preparePrewhere();

ProfileEvents::increment(ProfileEvents::ParquetReadRowGroups, reader.row_groups.size());
@@ -337,8 +346,8 @@ void ReadManager::finishRowSubgroupStage(size_t row_group_idx, size_t row_subgro
case ReadStage::PrewhereData:
{
chassert(!reader.prewhere_steps.empty());
reader.applyPrewhere(row_subgroup);
size_t prev = row_group.prewhere_ptr.exchange(row_subgroup_idx + 1);
reader.applyPrewhere(row_subgroup, row_group);
size_t prev = row_group.prewhere_ptr.exchange(row_subgroup_idx + 1); /// NOLINT(clang-analyzer-deadcode.DeadStores)
chassert(prev == row_subgroup_idx);
if (row_subgroup_idx + 1 < row_group.subgroups.size())
{
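Finally, on the worker side the new setBucketsToRead hook has to end up here. A sketch of how ParquetBlockInputFormat might bridge the two, under the assumption that the format keeps a `row_groups_to_read` member that it later hands to ReadManager::init (the RowGroupBucketInfo type is the illustrative one from the IInputFormat.h sketch above):

/// Hypothetical override: translate the task's bucket info into the
/// row-group list that ReadManager::init now accepts.
void ParquetBlockInputFormat::setBucketsToRead(const FileBucketInfoPtr & buckets_to_read)
{
    if (!buckets_to_read)
        return;

    const auto & info = assert_cast<const RowGroupBucketInfo &>(*buckets_to_read);
    /// Only the listed row groups survive prefilterAndInitRowGroups();
    /// every other row group of the file is skipped by this task.
    row_groups_to_read = std::vector<size_t>(info.row_groups.begin(), info.row_groups.end());
}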