Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,56 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t metadata_le
}
}

// Read a complete message (metadata and body) using a single ReadAt() call.
//
// `metadata_length + body_length` bytes are requested starting at `offset`.
// The first `metadata_length` bytes are fed to a MessageDecoder as metadata;
// if the decoder then expects a body, the bytes that follow are fed as body.
//
// Returns Status::Invalid when `metadata_length` is smaller than the decoder's
// initial required size, when `body_length` is negative, when fewer than
// `metadata_length` bytes could be read, or when the metadata is malformed.
// Returns Status::IOError when fewer body bytes are available than the
// metadata declares.
Result<std::unique_ptr<Message>> ReadMessage(const int64_t offset,
                                             const int32_t metadata_length,
                                             const int64_t body_length,
                                             io::RandomAccessFile* file) {
  std::unique_ptr<Message> result;
  auto listener = std::make_shared<AssignMessageDecoderListener>(&result);
  MessageDecoder decoder(listener);

  if (metadata_length < decoder.next_required_size()) {
    return Status::Invalid("metadata_length should be at least ",
                           decoder.next_required_size());
  }
  if (body_length < 0) {
    return Status::Invalid("body_length should be non-negative, got ", body_length);
  }

  // One IO for metadata and body together. The read may come back short
  // (e.g. truncated file), so every slice below is bounded by buffer->size().
  ARROW_ASSIGN_OR_RAISE(auto buffer,
                        file->ReadAt(offset, metadata_length + body_length));
  if (buffer->size() < metadata_length) {
    return Status::Invalid("Expected to read ", metadata_length,
                           " metadata bytes at offset ", offset, " but got ",
                           buffer->size());
  }

  ARROW_RETURN_NOT_OK(decoder.Consume(SliceBuffer(buffer, 0, metadata_length)));

  switch (decoder.state()) {
    case MessageDecoder::State::INITIAL:
      // The decoder already emitted a message that has no body.
      return result;
    case MessageDecoder::State::METADATA_LENGTH:
      return Status::Invalid("metadata length is missing. File offset: ", offset,
                             ", metadata length: ", metadata_length);
    case MessageDecoder::State::METADATA:
      return Status::Invalid("flatbuffer size ", decoder.next_required_size(),
                             " invalid. File offset: ", offset,
                             ", metadata length: ", metadata_length);
    case MessageDecoder::State::BODY: {
      // SliceBuffer() performs no bounds checking, so clamp the body slice to
      // the bytes that were actually read. Without the clamp the slice size
      // would always equal body_length and the short-read check below could
      // never fire, leaving the slice pointing past the end of the buffer.
      const int64_t bytes_after_metadata = buffer->size() - metadata_length;
      const int64_t available_body_length =
          bytes_after_metadata < body_length ? bytes_after_metadata : body_length;
      auto body = SliceBuffer(buffer, metadata_length, available_body_length);
      if (body->size() < decoder.next_required_size()) {
        return Status::IOError("Expected to be able to read ",
                               decoder.next_required_size(),
                               " bytes for message body, got ", body->size());
      }
      ARROW_RETURN_NOT_OK(decoder.Consume(body));
      return result;
    }
    case MessageDecoder::State::EOS:
      return Status::Invalid("Unexpected empty message in IPC file format");
    default:
      return Status::Invalid("Unexpected state: ", decoder.state());
  }
}

Future<std::shared_ptr<Message>> ReadMessageAsync(int64_t offset, int32_t metadata_length,
int64_t body_length,
io::RandomAccessFile* file,
Expand Down
20 changes: 20 additions & 0 deletions cpp/src/arrow/ipc/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,26 @@ Result<std::unique_ptr<Message>> ReadMessage(
const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file,
const FieldsLoaderFunction& fields_loader = {});

/// \brief Read an encapsulated message, metadata and body, from a position
/// in a file
///
/// Read a length-prefixed message flatbuffer starting at the indicated file
/// offset, together with the message body that follows it. Metadata and body
/// are fetched with a single read of metadata_length + body_length bytes.
///
/// The metadata_length includes at least the length prefix and the flatbuffer
///
/// \param[in] offset the position in the file where the message starts. The
/// first 4 bytes after the offset are the message length
/// \param[in] metadata_length the number of metadata bytes starting at offset
/// (length prefix plus flatbuffer)
/// \param[in] body_length the number of bytes for the message body, which is
/// expected to start at offset + metadata_length
/// \param[in] file the seekable file interface to read from
/// \return the message read, or an error if the metadata is invalid or fewer
/// body bytes are available than the metadata requires

ARROW_EXPORT
Result<std::unique_ptr<Message>> ReadMessage(const int64_t offset,
const int32_t metadata_length,
const int64_t body_length,
io::RandomAccessFile* file);

/// \brief Read encapsulated RPC message from cached buffers
///
/// The buffers should contain an entire message. Partial reads are not handled.
Expand Down
72 changes: 57 additions & 15 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -552,9 +552,15 @@ class TestIpcRoundTrip : public ::testing::TestWithParam<MakeRecordBatch*>,
ASSERT_OK(WriteRecordBatch(*batch, buffer_offset, mmap_.get(), &metadata_length,
&body_length, options_));

ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message1,
ReadMessage(0, metadata_length, mmap_.get()));
ASSERT_EQ(expected_version, message->metadata_version());
ASSERT_EQ(expected_version, message1->metadata_version());

ASSERT_OK_AND_ASSIGN(auto message2,
ReadMessage(0, metadata_length, body_length, mmap_.get()));
ASSERT_EQ(expected_version, message2->metadata_version());

ASSERT_TRUE(message1->Equals(*message2));
}
};

Expand Down Expand Up @@ -613,6 +619,27 @@ TEST(TestReadMessage, CorruptedSmallInput) {
ASSERT_EQ(nullptr, message);
}

TEST(TestReadMessage, ReadBodyWithLength) {
  // Round-trips a record batch through an in-memory stream and reads it back
  // with the ReadMessage(offset, metadata_length, body_length, file) overload.
  std::shared_ptr<RecordBatch> record_batch;
  ASSERT_OK(MakeIntRecordBatch(&record_batch));

  // Serialize the batch; the writer reports the metadata and body sizes.
  int32_t metadata_size;
  int64_t body_size;
  ASSERT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create(0));
  ASSERT_OK(WriteRecordBatch(*record_batch, 0, sink.get(), &metadata_size, &body_size,
                             IpcWriteOptions::Defaults()));

  // Read the whole message back from the start of the serialized bytes.
  ASSERT_OK_AND_ASSIGN(auto serialized, sink->Finish());
  io::BufferReader source(serialized);
  ASSERT_OK_AND_ASSIGN(auto decoded,
                       ReadMessage(0, metadata_size, body_size, &source));

  ASSERT_EQ(body_size, decoded->body_length());
  ASSERT_TRUE(decoded->Verify());
}

TEST(TestMetadata, GetMetadataVersion) {
ASSERT_EQ(MetadataVersion::V1, ipc::internal::GetMetadataVersion(
flatbuf::MetadataVersion::MetadataVersion_V1));
Expand Down Expand Up @@ -1094,7 +1121,7 @@ TEST_F(RecursionLimits, ReadLimit) {
&schema));

ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
ReadMessage(0, metadata_length, mmap_.get()));
ReadMessage(0, metadata_length, body_length, mmap_.get()));

io::BufferReader reader(message->body());

Expand All @@ -1119,7 +1146,7 @@ TEST_F(RecursionLimits, StressLimit) {
&schema));

ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
ReadMessage(0, metadata_length, mmap_.get()));
ReadMessage(0, metadata_length, body_length, mmap_.get()));

DictionaryMemo empty_memo;

Expand Down Expand Up @@ -3003,25 +3030,38 @@ void GetReadRecordBatchReadRanges(

auto read_ranges = tracked->get_read_ranges();

// there are 3 read IOs before reading body:
// 1) read magic and footer length IO
// 2) read footer IO
// 3) read record batch metadata IO
EXPECT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size());
const int32_t magic_size = static_cast<int>(strlen(ipc::internal::kArrowMagicBytes));
// read magic and footer length IO
auto file_end_size = magic_size + sizeof(int32_t);
auto footer_length_offset = buffer->size() - file_end_size;
auto footer_length = bit_util::FromLittleEndian(
util::SafeLoadAs<int32_t>(buffer->data() + footer_length_offset));

// read magic and footer length IO
EXPECT_EQ(read_ranges[0].length, file_end_size);
// read footer IO
EXPECT_EQ(read_ranges[1].length, footer_length);
// read record batch metadata. The exact size is tricky to determine but it doesn't
// matter for this test and it should be smaller than the footer.
EXPECT_LE(read_ranges[2].length, footer_length);
for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) {
EXPECT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]);

// there are 3 read IOs before reading body:
// 1) read magic and footer length IO
// 2) read footer IO
// 3) read record batch metadata IO
if (included_fields.empty()) {
EXPECT_EQ(read_ranges.size(), 3);

int64_t total_body = 0;
for (auto len : expected_body_read_lengths) total_body += len;

EXPECT_GT(read_ranges[2].length, total_body);
} else {
EXPECT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size());

// read record batch metadata. The exact size is tricky to determine but it doesn't
// matter for this test and it should be smaller than the footer.
EXPECT_LE(read_ranges[2].length, footer_length);
for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) {
EXPECT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]);
}
}
}

Expand Down Expand Up @@ -3171,7 +3211,9 @@ class PreBufferingTest : public ::testing::TestWithParam<bool> {
metadata_reads++;
}
}
ASSERT_EQ(metadata_reads, reader_->num_record_batches() - num_indices_pre_buffered);
// With the ReadMessage optimization, non-prebuffered reads fetch metadata and body
// in a single large read, so we no longer see small metadata-only reads here.
ASSERT_EQ(metadata_reads, 0);
ASSERT_EQ(data_reads, reader_->num_record_batches());
}

Expand Down
12 changes: 9 additions & 3 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1203,9 +1203,15 @@ Result<std::unique_ptr<Message>> ReadMessageFromBlock(
const FileBlock& block, io::RandomAccessFile* file,
const FieldsLoaderFunction& fields_loader) {
RETURN_NOT_OK(CheckAligned(block));
ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length,
file, fields_loader));
return CheckBodyLength(std::move(message), block);
if (fields_loader) {
ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length,
file, fields_loader));
return CheckBodyLength(std::move(message), block);
} else {
ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length,
block.body_length, file));
return CheckBodyLength(std::move(message), block);
}
}

Future<std::shared_ptr<Message>> ReadMessageFromBlockAsync(
Expand Down