From 64f0538b94142eddc95885a7bab9b1272ebbd431 Mon Sep 17 00:00:00 2001 From: Abhishek Bansal Date: Sun, 25 Jan 2026 01:08:08 +0530 Subject: [PATCH] GH-48846: [C++] Optimize ReadMessage to read metadata and body in one go --- cpp/src/arrow/ipc/message.cc | 50 +++++++++++++++++++ cpp/src/arrow/ipc/message.h | 20 ++++++++ cpp/src/arrow/ipc/read_write_test.cc | 72 ++++++++++++++++++++++------ cpp/src/arrow/ipc/reader.cc | 12 +++-- 4 files changed, 136 insertions(+), 18 deletions(-) diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index 8be09956f10..bcf6243bf7c 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -421,6 +421,56 @@ Result> ReadMessage(int64_t offset, int32_t metadata_le } } +Result> ReadMessage(const int64_t offset, + const int32_t metadata_length, + const int64_t body_length, + io::RandomAccessFile* file) { + std::unique_ptr result; + auto listener = std::make_shared(&result); + MessageDecoder decoder(listener); + + if (metadata_length < decoder.next_required_size()) { + return Status::Invalid("metadata_length should be at least ", + decoder.next_required_size()); + } + + ARROW_ASSIGN_OR_RAISE(auto metadata, + file->ReadAt(offset, metadata_length + body_length)); + if (metadata->size() < metadata_length) { + return Status::Invalid("Expected to read ", metadata_length, + " metadata bytes at offset ", offset, " but got ", + metadata->size()); + } + + ARROW_RETURN_NOT_OK(decoder.Consume(SliceBuffer(metadata, 0, metadata_length))); + + switch (decoder.state()) { + case MessageDecoder::State::INITIAL: + return result; + case MessageDecoder::State::METADATA_LENGTH: + return Status::Invalid("metadata length is missing. File offset: ", offset, + ", metadata length: ", metadata_length); + case MessageDecoder::State::METADATA: + return Status::Invalid("flatbuffer size ", decoder.next_required_size(), + " invalid. File offset: ", offset, + ", metadata length: ", metadata_length); + case MessageDecoder::State::BODY: { + auto body = SliceBuffer(metadata, metadata_length, body_length); + if (body->size() < decoder.next_required_size()) { + return Status::IOError("Expected to be able to read ", + decoder.next_required_size(), + " bytes for message body, got ", body->size()); + } + RETURN_NOT_OK(decoder.Consume(body)); + return result; + } + case MessageDecoder::State::EOS: + return Status::Invalid("Unexpected empty message in IPC file format"); + default: + return Status::Invalid("Unexpected state: ", decoder.state()); + } +} + Future> ReadMessageAsync(int64_t offset, int32_t metadata_length, int64_t body_length, io::RandomAccessFile* file, diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h index 1cd72ce993e..e36a8859b9e 100644 --- a/cpp/src/arrow/ipc/message.h +++ b/cpp/src/arrow/ipc/message.h @@ -469,6 +469,26 @@ Result> ReadMessage( const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader = {}); +/// \brief Read encapsulated RPC message from position in file +/// +/// Read a length-prefixed message flatbuffer starting at the indicated file +/// offset. +/// +/// The metadata_length includes at least the length prefix and the flatbuffer +/// +/// \param[in] offset the position in the file where the message starts. The +/// first 4 bytes after the offset are the message length +/// \param[in] metadata_length the total number of bytes to read from file +/// \param[in] body_length the number of bytes for the message body +/// \param[in] file the seekable file interface to read from +/// \return the message read + +ARROW_EXPORT +Result> ReadMessage(const int64_t offset, + const int32_t metadata_length, + const int64_t body_length, + io::RandomAccessFile* file); + /// \brief Read encapsulated RPC message from cached buffers /// /// The buffers should contain an entire message. Partial reads are not handled. diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 315d8bd07d9..92d192afa5e 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -552,9 +552,15 @@ class TestIpcRoundTrip : public ::testing::TestWithParam, ASSERT_OK(WriteRecordBatch(*batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, options_)); - ASSERT_OK_AND_ASSIGN(std::unique_ptr message, + ASSERT_OK_AND_ASSIGN(std::unique_ptr message1, ReadMessage(0, metadata_length, mmap_.get())); - ASSERT_EQ(expected_version, message->metadata_version()); + ASSERT_EQ(expected_version, message1->metadata_version()); + + ASSERT_OK_AND_ASSIGN(auto message2, + ReadMessage(0, metadata_length, body_length, mmap_.get())); + ASSERT_EQ(expected_version, message2->metadata_version()); + + ASSERT_TRUE(message1->Equals(*message2)); } }; @@ -613,6 +619,27 @@ TEST(TestReadMessage, CorruptedSmallInput) { ASSERT_EQ(nullptr, message); } +TEST(TestReadMessage, ReadBodyWithLength) { + // Test the optimized ReadMessage(offset, meta_len, body_len, file) overload + std::shared_ptr batch; + ASSERT_OK(MakeIntRecordBatch(&batch)); + + ASSERT_OK_AND_ASSIGN(auto stream, io::BufferOutputStream::Create(0)); + int32_t metadata_length; + int64_t body_length; + ASSERT_OK(WriteRecordBatch(*batch, 0, stream.get(), &metadata_length, &body_length, + IpcWriteOptions::Defaults())); + + ASSERT_OK_AND_ASSIGN(auto buffer, stream->Finish()); + io::BufferReader reader(buffer); + + ASSERT_OK_AND_ASSIGN(auto message, + ReadMessage(0, metadata_length, body_length, &reader)); + + ASSERT_EQ(body_length, message->body_length()); + ASSERT_TRUE(message->Verify()); +} + TEST(TestMetadata, GetMetadataVersion) { ASSERT_EQ(MetadataVersion::V1, ipc::internal::GetMetadataVersion( flatbuf::MetadataVersion::MetadataVersion_V1)); @@ -1094,7 +1121,7 @@ TEST_F(RecursionLimits, ReadLimit) { &schema)); ASSERT_OK_AND_ASSIGN(std::unique_ptr message, - ReadMessage(0, metadata_length, mmap_.get())); + ReadMessage(0, metadata_length, body_length, mmap_.get())); io::BufferReader reader(message->body()); @@ -1119,7 +1146,7 @@ TEST_F(RecursionLimits, StressLimit) { &schema)); ASSERT_OK_AND_ASSIGN(std::unique_ptr message, - ReadMessage(0, metadata_length, mmap_.get())); + ReadMessage(0, metadata_length, body_length, mmap_.get())); DictionaryMemo empty_memo; @@ -3003,25 +3030,38 @@ void GetReadRecordBatchReadRanges( auto read_ranges = tracked->get_read_ranges(); - // there are 3 read IOs before reading body: - // 1) read magic and footer length IO - // 2) read footer IO - // 3) read record batch metadata IO - EXPECT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size()); const int32_t magic_size = static_cast(strlen(ipc::internal::kArrowMagicBytes)); // read magic and footer length IO auto file_end_size = magic_size + sizeof(int32_t); auto footer_length_offset = buffer->size() - file_end_size; auto footer_length = bit_util::FromLittleEndian( util::SafeLoadAs(buffer->data() + footer_length_offset)); + + // read magic and footer length IO EXPECT_EQ(read_ranges[0].length, file_end_size); // read footer IO EXPECT_EQ(read_ranges[1].length, footer_length); - // read record batch metadata. The exact size is tricky to determine but it doesn't - // matter for this test and it should be smaller than the footer. - EXPECT_LE(read_ranges[2].length, footer_length); - for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) { - EXPECT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]); + + // there are 3 read IOs before reading body: + // 1) read magic and footer length IO + // 2) read footer IO + // 3) read record batch metadata IO + if (included_fields.empty()) { + EXPECT_EQ(read_ranges.size(), 3); + + int64_t total_body = 0; + for (auto len : expected_body_read_lengths) total_body += len; + + EXPECT_GT(read_ranges[2].length, total_body); + } else { + EXPECT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size()); + + // read record batch metadata. The exact size is tricky to determine but it doesn't + // matter for this test and it should be smaller than the footer. + EXPECT_LE(read_ranges[2].length, footer_length); + for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) { + EXPECT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]); + } } } @@ -3171,7 +3211,9 @@ class PreBufferingTest : public ::testing::TestWithParam { metadata_reads++; } } - ASSERT_EQ(metadata_reads, reader_->num_record_batches() - num_indices_pre_buffered); + // With ReadMessage optimization, non-prebuffered reads verify metadata and body + // in a single large read, so we no longer see small metadata-only reads here. + ASSERT_EQ(metadata_reads, 0); ASSERT_EQ(data_reads, reader_->num_record_batches()); } diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 8e125fc5ede..efe4a478927 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -1203,9 +1203,15 @@ Result> ReadMessageFromBlock( const FileBlock& block, io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader) { RETURN_NOT_OK(CheckAligned(block)); - ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length, - file, fields_loader)); - return CheckBodyLength(std::move(message), block); + if (fields_loader) { + ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length, + file, fields_loader)); + return CheckBodyLength(std::move(message), block); + } else { + ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, block.metadata_length, + block.body_length, file)); + return CheckBodyLength(std::move(message), block); + } } Future> ReadMessageFromBlockAsync(