Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions c_glib/parquet-glib/arrow-file-reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,7 @@ gparquet_arrow_file_reader_read_row_group(GParquetArrowFileReader *reader,
{
const gchar *tag = "[parquet][arrow][file-reader][read-row-group]";
auto parquet_arrow_file_reader = gparquet_arrow_file_reader_get_raw(reader);
std::shared_ptr<arrow::Table> arrow_table;
arrow::Status status;
arrow::Result<std::shared_ptr<arrow::Table>> arrow_table_result;
if (column_indices) {
const auto n_columns =
parquet_arrow_file_reader->parquet_reader()->metadata()->num_columns();
Expand All @@ -268,14 +267,13 @@ gparquet_arrow_file_reader_read_row_group(GParquetArrowFileReader *reader,
}
parquet_column_indices.push_back(column_index);
}
status = parquet_arrow_file_reader->ReadRowGroup(row_group_index,
parquet_column_indices,
&arrow_table);
arrow_table_result =
parquet_arrow_file_reader->ReadRowGroup(row_group_index, parquet_column_indices);
} else {
status = parquet_arrow_file_reader->ReadRowGroup(row_group_index, &arrow_table);
arrow_table_result = parquet_arrow_file_reader->ReadRowGroup(row_group_index);
}
if (garrow_error_check(error, status, tag)) {
return garrow_table_new_raw(&arrow_table);
if (garrow::check(error, arrow_table_result, tag)) {
return garrow_table_new_raw(&(*arrow_table_result));
} else {
return NULL;
}
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2451,12 +2451,12 @@ TEST(TestArrowReadWrite, ReadSingleRowGroup) {

ASSERT_EQ(2, reader->num_row_groups());

std::shared_ptr<Table> r1, r2, r3, r4;
std::shared_ptr<Table> r2;
// Read everything
ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1));
ASSERT_OK_AND_ASSIGN(auto r1, reader->ReadRowGroup(0));
ASSERT_OK_NO_THROW(reader->RowGroup(1)->ReadTable(&r2));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a Result version for this ReadTable call?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call is on a RowGroupReader, which still only has the Status + out-parameter API, so I left it unchanged.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's work on it as a separated task.

ASSERT_OK_NO_THROW(reader->ReadRowGroups({0, 1}, &r3));
ASSERT_OK_NO_THROW(reader->ReadRowGroups({1}, &r4));
ASSERT_OK_AND_ASSIGN(auto r3, reader->ReadRowGroups({0, 1}));
ASSERT_OK_AND_ASSIGN(auto r4, reader->ReadRowGroups({1}));

std::shared_ptr<Table> concatenated;

Expand Down Expand Up @@ -4085,7 +4085,7 @@ TEST_F(TestNestedSchemaRead, ReadTablePartial) {
ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table));

// columns: {group1.leaf1, leaf3}
ASSERT_OK_NO_THROW(reader_->ReadRowGroup(0, {0, 2}, &table));
ASSERT_OK_AND_ASSIGN(table, reader_->ReadRowGroup(0, {0, 2}));
ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
ASSERT_EQ(table->num_columns(), 2);
ASSERT_EQ(table->schema()->field(0)->name(), "group1");
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/parquet/arrow/fuzz_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,16 @@ namespace {
Status FuzzReadData(std::unique_ptr<FileReader> reader) {
auto final_status = Status::OK();
for (int i = 0; i < reader->num_row_groups(); ++i) {
std::shared_ptr<Table> table;
auto row_group_status = reader->ReadRowGroup(i, &table);
if (row_group_status.ok()) {
auto table_result = reader->ReadRowGroup(i);
if (table_result.ok()) {
// When reading returns successfully, the Arrow data should be structurally
// valid so that it can be read normally. If that is not the case, abort
// so that the error can be published by OSS-Fuzz.
auto table = *table_result;
ARROW_CHECK_OK(table->Validate());
row_group_status &= table->ValidateFull();
final_status &= table->ValidateFull();
}
final_status &= row_group_status;
final_status &= table_result.status();
}
return final_status;
}
Expand Down
64 changes: 42 additions & 22 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,7 @@ class FileReaderImpl : public FileReader {

Result<std::shared_ptr<Table>> ReadTable(
const std::vector<int>& column_indices) override {
std::shared_ptr<Table> table;
RETURN_NOT_OK(ReadRowGroups(Iota(reader_->metadata()->num_row_groups()),
column_indices, &table));
return table;
return ReadRowGroups(Iota(reader_->metadata()->num_row_groups()), column_indices);
}

Status GetFieldReader(int i,
Expand Down Expand Up @@ -312,9 +309,8 @@ class FileReaderImpl : public FileReader {
return ReadTable(Iota(reader_->metadata()->num_columns()));
}

Status ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& indices,
std::shared_ptr<Table>* table) override;
Result<std::shared_ptr<Table>> ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& indices) override;

// Helper method used by ReadRowGroups - read the given row groups/columns, skipping
// bounds checks and pre-buffering. Takes a shared_ptr to self to keep the reader
Expand All @@ -323,18 +319,18 @@ class FileReaderImpl : public FileReader {
std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups,
const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor);

Status ReadRowGroups(const std::vector<int>& row_groups,
std::shared_ptr<Table>* table) override {
return ReadRowGroups(row_groups, Iota(reader_->metadata()->num_columns()), table);
Result<std::shared_ptr<Table>> ReadRowGroups(
const std::vector<int>& row_groups) override {
return ReadRowGroups(row_groups, Iota(reader_->metadata()->num_columns()));
}

Status ReadRowGroup(int row_group_index, const std::vector<int>& column_indices,
std::shared_ptr<Table>* out) override {
return ReadRowGroups({row_group_index}, column_indices, out);
Result<std::shared_ptr<Table>> ReadRowGroup(
int row_group_index, const std::vector<int>& column_indices) override {
return ReadRowGroups({row_group_index}, column_indices);
}

Status ReadRowGroup(int i, std::shared_ptr<Table>* table) override {
return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table);
Result<std::shared_ptr<Table>> ReadRowGroup(int i) override {
return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()));
}

Result<std::unique_ptr<RecordBatchReader>> GetRecordBatchReader(
Expand Down Expand Up @@ -437,11 +433,13 @@ class RowGroupReaderImpl : public RowGroupReader {

Status ReadTable(const std::vector<int>& column_indices,
std::shared_ptr<::arrow::Table>* out) override {
return impl_->ReadRowGroup(row_group_index_, column_indices, out);
ARROW_ASSIGN_OR_RAISE(*out, impl_->ReadRowGroup(row_group_index_, column_indices));
return Status::OK();
}

Status ReadTable(std::shared_ptr<::arrow::Table>* out) override {
return impl_->ReadRowGroup(row_group_index_, out);
ARROW_ASSIGN_OR_RAISE(*out, impl_->ReadRowGroup(row_group_index_));
return Status::OK();
}

private:
Expand Down Expand Up @@ -1254,9 +1252,8 @@ Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_facto
return Status::OK();
}

Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& column_indices,
std::shared_ptr<Table>* out) {
Result<std::shared_ptr<Table>> FileReaderImpl::ReadRowGroups(
const std::vector<int>& row_groups, const std::vector<int>& column_indices) {
RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));

// PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
Expand All @@ -1270,8 +1267,7 @@ Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,

auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices,
/*cpu_executor=*/nullptr);
ARROW_ASSIGN_OR_RAISE(*out, fut.MoveResult());
return Status::OK();
return fut.MoveResult();
}

Future<std::shared_ptr<Table>> FileReaderImpl::DecodeRowGroups(
Expand Down Expand Up @@ -1353,6 +1349,30 @@ Status FileReader::ReadTable(const std::vector<int>& column_indices,
return Status::OK();
}

Status FileReader::ReadRowGroup(int i, const std::vector<int>& column_indices,
std::shared_ptr<Table>* out) {
ARROW_ASSIGN_OR_RAISE(*out, ReadRowGroup(i, column_indices));
return Status::OK();
}

Status FileReader::ReadRowGroup(int i, std::shared_ptr<Table>* out) {
ARROW_ASSIGN_OR_RAISE(*out, ReadRowGroup(i));
return Status::OK();
}

Status FileReader::ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& column_indices,
std::shared_ptr<Table>* out) {
ARROW_ASSIGN_OR_RAISE(*out, ReadRowGroups(row_groups, column_indices));
return Status::OK();
}

Status FileReader::ReadRowGroups(const std::vector<int>& row_groups,
std::shared_ptr<Table>* out) {
ARROW_ASSIGN_OR_RAISE(*out, ReadRowGroups(row_groups));
return Status::OK();
}

Status FileReader::Make(::arrow::MemoryPool* pool,
std::unique_ptr<ParquetFileReader> reader,
const ArrowReaderProperties& properties,
Expand Down
39 changes: 31 additions & 8 deletions cpp/src/parquet/arrow/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,17 +266,40 @@ class PARQUET_EXPORT FileReader {
::arrow::Status ReadTable(const std::vector<int>& column_indices,
std::shared_ptr<::arrow::Table>* out);

virtual ::arrow::Status ReadRowGroup(int i, const std::vector<int>& column_indices,
std::shared_ptr<::arrow::Table>* out) = 0;
/// \brief Read the given row group columns into a Table
virtual ::arrow::Result<std::shared_ptr<::arrow::Table>> ReadRowGroup(
int i, const std::vector<int>& column_indices) = 0;

virtual ::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out) = 0;
/// \brief Read the given row group into a Table
virtual ::arrow::Result<std::shared_ptr<::arrow::Table>> ReadRowGroup(int i) = 0;

virtual ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& column_indices,
std::shared_ptr<::arrow::Table>* out) = 0;
/// \brief Read the given row groups columns into a Table
virtual ::arrow::Result<std::shared_ptr<::arrow::Table>> ReadRowGroups(
const std::vector<int>& row_groups, const std::vector<int>& column_indices) = 0;

virtual ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
std::shared_ptr<::arrow::Table>* out) = 0;
/// \brief Read the given row groups into a Table
virtual ::arrow::Result<std::shared_ptr<::arrow::Table>> ReadRowGroups(
const std::vector<int>& row_groups) = 0;

/// \deprecated Deprecated in 24.0.0. Use arrow::Result version instead.
ARROW_DEPRECATED("Deprecated in 24.0.0. Use arrow::Result version instead.")
::arrow::Status ReadRowGroup(int i, const std::vector<int>& column_indices,
std::shared_ptr<::arrow::Table>* out);

/// \deprecated Deprecated in 24.0.0. Use arrow::Result version instead.
ARROW_DEPRECATED("Deprecated in 24.0.0. Use arrow::Result version instead.")
::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out);

/// \deprecated Deprecated in 24.0.0. Use arrow::Result version instead.
ARROW_DEPRECATED("Deprecated in 24.0.0. Use arrow::Result version instead.")
::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& column_indices,
std::shared_ptr<::arrow::Table>* out);

/// \deprecated Deprecated in 24.0.0. Use arrow::Result version instead.
ARROW_DEPRECATED("Deprecated in 24.0.0. Use arrow::Result version instead.")
::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
std::shared_ptr<::arrow::Table>* out);

/// \brief Scan file contents with one thread, return number of rows
virtual ::arrow::Status ScanContents(std::vector<int> columns,
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/parquet/arrow/reader_writer_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -777,8 +777,7 @@ static void BM_ReadMultipleRowGroups(::benchmark::State& state) {
EXIT_NOT_OK(arrow_reader_result.status());
auto arrow_reader = std::move(*arrow_reader_result);

std::shared_ptr<Table> table;
EXIT_NOT_OK(arrow_reader->ReadRowGroups(rgs, &table));
PARQUET_ASSIGN_OR_THROW(auto table, arrow_reader->ReadRowGroups(rgs));
}
SetBytesProcessed<Int64Type>(state);
}
Expand Down
12 changes: 5 additions & 7 deletions python/pyarrow/_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1811,7 +1811,7 @@ cdef class ParquetReader(_Weakrefable):
table : pyarrow.Table
"""
cdef:
shared_ptr[CTable] ctable
CResult[shared_ptr[CTable]] table_result
vector[int] c_row_groups
vector[int] c_column_indices

Expand All @@ -1825,15 +1825,13 @@ cdef class ParquetReader(_Weakrefable):
c_column_indices.push_back(index)

with nogil:
check_status(self.reader.get()
.ReadRowGroups(c_row_groups, c_column_indices,
&ctable))
table_result = self.reader.get().ReadRowGroups(c_row_groups,
c_column_indices)
else:
# Read all columns
with nogil:
check_status(self.reader.get()
.ReadRowGroups(c_row_groups, &ctable))
return pyarrow_wrap_table(ctable)
table_result = self.reader.get().ReadRowGroups(c_row_groups)
return pyarrow_wrap_table(GetResultValue(table_result))

def read_all(self, column_indices=None, bint use_threads=True):
"""
Expand Down
16 changes: 7 additions & 9 deletions python/pyarrow/includes/libparquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -534,15 +534,13 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
CStatus ReadSchemaField(int i, shared_ptr[CChunkedArray]* out)

int num_row_groups()
CStatus ReadRowGroup(int i, shared_ptr[CTable]* out)
CStatus ReadRowGroup(int i, const vector[int]& column_indices,
shared_ptr[CTable]* out)

CStatus ReadRowGroups(const vector[int]& row_groups,
shared_ptr[CTable]* out)
CStatus ReadRowGroups(const vector[int]& row_groups,
const vector[int]& column_indices,
shared_ptr[CTable]* out)
CResult[shared_ptr[CTable]] ReadRowGroup(int i)
CResult[shared_ptr[CTable]] ReadRowGroup(int i,
const vector[int]& column_indices)

CResult[shared_ptr[CTable]] ReadRowGroups(const vector[int]& row_groups)
CResult[shared_ptr[CTable]] ReadRowGroups(const vector[int]& row_groups,
const vector[int]& column_indices)

CResult[unique_ptr[CRecordBatchReader]] GetRecordBatchReader(const vector[int]& row_group_indices,
const vector[int]& column_indices)
Expand Down
36 changes: 12 additions & 24 deletions r/src/parquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,48 +147,36 @@ std::shared_ptr<arrow::Table> parquet___arrow___FileReader__ReadTable2(
// [[parquet::export]]
std::shared_ptr<arrow::Table> parquet___arrow___FileReader__ReadRowGroup1(
const std::shared_ptr<parquet::arrow::FileReader>& reader, int i) {
std::shared_ptr<arrow::Table> table;
auto result =
RunWithCapturedRIfPossibleVoid([&]() { return reader->ReadRowGroup(i, &table); });

StopIfNotOk(result);
return table;
auto result = RunWithCapturedRIfPossible<std::shared_ptr<arrow::Table>>(
[&]() { return reader->ReadRowGroup(i); });
return ValueOrStop(result);
}

// [[parquet::export]]
std::shared_ptr<arrow::Table> parquet___arrow___FileReader__ReadRowGroup2(
const std::shared_ptr<parquet::arrow::FileReader>& reader, int i,
const std::vector<int>& column_indices) {
std::shared_ptr<arrow::Table> table;
auto result = RunWithCapturedRIfPossibleVoid(
[&]() { return reader->ReadRowGroup(i, column_indices, &table); });

StopIfNotOk(result);
return table;
auto result = RunWithCapturedRIfPossible<std::shared_ptr<arrow::Table>>(
[&]() { return reader->ReadRowGroup(i, column_indices); });
return ValueOrStop(result);
}

// [[parquet::export]]
std::shared_ptr<arrow::Table> parquet___arrow___FileReader__ReadRowGroups1(
const std::shared_ptr<parquet::arrow::FileReader>& reader,
const std::vector<int>& row_groups) {
std::shared_ptr<arrow::Table> table;
auto result = RunWithCapturedRIfPossibleVoid(
[&]() { return reader->ReadRowGroups(row_groups, &table); });

StopIfNotOk(result);
return table;
auto result = RunWithCapturedRIfPossible<std::shared_ptr<arrow::Table>>(
[&]() { return reader->ReadRowGroups(row_groups); });
return ValueOrStop(result);
}

// [[parquet::export]]
std::shared_ptr<arrow::Table> parquet___arrow___FileReader__ReadRowGroups2(
const std::shared_ptr<parquet::arrow::FileReader>& reader,
const std::vector<int>& row_groups, const std::vector<int>& column_indices) {
std::shared_ptr<arrow::Table> table;
auto result = RunWithCapturedRIfPossibleVoid(
[&]() { return reader->ReadRowGroups(row_groups, column_indices, &table); });

StopIfNotOk(result);
return table;
auto result = RunWithCapturedRIfPossible<std::shared_ptr<arrow::Table>>(
[&]() { return reader->ReadRowGroups(row_groups, column_indices); });
return ValueOrStop(result);
}

// [[parquet::export]]
Expand Down
Loading