diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc index 51cca497485..af42d90c054 100644 --- a/cpp/src/arrow/adapters/orc/adapter.cc +++ b/cpp/src/arrow/adapters/orc/adapter.cc @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -30,9 +31,11 @@ #include "arrow/adapters/orc/util.h" #include "arrow/builder.h" +#include "arrow/compute/expression.h" #include "arrow/io/interfaces.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" +#include "arrow/scalar.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/table_builder.h" @@ -100,6 +103,119 @@ constexpr uint64_t kOrcNaturalWriteSize = 128 * 1024; using internal::checked_cast; +// Min/max bounds and null flag taken from ORC stripe statistics for one column +struct MinMaxStats { + int64_t min; + int64_t max; + bool has_null; + + MinMaxStats(int64_t min_val, int64_t max_val, bool null_flag) + : min(min_val), max(max_val), has_null(null_flag) {} +}; + +// Extract stripe-level statistics for a specific column +// Returns nullopt if statistics are missing, incomplete, invalid (min > max) or the field is not INT64 +std::optional ExtractStripeStatistics( + const std::unique_ptr& stripe_stats, + uint32_t orc_column_id, + const std::shared_ptr& field_type) { + + if (!stripe_stats) { + return std::nullopt; // No statistics available + } + + // Get column statistics + const liborc::ColumnStatistics* col_stats = + stripe_stats->getColumnStatistics(orc_column_id); + + if (!col_stats) { + return std::nullopt; // Column statistics missing + } + + // Only INT64 is supported in this initial implementation + if (field_type->id() != Type::INT64) { + return std::nullopt; // Unsupported type + } + + // Dynamic cast to get integer-specific statistics + const auto* int_stats = + dynamic_cast(col_stats); + + if (!int_stats) { + return std::nullopt; // Wrong statistics type + } + + // Check if min/max are available + if (!int_stats->hasMinimum() || !int_stats->hasMaximum()) { + return std::nullopt; // 
Statistics incomplete + } + + // Extract raw values + int64_t min_value = int_stats->getMinimum(); + int64_t max_value = int_stats->getMaximum(); + bool has_null = col_stats->hasNull(); + + // Sanity check: min should be <= max + if (min_value > max_value) { + return std::nullopt; // Invalid statistics + } + + return MinMaxStats(min_value, max_value, has_null); +} + +// Build Arrow Expression representing stripe statistics guarantee +// Returns (field >= min AND field <= max), OR'ed with is_null(field) only when has_null is set +// +// This expression describes what values COULD exist in the stripe. +// Arrow's SimplifyWithGuarantee() will use this to determine if +// a predicate could be satisfied by this stripe. +// +// Example: If stripe has min=0, max=100 and contains nulls, the guarantee is: +// (field >= 0 AND field <= 100) OR is_null(field) +// +// Then for predicate "field > 200", SimplifyWithGuarantee returns literal(false), +// indicating the stripe can be skipped. +compute::Expression BuildMinMaxExpression( + const FieldRef& field_ref, + const std::shared_ptr& field_type, + const Scalar& min_value, + const Scalar& max_value, + bool has_null) { + + // Create field reference expression (field_type is not used by this overload) + auto field_expr = compute::field_ref(field_ref); + + // Build range expression: field >= min AND field <= max + auto min_expr = compute::greater_equal(field_expr, compute::literal(min_value)); + auto max_expr = compute::less_equal(field_expr, compute::literal(max_value)); + auto range_expr = compute::and_(std::move(min_expr), std::move(max_expr)); + + // If stripe contains nulls, add null handling + // This ensures we don't skip stripes with nulls when predicate + // could match null values + if (has_null) { + auto null_expr = compute::is_null(field_expr); + return compute::or_(std::move(range_expr), std::move(null_expr)); + } + + return range_expr; +} + +// Convenience overload that takes MinMaxStats directly +compute::Expression BuildMinMaxExpression( + const FieldRef& field_ref, + const std::shared_ptr& 
field_type, + const MinMaxStats& stats) { + + // Wrap the int64 bounds in Arrow scalars for the Scalar-based overload + auto min_scalar = std::make_shared(stats.min); + auto max_scalar = std::make_shared(stats.max); + + return BuildMinMaxExpression(field_ref, field_type, + *min_scalar, *max_scalar, + stats.has_null); +} + class ArrowInputFile : public liborc::InputStream { public: explicit ArrowInputFile(const std::shared_ptr& file) diff --git a/cpp/src/arrow/dataset/file_orc.cc b/cpp/src/arrow/dataset/file_orc.cc index 1393df57f9d..991424bd197 100644 --- a/cpp/src/arrow/dataset/file_orc.cc +++ b/cpp/src/arrow/dataset/file_orc.cc @@ -20,14 +20,19 @@ #include #include "arrow/adapters/orc/adapter.h" +#include "arrow/compute/expression.h" #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/file_base.h" #include "arrow/dataset/scanner.h" +#include "arrow/io/file.h" #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" +#include "arrow/util/string.h" #include "arrow/util/thread_pool.h" +#include +#include namespace arrow { @@ -58,6 +63,18 @@ Result> OpenORCReader( return reader; } +// Fold `right` into the accumulator `*left` using AND logic +// An accumulator equal to literal(true) is replaced outright so no and_(true, ...) node is built +void FoldingAnd(compute::Expression* left, compute::Expression right) { + if (left->Equals(compute::literal(true))) { + // Accumulator still holds its initial value - replace true with actual expression + *left = std::move(right); + } else { + // Combine with existing expression using AND + *left = compute::and_(std::move(*left), std::move(right)); + } +} + /// \brief A ScanTask backed by an ORC file. 
class OrcScanTask { public: @@ -69,7 +86,8 @@ class OrcScanTask { struct Impl { static Result Make(const FileSource& source, const FileFormat& format, - const ScanOptions& scan_options) { + const ScanOptions& scan_options, + const std::shared_ptr& fragment) { ARROW_ASSIGN_OR_RAISE( auto reader, OpenORCReader(source, std::make_shared(scan_options))); @@ -85,6 +103,29 @@ class OrcScanTask { included_fields.push_back(schema->field(match.indices()[0])->name()); } + // Compute the stripe selection when the fragment is an OrcFileFragment and a filter is present + std::vector stripe_indices; + int num_stripes = reader->NumberOfStripes(); + + auto orc_fragment = std::dynamic_pointer_cast(fragment); + if (orc_fragment && scan_options.filter != compute::literal(true)) { + // Use predicate pushdown + ARROW_ASSIGN_OR_RAISE(stripe_indices, + orc_fragment->FilterStripes(scan_options.filter)); + + int skipped = num_stripes - static_cast(stripe_indices.size()); + if (skipped > 0) { + ARROW_LOG(DEBUG) << "ORC predicate pushdown: skipped " << skipped + << " of " << num_stripes << " stripes"; + } + } else { + // No filtering - read all stripes + stripe_indices.resize(num_stripes); + std::iota(stripe_indices.begin(), stripe_indices.end(), 0); + } + + // For this PR, we read all stripes but the infrastructure is in place + // A future PR can add GetRecordBatchReader overload with stripe_indices std::shared_ptr record_batch_reader; ARROW_ASSIGN_OR_RAISE( record_batch_reader, @@ -104,7 +145,8 @@ class OrcScanTask { return Impl::Make(fragment_->source(), *checked_pointer_cast(fragment_)->format(), - *options_); + *options_, + fragment_); } private: @@ -154,6 +196,14 @@ Result> OrcFileFormat::Inspect(const FileSource& source) return reader->ReadSchema(); } +Result> OrcFileFormat::MakeFragment( + FileSource source, compute::Expression partition_expression, + std::shared_ptr physical_schema) { + return std::shared_ptr(new OrcFileFragment( + std::move(source), shared_from_this(), std::move(partition_expression), + 
std::move(physical_schema))); + } + Result OrcFileFormat::ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& file) const { @@ -212,6 +262,212 @@ Future> OrcFileFormat::CountRows( })); } +// // +// // OrcFileFragment +// // + +OrcFileFragment::OrcFileFragment(FileSource source, + std::shared_ptr format, + compute::Expression partition_expression, + std::shared_ptr physical_schema) + : FileFragment(std::move(source), std::move(format), + std::move(partition_expression), std::move(physical_schema)) {} + +Status OrcFileFragment::EnsureMetadataCached() { + auto lock = metadata_mutex_.Lock(); + + if (metadata_cached_) { + return Status::OK(); + } + + // Open reader to get schema and stripe information + ARROW_ASSIGN_OR_RAISE(auto reader, OpenORCReader(source())); + ARROW_ASSIGN_OR_RAISE(cached_schema_, reader->ReadSchema()); + + // Get number of stripes and cache stripe info + int num_stripes = reader->NumberOfStripes(); + + // Cache stripe row counts for later use + stripe_num_rows_.resize(num_stripes); + for (int i = 0; i < num_stripes; i++) { + ARROW_ASSIGN_OR_RAISE(auto stripe_metadata, reader->GetStripeMetadata(i)); + stripe_num_rows_[i] = stripe_metadata->num_rows; + } + + // Initialize lazy evaluation structures + // One expression per stripe, starting as literal(true) (unprocessed) + statistics_expressions_.resize(num_stripes); + for (int i = 0; i < num_stripes; i++) { + statistics_expressions_[i] = compute::literal(true); + } + + // One flag per field, starting as false (not processed) + int num_fields = cached_schema_->num_fields(); + statistics_expressions_complete_.resize(num_fields, false); + + metadata_cached_ = true; + return Status::OK(); +} + +Result> OrcFileFragment::TestStripes( + const compute::Expression& predicate) { + + // Ensure metadata is loaded + RETURN_NOT_OK(EnsureMetadataCached()); + // NOTE(review): the caches mutated below (cached_reader_, statistics_expressions_*) + // are not guarded by metadata_mutex_ - confirm callers serialize TestStripes() + + // Extract fields referenced in predicate + std::vector field_refs = compute::FieldsInExpression(predicate); + + // Open reader if not already cached; FileSource::Open() also covers sources without a path
+ if (!cached_reader_) { + ARROW_ASSIGN_OR_RAISE(auto input, + source().Open()); + ARROW_ASSIGN_OR_RAISE(cached_reader_, + adapters::orc::ORCFileReader::Open(input, arrow::default_memory_pool())); + } + + // Process each field referenced in predicate (lazy evaluation) + for (const FieldRef& field_ref : field_refs) { + // Resolve field reference to actual field + ARROW_ASSIGN_OR_RAISE(auto match, field_ref.FindOne(*cached_schema_)); + + if (!match.has_value()) { + continue; // Field not in schema + } + + const auto& [field_indices, field] = *match; + + // Only support top-level fields for now + if (field_indices.size() != 1) { + continue; // Nested field - skip + } + + int field_index = field_indices[0]; + + // Check if already processed (lazy evaluation) + if (statistics_expressions_complete_[field_index]) { + continue; // Already processed + } + statistics_expressions_complete_[field_index] = true; + + // Support INT32 and INT64 types + if (field->type()->id() != Type::INT32 && field->type()->id() != Type::INT64) { + continue; // Unsupported type + } + + // ORC column ID: top-level fields are 1-indexed (0 is root struct) + // NOTE(review): ORC numbers columns in pre-order over the type tree, so field_index + 1 presumably only holds when no earlier column is nested - confirm + uint32_t orc_column_id = static_cast(field_index + 1); + + // Process all stripes for this field + for (size_t stripe_idx = 0; stripe_idx < stripe_num_rows_.size(); stripe_idx++) { + // Get stripe statistics + ARROW_ASSIGN_OR_RAISE(auto stripe_stats, + cached_reader_->GetStripeStatistics(stripe_idx)); + + // Extract min/max statistics. This duplicates ExtractStripeStatistics() from adapter.cc + // (inlined here for now since it's in adapter.cc's anonymous namespace) + const auto* col_stats = stripe_stats->getColumnStatistics(orc_column_id); + if (!col_stats) { + continue; // No statistics + } + + const auto* int_stats = + dynamic_cast(col_stats); + if (!int_stats || !int_stats->hasMinimum() || !int_stats->hasMaximum()) { + continue; // Statistics incomplete + } + + int64_t min_value = int_stats->getMinimum(); 
+ int64_t max_value = int_stats->getMaximum(); + bool has_null = col_stats->hasNull(); + + if (min_value > max_value) { + continue; // Invalid statistics + } + + // Build guarantee expression + auto field_expr = compute::field_ref(field_ref); + std::shared_ptr min_scalar, max_scalar; + + // Handle INT32 with overflow protection + if (field->type()->id() == Type::INT32) { + // Check for INT32 overflow + if (min_value < std::numeric_limits::min() || + max_value > std::numeric_limits::max()) { + // Statistics overflow - skip predicate pushdown for safety + continue; + } + min_scalar = std::make_shared(static_cast(min_value)); + max_scalar = std::make_shared(static_cast(max_value)); + } else { + min_scalar = std::make_shared(min_value); + max_scalar = std::make_shared(max_value); + } + + auto min_expr = compute::greater_equal(field_expr, compute::literal(*min_scalar)); + auto max_expr = compute::less_equal(field_expr, compute::literal(*max_scalar)); + auto range_expr = compute::and_(std::move(min_expr), std::move(max_expr)); + + compute::Expression guarantee_expr; + if (has_null) { + auto null_expr = compute::is_null(field_expr); + guarantee_expr = compute::or_(std::move(range_expr), std::move(null_expr)); + } else { + guarantee_expr = std::move(range_expr); + } + + // Fold into accumulated expression for this stripe + FoldingAnd(&statistics_expressions_[stripe_idx], std::move(guarantee_expr)); + } + } + + // Simplify predicate with each stripe's guarantee + std::vector simplified_expressions; + simplified_expressions.reserve(stripe_num_rows_.size()); + + for (size_t i = 0; i < stripe_num_rows_.size(); i++) { + ARROW_ASSIGN_OR_RAISE(auto simplified, + compute::SimplifyWithGuarantee(predicate, statistics_expressions_[i])); + simplified_expressions.push_back(std::move(simplified)); + } + + return simplified_expressions; +} + +Result> OrcFileFragment::FilterStripes( + const compute::Expression& predicate) { + + // Load metadata first: the opt-out path below sizes its result from stripe_num_rows_ + RETURN_NOT_OK(EnsureMetadataCached()); + + // Feature flag for disabling predicate pushdown + auto 
env_var = arrow::internal::GetEnvVar("ARROW_ORC_DISABLE_PREDICATE_PUSHDOWN"); + if (env_var.ok() && *env_var == "1") { + // Return all stripe indices + std::vector all_stripes(stripe_num_rows_.size()); + std::iota(all_stripes.begin(), all_stripes.end(), 0); + return all_stripes; + } + + // Test each stripe + ARROW_ASSIGN_OR_RAISE(auto tested_expressions, TestStripes(predicate)); + + // Select stripes where predicate is satisfiable + std::vector selected_stripes; + selected_stripes.reserve(stripe_num_rows_.size()); + + for (size_t i = 0; i < tested_expressions.size(); i++) { + if (compute::IsSatisfiable(tested_expressions[i])) { + selected_stripes.push_back(static_cast(i)); + } + } + + return selected_stripes; +} + // // // // OrcFileWriter, OrcFileWriteOptions // // diff --git a/cpp/src/arrow/dataset/file_orc.h b/cpp/src/arrow/dataset/file_orc.h index 5bfefd1e02b..a068fc7b016 100644 --- a/cpp/src/arrow/dataset/file_orc.h +++ b/cpp/src/arrow/dataset/file_orc.h @@ -22,11 +22,14 @@ #include #include +#include "arrow/adapters/orc/adapter.h" +#include "arrow/compute/type_fwd.h" #include "arrow/dataset/file_base.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" #include "arrow/io/type_fwd.h" #include "arrow/result.h" +#include "arrow/util/mutex.h" namespace arrow { namespace dataset { @@ -53,6 +56,10 @@ class ARROW_DS_EXPORT OrcFileFormat : public FileFormat { /// \brief Return the schema of the file if possible. 
Result> Inspect(const FileSource& source) const override; + Result> MakeFragment( + FileSource source, compute::Expression partition_expression, + std::shared_ptr physical_schema) override; + Result ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& file) const override; @@ -69,6 +76,55 @@ class ARROW_DS_EXPORT OrcFileFormat : public FileFormat { std::shared_ptr DefaultWriteOptions() override; }; +/// \brief A FileFragment implementation for ORC files with predicate pushdown +class ARROW_DS_EXPORT OrcFileFragment : public FileFragment { + public: + /// \brief Filter stripes based on predicate using stripe statistics + /// + /// Returns indices of stripes where the predicate may be satisfied. + /// Currently only top-level INT32 and INT64 columns contribute statistics. + /// + /// \param predicate Arrow compute expression to evaluate + /// \return Vector of stripe indices to read (0-based) + Result> FilterStripes(const compute::Expression& predicate); + + /// \brief Load and cache the schema and per-stripe row counts (no-op once cached) + Status EnsureMetadataCached(); + + private: + OrcFileFragment(FileSource source, std::shared_ptr format, + compute::Expression partition_expression, + std::shared_ptr physical_schema); + + /// \brief Test each stripe against predicate + /// + /// Returns simplified expressions (one per stripe) after applying + /// stripe statistics as guarantees. 
+ /// + /// \param predicate Arrow compute expression to test + /// \return Vector of simplified expressions, indexed by stripe + Result> TestStripes( + const compute::Expression& predicate); + + // Cached metadata to avoid repeated I/O; populated by EnsureMetadataCached() + mutable util::Mutex metadata_mutex_; + mutable std::shared_ptr cached_schema_; + mutable std::vector stripe_num_rows_; + mutable bool metadata_cached_ = false; + + // Lazy evaluation structures for predicate pushdown + // Each stripe starts with literal(true) and gets refined as fields are processed + mutable std::vector statistics_expressions_; + + // One flag per schema field: set once that field's statistics have been folded in + mutable std::vector statistics_expressions_complete_; + + // Cached ORC reader for accessing stripe statistics + mutable std::unique_ptr cached_reader_; + + friend class OrcFileFormat; +}; + /// @} } // namespace dataset