Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions cpp/src/arrow/adapters/orc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>
#include <list>
#include <memory>
#include <optional>
#include <sstream>
#include <string>
#include <vector>
Expand All @@ -30,9 +31,11 @@

#include "arrow/adapters/orc/util.h"
#include "arrow/builder.h"
#include "arrow/compute/expression.h"
#include "arrow/io/interfaces.h"
#include "arrow/memory_pool.h"
#include "arrow/record_batch.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/table_builder.h"
Expand Down Expand Up @@ -100,6 +103,119 @@ constexpr uint64_t kOrcNaturalWriteSize = 128 * 1024;

using internal::checked_cast;

// Statistics container for min/max values from ORC stripe statistics
struct MinMaxStats {
int64_t min;
int64_t max;
bool has_null;

MinMaxStats(int64_t min_val, int64_t max_val, bool null_flag)
: min(min_val), max(max_val), has_null(null_flag) {}
};

// Extract stripe-level statistics for a specific column
// Returns nullopt if statistics are missing or invalid
std::optional<MinMaxStats> ExtractStripeStatistics(
const std::unique_ptr<liborc::StripeStatistics>& stripe_stats,
uint32_t orc_column_id,
const std::shared_ptr<DataType>& field_type) {

if (!stripe_stats) {
return std::nullopt; // No statistics available
}

// Get column statistics
const liborc::ColumnStatistics* col_stats =
stripe_stats->getColumnStatistics(orc_column_id);

if (!col_stats) {
return std::nullopt; // Column statistics missing
}

// Only INT64 support in this initial implementation
if (field_type->id() != Type::INT64) {
return std::nullopt; // Unsupported type
}

// Dynamic cast to get integer-specific statistics
const auto* int_stats =
dynamic_cast<const liborc::IntegerColumnStatistics*>(col_stats);

if (!int_stats) {
return std::nullopt; // Wrong statistics type
}

// Check if min/max are available
if (!int_stats->hasMinimum() || !int_stats->hasMaximum()) {
return std::nullopt; // Statistics incomplete
}

// Extract raw values
int64_t min_value = int_stats->getMinimum();
int64_t max_value = int_stats->getMaximum();
bool has_null = col_stats->hasNull();

// Sanity check: min should be <= max
if (min_value > max_value) {
return std::nullopt; // Invalid statistics
}

return MinMaxStats(min_value, max_value, has_null);
}

// Build Arrow Expression representing stripe statistics guarantee
// Returns expression: (field >= min AND field <= max) OR is_null(field)
//
// This expression describes what values COULD exist in the stripe.
// Arrow's SimplifyWithGuarantee() will use this to determine if
// a predicate could be satisfied by this stripe.
//
// Example: If stripe has min=0, max=100, the guarantee is:
// (field >= 0 AND field <= 100) OR is_null(field)
//
// Then for predicate "field > 200", SimplifyWithGuarantee returns literal(false),
// indicating the stripe can be skipped.
compute::Expression BuildMinMaxExpression(
const FieldRef& field_ref,
const std::shared_ptr<DataType>& field_type,
const Scalar& min_value,
const Scalar& max_value,
bool has_null) {

// Create field reference expression
auto field_expr = compute::field_ref(field_ref);

// Build range expression: field >= min AND field <= max
auto min_expr = compute::greater_equal(field_expr, compute::literal(min_value));
auto max_expr = compute::less_equal(field_expr, compute::literal(max_value));
auto range_expr = compute::and_(std::move(min_expr), std::move(max_expr));

// If stripe contains nulls, add null handling
// This ensures we don't skip stripes with nulls when predicate
// could match null values
if (has_null) {
auto null_expr = compute::is_null(field_expr);
return compute::or_(std::move(range_expr), std::move(null_expr));
}

return range_expr;
}

// Convenience overload that takes MinMaxStats directly
compute::Expression BuildMinMaxExpression(
const FieldRef& field_ref,
const std::shared_ptr<DataType>& field_type,
const MinMaxStats& stats) {

// Convert int64 to Arrow scalar
auto min_scalar = std::make_shared<Int64Scalar>(stats.min);
auto max_scalar = std::make_shared<Int64Scalar>(stats.max);

return BuildMinMaxExpression(field_ref, field_type,
*min_scalar, *max_scalar,
stats.has_null);
}

class ArrowInputFile : public liborc::InputStream {
public:
explicit ArrowInputFile(const std::shared_ptr<io::RandomAccessFile>& file)
Expand Down
203 changes: 203 additions & 0 deletions cpp/src/arrow/dataset/file_orc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@
#include <memory>

#include "arrow/adapters/orc/adapter.h"
#include "arrow/compute/expression.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/scanner.h"
#include "arrow/io/file.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/string.h"
#include "arrow/util/thread_pool.h"
#include <numeric>

namespace arrow {

Expand Down Expand Up @@ -58,6 +62,18 @@ Result<std::unique_ptr<arrow::adapters::orc::ORCFileReader>> OpenORCReader(
return reader;
}

// Fold expression into accumulator using AND logic
// Special handling for literal(true) to avoid building large expression trees
void FoldingAnd(compute::Expression* left, compute::Expression right) {
if (left->Equals(compute::literal(true))) {
// First expression - replace true with actual expression
*left = std::move(right);
} else {
// Combine with existing expression using AND
*left = compute::and_(std::move(*left), std::move(right));
}
}

/// \brief A ScanTask backed by an ORC file.
class OrcScanTask {
public:
Expand Down Expand Up @@ -212,6 +228,193 @@ Future<std::optional<int64_t>> OrcFileFormat::CountRows(
}));
}

// //
// // OrcFileFragment
// //

OrcFileFragment::OrcFileFragment(FileSource source,
std::shared_ptr<FileFormat> format,
compute::Expression partition_expression,
std::shared_ptr<Schema> physical_schema)
: FileFragment(std::move(source), std::move(format),
std::move(partition_expression), std::move(physical_schema)) {}

Status OrcFileFragment::EnsureMetadataCached() {
auto lock = metadata_mutex_.Lock();

if (metadata_cached_) {
return Status::OK();
}

// Open reader to get schema and stripe information
ARROW_ASSIGN_OR_RAISE(auto reader, OpenORCReader(source()));
ARROW_ASSIGN_OR_RAISE(cached_schema_, reader->ReadSchema());

// Get number of stripes and cache stripe info
int num_stripes = reader->NumberOfStripes();

// Cache stripe row counts for later use
stripe_num_rows_.resize(num_stripes);
for (int i = 0; i < num_stripes; i++) {
ARROW_ASSIGN_OR_RAISE(auto stripe_metadata, reader->GetStripeMetadata(i));
stripe_num_rows_[i] = stripe_metadata->num_rows;
}

// Initialize lazy evaluation structures
// One expression per stripe, starting as literal(true) (unprocessed)
statistics_expressions_.resize(num_stripes);
for (int i = 0; i < num_stripes; i++) {
statistics_expressions_[i] = compute::literal(true);
}

// One flag per field, starting as false (not processed)
int num_fields = cached_schema_->num_fields();
statistics_expressions_complete_.resize(num_fields, false);

metadata_cached_ = true;
return Status::OK();
}

Result<std::vector<compute::Expression>> OrcFileFragment::TestStripes(
const compute::Expression& predicate) {

// Ensure metadata is loaded
RETURN_NOT_OK(EnsureMetadataCached());

// Extract fields referenced in predicate
std::vector<FieldRef> field_refs = compute::FieldsInExpression(predicate);

// Open reader if not already cached
if (!cached_reader_) {
ARROW_ASSIGN_OR_RAISE(auto input,
arrow::io::RandomAccessFile::Open(source().path()));
ARROW_ASSIGN_OR_RAISE(cached_reader_,
adapters::orc::ORCFileReader::Open(input, arrow::default_memory_pool()));
}

// Process each field referenced in predicate (lazy evaluation)
for (const FieldRef& field_ref : field_refs) {
// Resolve field reference to actual field
ARROW_ASSIGN_OR_RAISE(auto match, field_ref.FindOne(*cached_schema_));

if (!match.has_value()) {
continue; // Field not in schema
}

const auto& [field_indices, field] = *match;

// Only support top-level fields for now
if (field_indices.size() != 1) {
continue; // Nested field - skip
}

int field_index = field_indices[0];

// Check if already processed (lazy evaluation)
if (statistics_expressions_complete_[field_index]) {
continue; // Already processed
}
statistics_expressions_complete_[field_index] = true;

// PR4 limitation: only support INT64
if (field->type()->id() != Type::INT64) {
continue; // Unsupported type
}

// ORC column ID: top-level fields are 1-indexed (0 is root struct)
uint32_t orc_column_id = static_cast<uint32_t>(field_index + 1);

// Process all stripes for this field
for (size_t stripe_idx = 0; stripe_idx < stripe_num_rows_.size(); stripe_idx++) {
// Get stripe statistics
ARROW_ASSIGN_OR_RAISE(auto stripe_stats,
cached_reader_->GetStripeStatistics(stripe_idx));

// Extract min/max statistics - this calls the function from PR1
// (need to inline it here for now since it's in adapter.cc's anonymous namespace)
const auto* col_stats = stripe_stats->getColumnStatistics(orc_column_id);
if (!col_stats) {
continue; // No statistics
}

const auto* int_stats =
dynamic_cast<const liborc::IntegerColumnStatistics*>(col_stats);
if (!int_stats || !int_stats->hasMinimum() || !int_stats->hasMaximum()) {
continue; // Statistics incomplete
}

int64_t min_value = int_stats->getMinimum();
int64_t max_value = int_stats->getMaximum();
bool has_null = col_stats->hasNull();

if (min_value > max_value) {
continue; // Invalid statistics
}

// Build guarantee expression (from PR2 logic)
auto field_expr = compute::field_ref(field_ref);
auto min_scalar = std::make_shared<Int64Scalar>(min_value);
auto max_scalar = std::make_shared<Int64Scalar>(max_value);

auto min_expr = compute::greater_equal(field_expr, compute::literal(*min_scalar));
auto max_expr = compute::less_equal(field_expr, compute::literal(*max_scalar));
auto range_expr = compute::and_(std::move(min_expr), std::move(max_expr));

compute::Expression guarantee_expr;
if (has_null) {
auto null_expr = compute::is_null(field_expr);
guarantee_expr = compute::or_(std::move(range_expr), std::move(null_expr));
} else {
guarantee_expr = std::move(range_expr);
}

// Fold into accumulated expression for this stripe
FoldingAnd(&statistics_expressions_[stripe_idx], std::move(guarantee_expr));
}
}

// Simplify predicate with each stripe's guarantee
std::vector<compute::Expression> simplified_expressions;
simplified_expressions.reserve(stripe_num_rows_.size());

for (size_t i = 0; i < stripe_num_rows_.size(); i++) {
ARROW_ASSIGN_OR_RAISE(auto simplified,
compute::SimplifyWithGuarantee(predicate, statistics_expressions_[i]));
simplified_expressions.push_back(std::move(simplified));
}

return simplified_expressions;
}

Result<std::vector<int>> OrcFileFragment::FilterStripes(
const compute::Expression& predicate) {

// Feature flag for disabling predicate pushdown
if (auto env_var = arrow::internal::GetEnvVar("ARROW_ORC_DISABLE_PREDICATE_PUSHDOWN")) {
if (env_var.ok() && *env_var == "1") {
// Return all stripe indices
std::vector<int> all_stripes(stripe_num_rows_.size());
std::iota(all_stripes.begin(), all_stripes.end(), 0);
return all_stripes;
}
}

// Test each stripe
ARROW_ASSIGN_OR_RAISE(auto tested_expressions, TestStripes(predicate));

// Select stripes where predicate is satisfiable
std::vector<int> selected_stripes;
selected_stripes.reserve(stripe_num_rows_.size());

for (size_t i = 0; i < tested_expressions.size(); i++) {
if (compute::IsSatisfiable(tested_expressions[i])) {
selected_stripes.push_back(static_cast<int>(i));
}
}

return selected_stripes;
}

// //
// // OrcFileWriter, OrcFileWriteOptions
// //
Expand Down
Loading