From 2f6ab2b67c6f107b0cef477ac86b708a285f45fa Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 18 Dec 2025 21:51:21 +0800 Subject: [PATCH 01/16] add debug logs for request insertion --- lib/ClientConnection.cc | 18 +++++++++++------- lib/ClientConnection.h | 6 ++++-- lib/ConsumerImpl.cc | 23 ++++++++++++++--------- lib/ProducerImpl.cc | 10 ++++++---- 4 files changed, 35 insertions(+), 22 deletions(-) diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 86dfd9d1..de73ccca 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1005,15 +1005,17 @@ Future ClientConnection::newConsumerStats(uint6 void ClientConnection::newTopicLookup(const std::string& topicName, bool authoritative, const std::string& listenerName, uint64_t requestId, const LookupDataResultPromisePtr& promise) { - newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, promise); + newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, "LOOKUP", + promise); } void ClientConnection::newPartitionedMetadataLookup(const std::string& topicName, uint64_t requestId, const LookupDataResultPromisePtr& promise) { - newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, promise); + newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, "PARTITIONED_METADATA", + promise); } -void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, +void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType, const LookupDataResultPromisePtr& promise) { Lock lock(mutex_); std::shared_ptr lookupDataResult; @@ -1042,6 +1044,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, pendingLookupRequests_.insert(std::make_pair(requestId, requestData)); numOfPendingLookupRequest_++; lock.unlock(); + LOG_DEBUG(cnxString_ << "Inserted lookup request " << requestType << " (req_id: " << requestId << ")"); sendCommand(cmd); } @@ -1158,12 +1161,15 @@ void ClientConnection::sendPendingCommands() { } } -Future ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId) { +Future ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId, + const char* requestType) { Lock lock(mutex_); if (isClosed()) { lock.unlock(); Promise promise; + LOG_DEBUG(cnxString_ << "Fail " << requestType << "(req_id: " << requestId + << ") to a closed connection"); promise.setFailed(ResultNotConnected); return promise.getFuture(); } @@ -1182,6 +1188,7 @@ Future ClientConnection::sendRequestWithId(const SharedBuf pendingRequests_.insert(std::make_pair(requestId, requestData)); lock.unlock(); + LOG_DEBUG(cnxString_ << "Inserted request " << requestType << " (req_id: " << requestId << ")"); sendCommand(cmd); return requestData.promise.getFuture(); } @@ -1625,9 +1632,6 @@ void ClientConnection::handleConsumerStatsResponse( void ClientConnection::handleLookupTopicRespose( const proto::CommandLookupTopicResponse& lookupTopicResponse) { - LOG_DEBUG(cnxString_ << "Received lookup response from server. req_id: " - << lookupTopicResponse.request_id()); - Lock lock(mutex_); auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id()); if (it != pendingLookupRequests_.end()) { diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 18a7d846..0d2085cb 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -185,7 +185,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this sendRequestWithId(const SharedBuffer& cmd, int requestId); + Future sendRequestWithId(const SharedBuffer& cmd, int requestId, + const char* requestType); const std::string& brokerAddress() const; @@ -264,7 +265,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_thissendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId); + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, + "CLOSE_CONSUMER"); cnx->removeConsumer(consumerId_); LOG_INFO(consumerStr_ << "Closed consumer for race condition: " << consumerId_); } else { @@ -235,6 +236,7 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after // sending the subscribe request. cnx->registerConsumer(consumerId_, get_shared_this_ptr()); + LOG_DEBUG(cnx->cnxString() << "Registered consumer " << consumerId_); if (duringSeek()) { ackGroupingTrackerPtr_->flushAndClean(); @@ -259,7 +261,7 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c // Keep a reference to ensure object is kept alive. auto self = get_shared_this_ptr(); setFirstRequestIdAfterConnect(requestId); - cnx->sendRequestWithId(cmd, requestId) + cnx->sendRequestWithId(cmd, requestId, "SUBSCRIBE") .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { Result handleResult = handleCreateConsumer(cnx, result); if (handleResult == ResultOk) { @@ -301,7 +303,8 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result LOG_INFO(getName() << "Closing subscribed consumer since it was already closed"); int requestId = client->newRequestId(); auto name = getName(); - cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId) + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, + "CLOSE_CONSUMER") .addListener([name](Result result, const ResponseData&) { if (result == ResultOk) { LOG_INFO(name << "Closed consumer successfully after subscribe completed"); @@ -354,7 +357,8 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result // in case it was indeed created, otherwise it might prevent new subscribe operation, // since we are not closing the connection auto requestId = newRequestId(); - cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId); + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, + "CLOSE_CONSUMER"); } if (consumerCreatedPromise_.isComplete()) { @@ -408,7 +412,7 @@ void ConsumerImpl::unsubscribeAsync(const ResultCallback& originalCallback) { auto requestId = newRequestId(); SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId); auto self = get_shared_this_ptr(); - cnx->sendRequestWithId(cmd, requestId) + cnx->sendRequestWithId(cmd, requestId, "UNSUBSCRIBE") .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); } else { Result result = ResultNotConnected; @@ -1374,7 +1378,7 @@ void ConsumerImpl::closeAsync(const ResultCallback& originalCallback) { auto requestId = newRequestId(); auto self = get_shared_this_ptr(); - cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId) + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, "CLOSE_CONSUMER") .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); } @@ -1752,7 +1756,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c auto weakSelf = weak_from_this(); - cnx->sendRequestWithId(seek, requestId) + cnx->sendRequestWithId(seek, requestId, "SEEK") .addListener([this, weakSelf, callback, originalSeekMessageId](Result result, const ResponseData& responseData) { auto self = weakSelf.lock(); @@ -1928,7 +1932,7 @@ void ConsumerImpl::doImmediateAck(const ClientConnectionPtr& cnx, const MessageI auto requestId = newRequestId(); cnx->sendRequestWithId( Commands::newAck(consumerId_, msgId.ledgerId(), msgId.entryId(), ackSet, ackType, requestId), - requestId) + requestId, "ACK") .addListener([callback](Result result, const ResponseData&) { if (callback) { callback(result); @@ -1958,7 +1962,8 @@ void ConsumerImpl::doImmediateAck(const ClientConnectionPtr& cnx, const std::set if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) { if (config_.isAckReceiptEnabled()) { auto requestId = newRequestId(); - cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId) + cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId, + "ACK") .addListener([callback](Result result, const ResponseData&) { if (callback) { callback(result); diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 9d6a9a08..7fd14c7c 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -160,7 +160,7 @@ Future ProducerImpl::connectionOpened(const ClientConnectionPtr& c // Keep a reference to ensure object is kept alive. auto self = shared_from_this(); setFirstRequestIdAfterConnect(requestId); - cnx->sendRequestWithId(cmd, requestId) + cnx->sendRequestWithId(cmd, requestId, "PRODUCER") .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { Result handleResult = handleCreateProducer(cnx, result, responseData); if (handleResult == ResultOk) { @@ -204,7 +204,8 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result auto client = client_.lock(); if (client) { int requestId = client->newRequestId(); - cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId); + cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId, + "CLOSE_PRODUCER"); } } if (!producerCreatedPromise_.isComplete()) { @@ -266,7 +267,8 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result auto client = client_.lock(); if (client) { int requestId = client->newRequestId(); - cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId); + cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId, + "CLOSE_PRODUCER"); } } @@ -818,7 +820,7 @@ void ProducerImpl::closeAsync(CloseCallback originalCallback) { int requestId = client->newRequestId(); auto self = shared_from_this(); - cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId) + cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId, "CLOSE_PRODUCER") .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); } From 9d32866ed70d075a46f171676e2c046fe462aa4b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 13 Jan 2026 21:23:45 +0800 Subject: [PATCH 02/16] Add tests for the race --- lib/ClientConnection.cc | 40 ++++++++++++++++++++++++++++++++++++++- lib/ClientConnection.h | 15 +++++++++++++++ tests/ConsumerSeekTest.cc | 19 +++++++++++++++++++ 3 files changed, 73 insertions(+), 1 deletion(-) diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index de73ccca..57470795 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include "AsioDefines.h" @@ -1189,7 +1190,44 @@ Future ClientConnection::sendRequestWithId(const SharedBuf lock.unlock(); LOG_DEBUG(cnxString_ << "Inserted request " << requestType << " (req_id: " << requestId << ")"); - sendCommand(cmd); + if (mockingRequests_) { + if (auto iter = mockRequestDelays_.find(requestType); iter != mockRequestDelays_.end()) { + auto self = shared_from_this(); + if (strcmp(requestType, "SEEK") == 0) { + // Mock the `CLOSE_CONSUMER` command sent by broker, for simplicity, disconnect all consumers + executor_->postWork([this, self] { + std::vector consumerIds; + { + Lock lock(mutex_); + for (const auto& entry : consumers_) { + if (auto consumer = entry.second.lock()) { + consumerIds.push_back(consumer->getConsumerId()); + } + } + } + for (auto consumerId : consumerIds) { + proto::CommandCloseConsumer closeConsumerCmd; + closeConsumerCmd.set_consumer_id(consumerId); + self->handleCloseConsumer(closeConsumerCmd); + } + }); + } + auto timer = executor_->createDeadlineTimer(); + timer->expires_after(std::chrono::milliseconds(iter->second)); + LOG_INFO("Request " << requestType << " (req_id: " << requestId << ") is being delayed for " + << iter->second << " ms"); + timer->async_wait([self, cmd, requestId, timer](const ASIO_ERROR& ec) { + LOG_INFO("Complete request id: " << requestId); + proto::CommandSuccess success; + success.set_request_id(requestId); + self->handleSuccess(success); + }); + } else { + sendCommand(cmd); + } + } else { + sendCommand(cmd); + } return requestData.promise.getFuture(); } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 0d2085cb..ce857f45 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -123,6 +123,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this; public: typedef std::shared_ptr SocketPtr; @@ -209,6 +211,14 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this newGetSchema(const std::string& topicName, const std::string& version, uint64_t requestId); + void mockRequestDelay(RequestDelayType requestDelays) { + if (mockingRequests_) { + throw new std::runtime_error("Already mocking requests"); + } + mockRequestDelays_.swap(requestDelays); + mockingRequests_ = true; + } + private: struct PendingRequestData { Promise promise; @@ -393,6 +403,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this& consumerStatsRequests); void startConsumerStatsTimer(std::vector consumerStatsRequests); diff --git a/tests/ConsumerSeekTest.cc b/tests/ConsumerSeekTest.cc index f03ea5e3..c70cdad9 100644 --- a/tests/ConsumerSeekTest.cc +++ b/tests/ConsumerSeekTest.cc @@ -24,7 +24,9 @@ #include #include "HttpHelper.h" +#include "lib/Latch.h" #include "lib/LogUtils.h" +#include "tests/PulsarFriend.h" DECLARE_LOG_OBJECT() @@ -200,6 +202,23 @@ TEST_F(ConsumerSeekTest, testNoInternalConsumer) { ASSERT_EQ(ResultOk, consumer.seek(MessageId::earliest())); } +// Verify the `seek` method won't be blocked forever in any order of the Subscribe response and Seek response +TEST_F(ConsumerSeekTest, testSubscribeSeekRaces) { + Client client(lookupUrl); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub", consumer)); + + for (auto&& connection : PulsarFriend::getConnections(client)) { + connection->mockRequestDelay({{"SUBSCRIBE", 1000}, {"SEEK", 600}}); + } + + Latch latch(1); + consumer.seekAsync(0L, [&latch](Result result) { latch.countdown(); }); + ASSERT_TRUE(latch.wait(std::chrono::seconds(5))); + + client.close(); +} + INSTANTIATE_TEST_SUITE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false)); } // namespace pulsar From 814afb19b0c64baee7ca0055efed6df27a6a0ce6 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 14 Jan 2026 12:01:57 +0800 Subject: [PATCH 03/16] fix testSubscribeSeekRaces --- lib/ConsumerImpl.cc | 42 ++++++++++++++++++------------------------ lib/ConsumerImpl.h | 19 +++++++++---------- lib/Synchronized.h | 6 ++++++ 3 files changed, 33 insertions(+), 34 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 8dadc5d8..4f6c7f24 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -238,7 +238,7 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c cnx->registerConsumer(consumerId_, get_shared_this_ptr()); LOG_DEBUG(cnx->cnxString() << "Registered consumer " << consumerId_); - if (duringSeek()) { + if (hasPendingSeek_.load(std::memory_order_acquire)) { ackGroupingTrackerPtr_->flushAndClean(); } @@ -269,6 +269,7 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c } else { promise.setFailed(handleResult); } + completeSeekCallback(ResultOk); }); return promise.getFuture(); @@ -1129,7 +1130,7 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) { * `startMessageId_` is updated so that we can discard messages after delivery restarts. */ void ConsumerImpl::clearReceiveQueue() { - if (duringSeek()) { + if (hasPendingSeek_.load(std::memory_order_acquire)) { if (hasSoughtByTimestamp()) { // Invalidate startMessageId_ so that isPriorBatchIndex and isPriorEntryIndex checks will be // skipped, and hasMessageAvailableAsync won't use startMessageId_ in compare. @@ -1137,11 +1138,6 @@ void ConsumerImpl::clearReceiveQueue() { } else { startMessageId_ = seekMessageId_.get(); } - SeekStatus expected = SeekStatus::COMPLETED; - if (seekStatus_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED)) { - auto seekCallback = seekCallback_.release(); - executor_->postWork([seekCallback] { seekCallback(ResultOk); }); - } return; } else if (subscriptionMode_ == Commands::SubscriptionModeDurable) { return; @@ -1554,7 +1550,9 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, const ResultCallback& callb } const auto requestId = newRequestId(); - seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, callback); + auto nonNullCallback = (callback != nullptr) ? callback : [](Result) {}; + seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, + std::move(nonNullCallback)); } void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback) { @@ -1568,8 +1566,9 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback) } const auto requestId = newRequestId(); + auto nonNullCallback = (callback != nullptr) ? callback : [](Result) {}; seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), SeekArg{timestamp}, - callback); + std::move(nonNullCallback)); } bool ConsumerImpl::isReadCompacted() { return readCompacted_; } @@ -1727,7 +1726,7 @@ bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; } void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg, - const ResultCallback& callback) { + ResultCallback&& callback) { ClientConnectionPtr cnx = getCnx().lock(); if (!cnx) { LOG_ERROR(getName() << " Client Connection not ready for Consumer"); @@ -1735,10 +1734,9 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c return; } - auto expected = SeekStatus::NOT_STARTED; - if (!seekStatus_.compare_exchange_strong(expected, SeekStatus::IN_PROGRESS)) { - LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the status is " - << static_cast(expected)); + auto expected = false; + if (hasPendingSeek_.compare_exchange_strong(expected, true)) { + LOG_ERROR(getName() << " attempted to seek " << seekArg << " when there is a pending seek"); callback(ResultNotAllowedError); return; } @@ -1750,8 +1748,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c seekMessageId_ = *boost::get(&seekArg); hasSoughtByTimestamp_.store(false, std::memory_order_release); } - seekStatus_ = SeekStatus::IN_PROGRESS; - seekCallback_ = callback; + seekCallback_ = std::move(callback); LOG_INFO(getName() << " Seeking subscription to " << seekArg); auto weakSelf = weak_from_this(); @@ -1771,20 +1768,17 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c Lock lock(mutexForMessageId_); lastDequedMessageId_ = MessageId::earliest(); lock.unlock(); - if (getCnx().expired()) { - // It's during reconnection, complete the seek future after connection is established - seekStatus_ = SeekStatus::COMPLETED; - } else { + + if (!getCnx().expired()) { if (!hasSoughtByTimestamp()) { startMessageId_ = seekMessageId_.get(); } - seekCallback_.release()(result); - } + completeSeekCallback(result); + } // else: complete the seek future after connection is established } else { LOG_ERROR(getName() << "Failed to seek: " << result); seekMessageId_ = originalSeekMessageId; - seekStatus_ = SeekStatus::NOT_STARTED; - seekCallback_.release()(result); + completeSeekCallback(result); } }); } diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 63eb51d6..5e1d28ee 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -21,6 +21,7 @@ #include +#include #include #include #include @@ -77,13 +78,6 @@ const static std::string SYSTEM_PROPERTY_REAL_TOPIC = "REAL_TOPIC"; const static std::string PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID"; const static std::string DLQ_GROUP_TOPIC_SUFFIX = "-DLQ"; -enum class SeekStatus : std::uint8_t -{ - NOT_STARTED, - IN_PROGRESS, - COMPLETED -}; - class ConsumerImpl : public ConsumerImplBase { public: ConsumerImpl(const ClientImplPtr& client, const std::string& topic, const std::string& subscriptionName, @@ -230,7 +224,13 @@ class ConsumerImpl : public ConsumerImplBase { } void seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg, - const ResultCallback& callback); + ResultCallback&& callback); + void completeSeekCallback(Result result) { + if (auto callback = seekCallback_.release()) { + callback(result); + } + hasPendingSeek_.store(false, std::memory_order_release); + } void processPossibleToDLQ(const MessageId& messageId, const ProcessDLQCallBack& cb); std::mutex mutexForReceiveWithZeroQueueSize; @@ -274,14 +274,13 @@ class ConsumerImpl : public ConsumerImplBase { MessageId lastDequedMessageId_{MessageId::earliest()}; MessageId lastMessageIdInBroker_{MessageId::earliest()}; - std::atomic seekStatus_{SeekStatus::NOT_STARTED}; Synchronized seekCallback_{[](Result) {}}; Synchronized> startMessageId_; + std::atomic_bool hasPendingSeek_{false}; Synchronized seekMessageId_{MessageId::earliest()}; std::atomic hasSoughtByTimestamp_{false}; bool hasSoughtByTimestamp() const { return hasSoughtByTimestamp_.load(std::memory_order_acquire); } - bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; } class ChunkedMessageCtx { public: diff --git a/lib/Synchronized.h b/lib/Synchronized.h index 5449a9fe..68ae37be 100644 --- a/lib/Synchronized.h +++ b/lib/Synchronized.h @@ -41,6 +41,12 @@ class Synchronized { return *this; } + Synchronized& operator=(T&& value) { + std::lock_guard lock(mutex_); + value_ = value; + return *this; + } + private: T value_; mutable std::mutex mutex_; From 1303c059ed2097a4124c4c54707aafcec49d1e13 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 14 Jan 2026 14:34:33 +0800 Subject: [PATCH 04/16] fix tests --- lib/ClientConnection.cc | 39 +++----------- lib/ClientConnection.h | 19 +++---- lib/ConsumerImpl.cc | 2 +- lib/ConsumerImpl.h | 8 +-- lib/MockServer.h | 105 ++++++++++++++++++++++++++++++++++++++ tests/ConsumerSeekTest.cc | 45 +++++++++++++--- 6 files changed, 165 insertions(+), 53 deletions(-) create mode 100644 lib/MockServer.h diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 57470795..4f7a1dd1 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -32,6 +32,7 @@ #include "ConsumerImpl.h" #include "ExecutorService.h" #include "LogUtils.h" +#include "MockServer.h" #include "OpSendMsg.h" #include "ProducerImpl.h" #include "PulsarApi.pb.h" @@ -1190,39 +1191,11 @@ Future ClientConnection::sendRequestWithId(const SharedBuf lock.unlock(); LOG_DEBUG(cnxString_ << "Inserted request " << requestType << " (req_id: " << requestId << ")"); - if (mockingRequests_) { - if (auto iter = mockRequestDelays_.find(requestType); iter != mockRequestDelays_.end()) { - auto self = shared_from_this(); - if (strcmp(requestType, "SEEK") == 0) { - // Mock the `CLOSE_CONSUMER` command sent by broker, for simplicity, disconnect all consumers - executor_->postWork([this, self] { - std::vector consumerIds; - { - Lock lock(mutex_); - for (const auto& entry : consumers_) { - if (auto consumer = entry.second.lock()) { - consumerIds.push_back(consumer->getConsumerId()); - } - } - } - for (auto consumerId : consumerIds) { - proto::CommandCloseConsumer closeConsumerCmd; - closeConsumerCmd.set_consumer_id(consumerId); - self->handleCloseConsumer(closeConsumerCmd); - } - }); - } - auto timer = executor_->createDeadlineTimer(); - timer->expires_after(std::chrono::milliseconds(iter->second)); - LOG_INFO("Request " << requestType << " (req_id: " << requestId << ") is being delayed for " - << iter->second << " ms"); - timer->async_wait([self, cmd, requestId, timer](const ASIO_ERROR& ec) { - LOG_INFO("Complete request id: " << requestId); - proto::CommandSuccess success; - success.set_request_id(requestId); - self->handleSuccess(success); - }); - } else { + if (mockingRequests_.load(std::memory_order_acquire)) { + if (mockServer_ == nullptr) { + LOG_WARN(cnxString_ << "Mock server is unexpectedly null when processing " << requestType); + sendCommand(cmd); + } else if (!mockServer_->sendRequest(requestType, requestId)) { sendCommand(cmd); } } else { diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index ce857f45..aae53d23 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -115,6 +115,7 @@ struct ResponseData { typedef std::shared_ptr> NamespaceTopicsPtr; +class MockServer; class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this { enum State : uint8_t { @@ -211,12 +212,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this newGetSchema(const std::string& topicName, const std::string& version, uint64_t requestId); - void mockRequestDelay(RequestDelayType requestDelays) { - if (mockingRequests_) { - throw new std::runtime_error("Already mocking requests"); - } - mockRequestDelays_.swap(requestDelays); - mockingRequests_ = true; + void attachMockServer(const std::shared_ptr& mockServer) { + mockServer_ = mockServer; + // Mark that requests will first go through the mock server, if the mock server cannot process it, + // fall back to the normal logic + mockingRequests_.store(true, std::memory_order_release); } private: @@ -320,6 +320,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this state_{Pending}; TimeDuration operationsTimeout_; AuthenticationPtr authentication_; @@ -403,10 +405,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this mockServer_; void handleConsumerStatsTimeout(const ASIO_ERROR& ec, const std::vector& consumerStatsRequests); @@ -422,6 +422,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this +#include +#include +#include + +#include "ClientConnection.h" +#include "ConsumerImpl.h" +#include "ExecutorService.h" +#include "LogUtils.h" +#include "PulsarApi.pb.h" + +namespace pulsar { + +class MockServer { + public: + using RequestDelayType = std::unordered_map; + + MockServer(const ClientConnectionPtr& connection) : connection_(connection) {} + + void setRequestDelay(std::initializer_list delays) { + std::lock_guard lock(mutex_); + for (auto&& delay : delays) { + requestDelays_[delay.first] = delay.second; + } + } + + bool sendRequest(const std::string& request, uint64_t requestId) { + auto connection = connection_.lock(); + if (!connection) { + return false; + } + std::lock_guard lock(mutex_); + if (auto iter = requestDelays_.find(request); iter != requestDelays_.end()) { + // Mock the `CLOSE_CONSUMER` command sent by broker, for simplicity, disconnect all consumers + if (request == "SEEK") { + connection->executor_->postWork([connection] { + std::vector consumerIds; + { + std::lock_guard lock{connection->mutex_}; + for (auto&& kv : connection->consumers_) { + if (auto consumer = kv.second.lock()) { + consumerIds.push_back(consumer->getConsumerId()); + } + } + } + for (auto consumerId : consumerIds) { + proto::CommandCloseConsumer closeConsumerCmd; + closeConsumerCmd.set_consumer_id(consumerId); + connection->handleCloseConsumer(closeConsumerCmd); + } + }); + } + long delayMs = iter->second; + auto timer = connection->executor_->createDeadlineTimer(); + timer->expires_from_now(std::chrono::milliseconds(delayMs)); + timer->async_wait([connection, requestId, request, timer](const auto& ec) { + if (ec) { + LOG_INFO("Timer cancelled for request " << request << " with id " << requestId); + return; + } + if (connection->isClosed()) { + LOG_INFO("Connection is closed, not completing request " << request << " with id " + << requestId); + return; + } + LOG_INFO("Completing delayed request " << request << " with id " << requestId); + proto::CommandSuccess success; + success.set_request_id(requestId); + connection->handleSuccess(success); + }); + return true; + } else { + return false; + } + } + + private: + mutable std::mutex mutex_; + std::unordered_map requestDelays_; + ClientConnectionWeakPtr connection_; + + DECLARE_LOG_OBJECT() +}; + +} // namespace pulsar diff --git a/tests/ConsumerSeekTest.cc b/tests/ConsumerSeekTest.cc index c70cdad9..c3589a93 100644 --- a/tests/ConsumerSeekTest.cc +++ b/tests/ConsumerSeekTest.cc @@ -19,13 +19,17 @@ #include #include +#include +#include +#include #include #include #include #include "HttpHelper.h" -#include "lib/Latch.h" +#include "lib/ClientConnection.h" #include "lib/LogUtils.h" +#include "lib/MockServer.h" #include "tests/PulsarFriend.h" DECLARE_LOG_OBJECT() @@ -202,19 +206,46 @@ TEST_F(ConsumerSeekTest, testNoInternalConsumer) { ASSERT_EQ(ResultOk, consumer.seek(MessageId::earliest())); } +static void assertSeekWithTimeout(Consumer& consumer) { + using namespace std::chrono_literals; + auto promise = std::make_shared>(); + consumer.seekAsync(0L, [promise](Result result) { promise->set_value(result); }); + auto future = promise->get_future(); + ASSERT_EQ(future.wait_for(5s), std::future_status::ready); + ASSERT_EQ(future.get(), ResultOk); +} + // Verify the `seek` method won't be blocked forever in any order of the Subscribe response and Seek response TEST_F(ConsumerSeekTest, testSubscribeSeekRaces) { Client client(lookupUrl); Consumer consumer; ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub", consumer)); - for (auto&& connection : PulsarFriend::getConnections(client)) { - connection->mockRequestDelay({{"SUBSCRIBE", 1000}, {"SEEK", 600}}); - } + auto connection = *PulsarFriend::getConnections(client).begin(); + auto mockServer = std::make_shared(connection); + connection->attachMockServer(mockServer); + + mockServer->setRequestDelay({{"SUBSCRIBE", 1000}, {"SEEK", 500}}); + assertSeekWithTimeout(consumer); + + mockServer->setRequestDelay({{"SUBSCRIBE", 500}, {"SEEK", 1000}}); + assertSeekWithTimeout(consumer); + + client.close(); +} + +TEST_F(ConsumerSeekTest, testReconnectionSlow) { + Client client(lookupUrl, ClientConfiguration().setInitialBackoffIntervalMs(500)); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub", consumer)); + + auto connection = *PulsarFriend::getConnections(client).begin(); + auto mockServer = std::make_shared(connection); + connection->attachMockServer(mockServer); - Latch latch(1); - consumer.seekAsync(0L, [&latch](Result result) { latch.countdown(); }); - ASSERT_TRUE(latch.wait(std::chrono::seconds(5))); + // Make seek response received before `connectionOpened` is called + mockServer->setRequestDelay({{"SEEK", 100}}); + assertSeekWithTimeout(consumer); client.close(); } From b030d2c47ff9d18d4c7f129e245e8b2ee140c710 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 14 Jan 2026 19:11:46 +0800 Subject: [PATCH 05/16] track incomplete requests --- lib/MockServer.h | 36 ++++++++++++++++++++++++++++++------ tests/ConsumerSeekTest.cc | 11 +++++++++-- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/lib/MockServer.h b/lib/MockServer.h index 259eecfd..ddfd2c55 100644 --- a/lib/MockServer.h +++ b/lib/MockServer.h @@ -19,6 +19,7 @@ #pragma once #include +#include #include #include #include @@ -31,7 +32,7 @@ namespace pulsar { -class MockServer { +class MockServer : public std::enable_shared_from_this { public: using RequestDelayType = std::unordered_map; @@ -72,18 +73,25 @@ class MockServer { } long delayMs = iter->second; auto timer = connection->executor_->createDeadlineTimer(); + auto key = request + std::to_string(requestId); + pendingTimers_[key] = timer; timer->expires_from_now(std::chrono::milliseconds(delayMs)); - timer->async_wait([connection, requestId, request, timer](const auto& ec) { + + auto self = shared_from_this(); + timer->async_wait([this, self, key, connection, requestId, timer](const auto& ec) { + { + std::lock_guard lock(mutex_); + pendingTimers_.erase(key); + } if (ec) { - LOG_INFO("Timer cancelled for request " << request << " with id " << requestId); + LOG_INFO("Timer cancelled for request " << key); return; } if (connection->isClosed()) { - LOG_INFO("Connection is closed, not completing request " << request << " with id " - << requestId); + LOG_INFO("Connection is closed, not completing request " << key); return; } - LOG_INFO("Completing delayed request " << request << " with id " << requestId); + LOG_INFO("Completing delayed request " << key); proto::CommandSuccess success; success.set_request_id(requestId); connection->handleSuccess(success); @@ -94,9 +102,25 @@ class MockServer { } } + // Return the number of pending timers cancelled + auto close() { + std::lock_guard lock(mutex_); + auto result = pendingTimers_.size(); + for (auto&& kv : pendingTimers_) { + try { + LOG_INFO("Cancelling timer for " << kv.first); + kv.second->cancel(); + } catch (...) { + LOG_WARN("Failed to cancel timer for " << kv.first); + } + } + return result; + } + private: mutable std::mutex mutex_; std::unordered_map requestDelays_; + std::unordered_map pendingTimers_; ClientConnectionWeakPtr connection_; DECLARE_LOG_OBJECT() diff --git a/tests/ConsumerSeekTest.cc b/tests/ConsumerSeekTest.cc index c3589a93..ea8a78be 100644 --- a/tests/ConsumerSeekTest.cc +++ b/tests/ConsumerSeekTest.cc @@ -209,7 +209,12 @@ TEST_F(ConsumerSeekTest, testNoInternalConsumer) { static void assertSeekWithTimeout(Consumer& consumer) { using namespace std::chrono_literals; auto promise = std::make_shared>(); - consumer.seekAsync(0L, [promise](Result result) { promise->set_value(result); }); + std::weak_ptr> weakPromise = promise; + consumer.seekAsync(0L, [weakPromise](Result result) { + if (auto promise = weakPromise.lock()) { + promise->set_value(result); + } + }); auto future = promise->get_future(); ASSERT_EQ(future.wait_for(5s), std::future_status::ready); ASSERT_EQ(future.get(), ResultOk); @@ -231,13 +236,14 @@ TEST_F(ConsumerSeekTest, testSubscribeSeekRaces) { mockServer->setRequestDelay({{"SUBSCRIBE", 500}, {"SEEK", 1000}}); assertSeekWithTimeout(consumer); + ASSERT_EQ(mockServer->close(), 0); client.close(); } TEST_F(ConsumerSeekTest, testReconnectionSlow) { Client client(lookupUrl, ClientConfiguration().setInitialBackoffIntervalMs(500)); Consumer consumer; - ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub", consumer)); + ASSERT_EQ(ResultOk, client.subscribe("testReconnectionSlow", "sub", consumer)); auto connection = *PulsarFriend::getConnections(client).begin(); auto mockServer = std::make_shared(connection); @@ -247,6 +253,7 @@ TEST_F(ConsumerSeekTest, testReconnectionSlow) { mockServer->setRequestDelay({{"SEEK", 100}}); assertSeekWithTimeout(consumer); + ASSERT_EQ(mockServer->close(), 0); client.close(); } From f982fe5fa22faf7998f8505118b1353a72ab2533 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 15 Jan 2026 11:24:54 +0800 Subject: [PATCH 06/16] refactor the fix --- lib/ConsumerImpl.cc | 75 +++++++++++++++++++++++++++++++-------------- lib/ConsumerImpl.h | 14 +++------ 2 files changed, 56 insertions(+), 33 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index e08802d2..db9b1f57 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -23,6 +23,7 @@ #include #include +#include #include "AckGroupingTracker.h" #include "AckGroupingTrackerDisabled.h" @@ -235,19 +236,28 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after // sending the subscribe request. - cnx->registerConsumer(consumerId_, get_shared_this_ptr()); - LOG_DEBUG(cnx->cnxString() << "Registered consumer " << consumerId_); + optional subscribeMessageId; + bool duringSeek = false; + { + std::lock_guard lock(mutex_); + setCnx(cnx); + cnx->registerConsumer(consumerId_, get_shared_this_ptr()); + LOG_DEBUG(cnx->cnxString() << "Registered consumer " << consumerId_); - if (hasPendingSeek_.load(std::memory_order_acquire)) { + { + std::lock_guard lock(mutexForMessageId_); + clearReceiveQueue(); + subscribeMessageId = (subscriptionMode_ == Commands::SubscriptionModeNonDurable) + ? startMessageId_.get() + : std::nullopt; + } + + duringSeek = seekCallback_.has_value(); + } + if (duringSeek) { ackGroupingTrackerPtr_->flushAndClean(); } - Lock lockForMessageId(mutexForMessageId_); - clearReceiveQueue(); - const auto subscribeMessageId = - (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId_.get() : std::nullopt; - lockForMessageId.unlock(); - unAckedMessageTrackerPtr_->clear(); ClientImplPtr client = client_.lock(); @@ -269,7 +279,6 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c } else { promise.setFailed(handleResult); } - completeSeekCallback(ResultOk); }); return promise.getFuture(); @@ -1130,7 +1139,11 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) { * `startMessageId_` is updated so that we can discard messages after delivery restarts. */ void ConsumerImpl::clearReceiveQueue() { - if (hasPendingSeek_.load(std::memory_order_acquire)) { + // NOTE: This method must be called with `mutex_` held for thread safety where + if (seekCallback_.has_value()) { + executor_->postWork( + [callback{std::exchange(seekCallback_, std::nullopt).value()}] { callback(ResultOk); }); + if (hasSoughtByTimestamp()) { // Invalidate startMessageId_ so that isPriorBatchIndex and isPriorEntryIndex checks will be // skipped, and hasMessageAvailableAsync won't use startMessageId_ in compare. @@ -1733,9 +1746,16 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c callback(ResultNotConnected); return; } - - auto expected = false; - if (!hasPendingSeek_.compare_exchange_strong(expected, true)) { + bool hasPendingSeek = false; + { + std::lock_guard lock(mutex_); + if (seekCallback_.has_value()) { + hasPendingSeek = true; + } else { + seekCallback_ = std::move(callback); + } + } + if (hasPendingSeek) { LOG_ERROR(getName() << " attempted to seek " << seekArg << " when there is a pending seek"); callback(ResultNotAllowedError); return; @@ -1748,37 +1768,46 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c seekMessageId_ = *boost::get(&seekArg); hasSoughtByTimestamp_.store(false, std::memory_order_release); } - seekCallback_ = std::move(callback); LOG_INFO(getName() << " Seeking subscription to " << seekArg); auto weakSelf = weak_from_this(); cnx->sendRequestWithId(seek, requestId, "SEEK") - .addListener([this, weakSelf, callback, originalSeekMessageId](Result result, - const ResponseData& responseData) { + .addListener([this, weakSelf, originalSeekMessageId](Result result, + const ResponseData& responseData) { auto self = weakSelf.lock(); if (!self) { - callback(result); return; } if (result == ResultOk) { LOG_INFO(getName() << "Seek successfully"); ackGroupingTrackerPtr_->flushAndClean(); incomingMessages_.clear(); - Lock lock(mutexForMessageId_); - lastDequedMessageId_ = MessageId::earliest(); - lock.unlock(); + { + std::lock_guard lock(mutexForMessageId_); + lastDequedMessageId_ = MessageId::earliest(); + } + std::lock_guard lock(mutex_); if (!getCnx().expired()) { if (!hasSoughtByTimestamp()) { startMessageId_ = seekMessageId_.get(); } - completeSeekCallback(result); + if (!seekCallback_.has_value()) { + LOG_ERROR(getName() << "Seek callback is not set"); + return; + } + executor_->postWork( + [self, callback{std::exchange(seekCallback_, std::nullopt).value()}]() { + callback(ResultOk); + }); } // else: complete the seek future after connection is established } else { LOG_ERROR(getName() << "Failed to seek: " << result); seekMessageId_ = originalSeekMessageId; - completeSeekCallback(result); + executor_->postWork([self, callback{std::exchange(seekCallback_, std::nullopt).value()}]() { + callback(ResultOk); + }); } }); } diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 9a1fe7cb..4c5768be 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -225,14 +226,6 @@ class ConsumerImpl : public ConsumerImplBase { void seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg, ResultCallback&& callback); - void completeSeekCallback(Result result) { - bool expected = true; - if (hasPendingSeek_.compare_exchange_strong(expected, false)) { - if (auto callback = seekCallback_.release()) { - callback(result); - } - } - } void processPossibleToDLQ(const MessageId& messageId, const ProcessDLQCallBack& cb); std::mutex mutexForReceiveWithZeroQueueSize; @@ -276,9 +269,10 @@ class ConsumerImpl : public ConsumerImplBase { MessageId lastDequedMessageId_{MessageId::earliest()}; MessageId lastMessageIdInBroker_{MessageId::earliest()}; - Synchronized seekCallback_{[](Result) {}}; + // NOTE: The modification must be protected by `mutex_` + std::optional seekCallback_; + Synchronized> startMessageId_; - std::atomic_bool hasPendingSeek_{false}; Synchronized seekMessageId_{MessageId::earliest()}; std::atomic hasSoughtByTimestamp_{false}; From 9acf2a6e64d811a18875186e0de19cc78ef8c1ba Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 15 Jan 2026 12:17:06 +0800 Subject: [PATCH 07/16] Fix ExtensibleLoadManagerTest --- lib/ConsumerImpl.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index db9b1f57..b2cc93a7 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -334,8 +334,8 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result return ResultAlreadyClosed; } + mutexLock.unlock(); LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString()); - setCnx(cnx); incomingMessages_.clear(); possibleSendToDeadLetterTopicMessages_.clear(); backoff_.reset(); From 40cec67632f7dfbab785856340901a1d0ba631a4 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 15 Jan 2026 14:36:26 +0800 Subject: [PATCH 08/16] fix --- lib/ConsumerImpl.cc | 171 ++++++++++++++++++++++---------------------- lib/ConsumerImpl.h | 59 +++++++++------ lib/HandlerBase.cc | 2 +- lib/HandlerBase.h | 2 +- lib/MockServer.h | 1 + 5 files changed, 126 insertions(+), 109 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index b2cc93a7..95e64760 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -24,6 +24,7 @@ #include #include +#include #include "AckGroupingTracker.h" #include "AckGroupingTrackerDisabled.h" @@ -124,7 +125,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, const std::string& topic negativeAcksTracker_(std::make_shared(client, *this, conf)), ackGroupingTrackerPtr_(newAckGroupingTracker(topic, conf, client)), readCompacted_(conf.isReadCompacted()), - startMessageId_(getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())), + startMessageId_(pulsar::getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())), maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()), autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()), expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()), @@ -237,25 +238,16 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after // sending the subscribe request. optional subscribeMessageId; - bool duringSeek = false; { - std::lock_guard lock(mutex_); + LockGuard lock{mutex_}; setCnx(cnx); cnx->registerConsumer(consumerId_, get_shared_this_ptr()); LOG_DEBUG(cnx->cnxString() << "Registered consumer " << consumerId_); - { - std::lock_guard lock(mutexForMessageId_); - clearReceiveQueue(); - subscribeMessageId = (subscriptionMode_ == Commands::SubscriptionModeNonDurable) - ? startMessageId_.get() - : std::nullopt; - } - - duringSeek = seekCallback_.has_value(); - } - if (duringSeek) { - ackGroupingTrackerPtr_->flushAndClean(); + clearReceiveQueue(); + subscribeMessageId = + (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId_ : std::nullopt; + lastDequedMessageId_ = MessageId::earliest(); } unAckedMessageTrackerPtr_->clear(); @@ -279,6 +271,15 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c } else { promise.setFailed(handleResult); } + // Complete the seek callback after completing `promise`, otherwise `reconnectionPending_` will + // still be true when the seek operation is done. + LockGuard lock{mutex_}; + if (seekStatus_ == SeekStatus::COMPLETED) { + executor_->postWork([seekCallback{std::exchange(seekCallback_, std::nullopt).value()}]() { + seekCallback(ResultOk); + }); + seekStatus_ = SeekStatus::NOT_STARTED; + } }); return promise.getFuture(); @@ -516,9 +517,10 @@ optional ConsumerImpl::processMessageChunk(const SharedBuffer& pay auto& chunkedMsgCtx = it->second; if (it == chunkedMessageCache_.end() || !chunkedMsgCtx.validateChunkId(chunkId)) { - auto startMessageId = startMessageId_.get().value_or(MessageId::earliest()); - if (!config_.isStartMessageIdInclusive() && startMessageId.ledgerId() == messageId.ledgerId() && - startMessageId.entryId() == messageId.entryId()) { + auto startMessageId = getStartMessageId(); + if (!config_.isStartMessageIdInclusive() && startMessageId && + startMessageId->ledgerId() == messageId.ledgerId() && + startMessageId->entryId() == messageId.entryId()) { // When the start message id is not inclusive, the last chunk of the previous chunked message will // be delivered, which is expected and we only need to filter it out. chunkedMessageCache_.remove(uuid); @@ -635,17 +637,14 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: words[i] = msg.ack_set(i); } BitSet ackSet{std::move(words)}; - Lock lock(mutex_); numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m, ackSet, msg.redelivery_count()); } else { // try convert key value data. m.impl_->convertPayloadToKeyValue(config_.getSchema()); - const auto startMessageId = startMessageId_.get(); - if (isPersistent_ && startMessageId && - m.getMessageId().ledgerId() == startMessageId.value().ledgerId() && - m.getMessageId().entryId() == startMessageId.value().entryId() && - isPriorEntryIndex(m.getMessageId().entryId())) { + const auto startMessageId = getStartMessageId(); + if (isPersistent_ && startMessageId && m.getMessageId().ledgerId() == startMessageId->ledgerId() && + isPrior(m.getMessageId().entryId(), startMessageId->entryId())) { LOG_DEBUG(getName() << " Ignoring message from before the startMessageId: " << startMessageId.value()); return; @@ -767,7 +766,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection auto batchSize = batchedMessage.impl_->metadata.num_messages_in_batch(); LOG_DEBUG("Received Batch messages of size - " << batchSize << " -- msgId: " << batchedMessage.getMessageId()); - const auto startMessageId = startMessageId_.get(); + const auto startMessageId = getStartMessageId(); int skippedMessages = 0; @@ -797,9 +796,9 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection // If we are receiving a batch message, we need to discard messages that were prior // to the startMessageId - if (isPersistent_ && msgId.ledgerId() == startMessageId.value().ledgerId() && - msgId.entryId() == startMessageId.value().entryId() && - isPriorBatchIndex(msgId.batchIndex())) { + if (isPersistent_ && msgId.ledgerId() == startMessageId->ledgerId() && + msgId.entryId() == startMessageId->entryId() && + isPrior(msgId.batchIndex(), startMessageId->batchIndex())) { LOG_DEBUG(getName() << "Ignoring message from before the startMessageId" << msg.getMessageId()); ++skippedMessages; @@ -939,7 +938,7 @@ void ConsumerImpl::internalListener() { trackMessage(msg.getMessageId()); try { consumerStatsBasePtr_->receivedMessage(msg, ResultOk); - lastDequedMessageId_ = msg.getMessageId(); + setLastDequedMessageId(msg.getMessageId()); Consumer consumer{get_shared_this_ptr()}; Message interceptMsg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg); messageListener_(consumer, interceptMsg); @@ -1112,10 +1111,7 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) { } void ConsumerImpl::messageProcessed(Message& msg, bool track) { - Lock lock(mutexForMessageId_); - lastDequedMessageId_ = msg.getMessageId(); - lock.unlock(); - + setLastDequedMessageId(msg.getMessageId()); incomingMessagesSize_.fetch_sub(msg.getLength()); ClientConnectionPtr currentCnx = getCnx().lock(); @@ -1137,19 +1133,18 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) { * was * not seen by the application * `startMessageId_` is updated so that we can discard messages after delivery restarts. + * NOTE: `mutex_` must be locked before calling this method. */ void ConsumerImpl::clearReceiveQueue() { - // NOTE: This method must be called with `mutex_` held for thread safety where - if (seekCallback_.has_value()) { - executor_->postWork( - [callback{std::exchange(seekCallback_, std::nullopt).value()}] { callback(ResultOk); }); - - if (hasSoughtByTimestamp()) { - // Invalidate startMessageId_ so that isPriorBatchIndex and isPriorEntryIndex checks will be - // skipped, and hasMessageAvailableAsync won't use startMessageId_ in compare. - startMessageId_ = std::nullopt; + if (seekStatus_ != SeekStatus::NOT_STARTED) { + // Flush the pending ACKs in case newly arrived messages are filtered out by the previous pending ACKs + ackGroupingTrackerPtr_->flushAndClean(); + if (std::holds_alternative(lastSeekArg_)) { + startMessageId_ = std::get(lastSeekArg_); } else { - startMessageId_ = seekMessageId_.get(); + // Invalidate startMessageId_ so that `isPrior` checks will be skipped, and + // `hasMessageAvailableAsync` won't use `startMessageId_` in compare. + startMessageId_ = std::nullopt; } return; } else if (subscriptionMode_ == Commands::SubscriptionModeDurable) { @@ -1568,7 +1563,7 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, const ResultCallback& callb std::move(nonNullCallback)); } -void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback) { +void ConsumerImpl::seekAsync(SeekTimestampType timestamp, const ResultCallback& callback) { const auto state = state_.load(); if (state == Closed || state == Closing) { LOG_ERROR(getName() << "Client connection already closed."); @@ -1593,16 +1588,16 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c } bool compareMarkDeletePosition; { - std::lock_guard lock{mutexForMessageId_}; + LockGuard lock{mutex_}; compareMarkDeletePosition = // there is no message received by consumer, so we cannot compare the last position with the last // received position lastDequedMessageId_ == MessageId::earliest() && // If the start message id is latest, we should seek to the actual last message first. - (startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest() || + (startMessageId_.value_or(MessageId::earliest()) == MessageId::latest() || // If there is a previous seek operation by timestamp, the start message id will be incorrect, so // we cannot compare the start position with the last position. - hasSoughtByTimestamp()); + std::holds_alternative(lastSeekArg_)); } if (compareMarkDeletePosition) { auto self = get_shared_this_ptr(); @@ -1623,7 +1618,12 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c callback(ResultOk, false); } }; - if (self->config_.isStartMessageIdInclusive() && !self->hasSoughtByTimestamp()) { + bool lastSeekIsByTimestamp = false; + { + LockGuard lock{self->mutex_}; + lastSeekIsByTimestamp = std::holds_alternative(self->lastSeekArg_); + } + if (self->config_.isStartMessageIdInclusive() && !lastSeekIsByTimestamp) { self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) { if (result != ResultOk) { callback(result, {}); @@ -1680,9 +1680,10 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time .addListener([this, self, callback](Result result, const GetLastMessageIdResponse& response) { if (result == ResultOk) { LOG_DEBUG(getName() << "getLastMessageId: " << response); - Lock lock(mutexForMessageId_); - lastMessageIdInBroker_ = response.getLastMessageId(); - lock.unlock(); + { + LockGuard lock{mutex_}; + lastMessageIdInBroker_ = response.getLastMessageId(); + } } else { LOG_ERROR(getName() << "Failed to getLastMessageId: " << result); } @@ -1747,51 +1748,57 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c return; } bool hasPendingSeek = false; + // Save the previous last seek arg in case seek failed + decltype(lastSeekArg_) previousLastSeekArg; { std::lock_guard lock(mutex_); - if (seekCallback_.has_value()) { + if (seekStatus_ != SeekStatus::NOT_STARTED) { hasPendingSeek = true; } else { + if (seekCallback_.has_value()) { + // This should never happen + LOG_ERROR(getName() << "Previous seek callback is not triggered unexpectedly"); + executor_->postWork([callback{std::exchange(seekCallback_, std::nullopt).value()}] { + callback(ResultTimeout); + }); + } seekCallback_ = std::move(callback); + previousLastSeekArg = lastSeekArg_; + lastSeekArg_ = seekArg; } } if (hasPendingSeek) { - LOG_ERROR(getName() << " attempted to seek " << seekArg << " when there is a pending seek"); + std::visit( + [this](auto&& arg) { + LOG_ERROR(getName() << "Attempted to seek " << arg << " when there is a pending seek"); + }, + seekArg); callback(ResultNotAllowedError); return; } - const auto originalSeekMessageId = seekMessageId_.get(); - if (boost::get(&seekArg)) { - hasSoughtByTimestamp_.store(true, std::memory_order_release); - } else { - seekMessageId_ = *boost::get(&seekArg); - hasSoughtByTimestamp_.store(false, std::memory_order_release); - } - LOG_INFO(getName() << " Seeking subscription to " << seekArg); + std::visit([this](auto&& arg) { LOG_INFO(getName() << "Seeking subscription to " << arg); }, seekArg); auto weakSelf = weak_from_this(); cnx->sendRequestWithId(seek, requestId, "SEEK") - .addListener([this, weakSelf, originalSeekMessageId](Result result, - const ResponseData& responseData) { + .addListener([this, weakSelf, previousLastSeekArg](Result result, const ResponseData& responseData) { auto self = weakSelf.lock(); if (!self) { return; } if (result == ResultOk) { - LOG_INFO(getName() << "Seek successfully"); - ackGroupingTrackerPtr_->flushAndClean(); - incomingMessages_.clear(); - { - std::lock_guard lock(mutexForMessageId_); - lastDequedMessageId_ = MessageId::earliest(); - } - - std::lock_guard lock(mutex_); - if (!getCnx().expired()) { - if (!hasSoughtByTimestamp()) { - startMessageId_ = seekMessageId_.get(); + LockGuard lock(mutex_); + if (getCnx().expired() || reconnectionPending_) { + // It's during reconnection, complete the seek future after connection is established + seekStatus_ = SeekStatus::COMPLETED; + LOG_INFO(getName() << "Delay the seek future until the reconnection is done"); + } else { + LOG_INFO(getName() << "Seek successfully"); + ackGroupingTrackerPtr_->flushAndClean(); + incomingMessages_.clear(); + if (std::holds_alternative(lastSeekArg_)) { + startMessageId_ = std::get(lastSeekArg_); } if (!seekCallback_.has_value()) { LOG_ERROR(getName() << "Seek callback is not set"); @@ -1801,10 +1808,12 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c [self, callback{std::exchange(seekCallback_, std::nullopt).value()}]() { callback(ResultOk); }); + seekStatus_ = SeekStatus::NOT_STARTED; } // else: complete the seek future after connection is established } else { LOG_ERROR(getName() << "Failed to seek: " << result); - seekMessageId_ = originalSeekMessageId; + LockGuard lock{mutex_}; + lastSeekArg_ = previousLastSeekArg; executor_->postWork([self, callback{std::exchange(seekCallback_, std::nullopt).value()}]() { callback(ResultOk); }); @@ -1812,16 +1821,6 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c }); } -bool ConsumerImpl::isPriorBatchIndex(int32_t idx) { - return config_.isStartMessageIdInclusive() ? idx < startMessageId_.get().value().batchIndex() - : idx <= startMessageId_.get().value().batchIndex(); -} - -bool ConsumerImpl::isPriorEntryIndex(int64_t idx) { - return config_.isStartMessageIdInclusive() ? idx < startMessageId_.get().value().entryId() - : idx <= startMessageId_.get().value().entryId(); -} - bool ConsumerImpl::hasEnoughMessagesForBatchReceive() const { if (batchReceivePolicy_.getMaxNumMessages() <= 0 && batchReceivePolicy_.getMaxNumBytes() <= 0) { return false; diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 4c5768be..c739aa11 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -22,7 +22,6 @@ #include #include -#include #include #include #include @@ -30,6 +29,7 @@ #include #include #include +#include #include "BrokerConsumerStatsImpl.h" #include "Commands.h" @@ -79,6 +79,13 @@ const static std::string SYSTEM_PROPERTY_REAL_TOPIC = "REAL_TOPIC"; const static std::string PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID"; const static std::string DLQ_GROUP_TOPIC_SUFFIX = "-DLQ"; +enum class SeekStatus : std::uint8_t +{ + NOT_STARTED, // there is no pending seek RPC so that it's allowed to seek + IN_PROGRESS, // the seek RPC is in progress + COMPLETED // the seek RPC is done but the connection is not established yet +}; + class ConsumerImpl : public ConsumerImplBase { public: ConsumerImpl(const ClientImplPtr& client, const std::string& topic, const std::string& subscriptionName, @@ -133,7 +140,9 @@ class ConsumerImpl : public ConsumerImplBase { void getBrokerConsumerStatsAsync(const BrokerConsumerStatsCallback& callback) override; void getLastMessageIdAsync(const BrokerGetLastMessageIdCallback& callback) override; void seekAsync(const MessageId& msgId, const ResultCallback& callback) override; - void seekAsync(uint64_t timestamp, const ResultCallback& callback) override; + using SeekTimestampType = uint64_t; + using SeekArg = std::variant; + void seekAsync(SeekTimestampType timestamp, const ResultCallback& callback) override; void negativeAcknowledge(const MessageId& msgId) override; bool isConnected() const override; uint64_t getNumberOfConnectedConsumer() override; @@ -186,8 +195,10 @@ class ConsumerImpl : public ConsumerImplBase { void drainIncomingMessageQueue(size_t count); uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage, const BitSet& ackSet, int redeliveryCount); - bool isPriorBatchIndex(int32_t idx); - bool isPriorEntryIndex(int64_t idx); + template + bool isPrior(T index, T startIndex) const noexcept { + return config_.isStartMessageIdInclusive() ? (index < startIndex) : (index <= startIndex); + } void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, const BrokerConsumerStatsCallback&); enum class DecryptionResult : uint8_t @@ -213,13 +224,13 @@ class ConsumerImpl : public ConsumerImplBase { const BrokerGetLastMessageIdCallback& callback); void clearReceiveQueue(); - using SeekArg = boost::variant; friend std::ostream& operator<<(std::ostream& os, const SeekArg& seekArg) { - auto ptr = boost::get(&seekArg); - if (ptr) { - os << *ptr; + if (std::holds_alternative(seekArg)) { + os << std::get(seekArg); + } else if (std::holds_alternative(seekArg)) { + os << std::get(seekArg); } else { - os << *boost::get(&seekArg); + os << "(empty)"; } return os; } @@ -264,19 +275,13 @@ class ConsumerImpl : public ConsumerImplBase { std::shared_ptr> deadLetterProducer_; std::mutex createProducerLock_; - // Make the access to `lastDequedMessageId_` and `lastMessageIdInBroker_` thread safe - mutable std::mutex mutexForMessageId_; MessageId lastDequedMessageId_{MessageId::earliest()}; MessageId lastMessageIdInBroker_{MessageId::earliest()}; + optional startMessageId_; - // NOTE: The modification must be protected by `mutex_` - std::optional seekCallback_; - - Synchronized> startMessageId_; - Synchronized seekMessageId_{MessageId::earliest()}; - std::atomic hasSoughtByTimestamp_{false}; - - bool hasSoughtByTimestamp() const { return hasSoughtByTimestamp_.load(std::memory_order_acquire); } + SeekStatus seekStatus_{SeekStatus::NOT_STARTED}; + optional seekCallback_; + SeekArg lastSeekArg_; class ChunkedMessageCtx { public: @@ -375,7 +380,8 @@ class ConsumerImpl : public ConsumerImplBase { const ClientConnectionPtr& cnx, MessageId& messageId); bool hasMoreMessages() const { - std::lock_guard lock{mutexForMessageId_}; + LockGuard lock{mutex_}; + if (lastMessageIdInBroker_.entryId() == -1L) { return false; } @@ -383,7 +389,7 @@ class ConsumerImpl : public ConsumerImplBase { const auto inclusive = config_.isStartMessageIdInclusive(); if (lastDequedMessageId_ == MessageId::earliest()) { // If startMessageId_ is none, use latest so that this method will return false - const auto startMessageId = startMessageId_.get().value_or(MessageId::latest()); + const auto startMessageId = startMessageId_.value_or(MessageId::latest()); return inclusive ? (lastMessageIdInBroker_ >= startMessageId) : (lastMessageIdInBroker_ > startMessageId); } else { @@ -391,11 +397,22 @@ class ConsumerImpl : public ConsumerImplBase { } } + auto getStartMessageId() const { + LockGuard lock{mutex_}; + return startMessageId_; + } + auto setLastDequedMessageId(const MessageId& messageId) { + LockGuard lock{mutex_}; + lastDequedMessageId_ = messageId; + } + void doImmediateAck(const ClientConnectionPtr& cnx, const MessageId& msgId, CommandAck_AckType ackType, const ResultCallback& callback); void doImmediateAck(const ClientConnectionPtr& cnx, const std::set& msgIds, const ResultCallback& callback); + using LockGuard = std::lock_guard; + friend class PulsarFriend; friend class MultiTopicsConsumerImpl; diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc index 37c6e2d5..1a4f573e 100644 --- a/lib/HandlerBase.cc +++ b/lib/HandlerBase.cc @@ -44,9 +44,9 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, state_(NotStarted), backoff_(backoff), epoch_(0), + reconnectionPending_(false), timer_(executor_->createDeadlineTimer()), creationTimer_(executor_->createDeadlineTimer()), - reconnectionPending_(false), redirectedClusterURI_("") {} HandlerBase::~HandlerBase() { diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index acce15d9..0e733f00 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -143,6 +143,7 @@ class HandlerBase : public std::enable_shared_from_this { std::atomic state_; Backoff backoff_; uint64_t epoch_; + std::atomic reconnectionPending_; Result convertToTimeoutIfNecessary(Result result, ptime startTimestamp) const; @@ -160,7 +161,6 @@ class HandlerBase : public std::enable_shared_from_this { DeadlineTimerPtr creationTimer_; mutable std::mutex connectionMutex_; - std::atomic reconnectionPending_; ClientConnectionWeakPtr connection_; std::string redirectedClusterURI_; std::atomic firstRequestIdAfterConnect_{-1L}; diff --git a/lib/MockServer.h b/lib/MockServer.h index ddfd2c55..e3db551c 100644 --- a/lib/MockServer.h +++ b/lib/MockServer.h @@ -77,6 +77,7 @@ class MockServer : public std::enable_shared_from_this { pendingTimers_[key] = timer; timer->expires_from_now(std::chrono::milliseconds(delayMs)); + LOG_INFO("Mock sending request " << key << " with delay " << delayMs << " ms"); auto self = shared_from_this(); timer->async_wait([this, self, key, connection, requestId, timer](const auto& ec) { { From 5a0f1e3ae2a670a834d6b11800474b6d4c530e5b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 15 Jan 2026 14:37:24 +0800 Subject: [PATCH 09/16] remove unused file --- lib/Synchronized.h | 53 ---------------------------------------------- 1 file changed, 53 deletions(-) delete mode 100644 lib/Synchronized.h diff --git a/lib/Synchronized.h b/lib/Synchronized.h deleted file mode 100644 index 68ae37be..00000000 --- a/lib/Synchronized.h +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -#pragma once - -#include - -template -class Synchronized { - public: - explicit Synchronized(const T& value) : value_(value) {} - - T get() const { - std::lock_guard lock(mutex_); - return value_; - } - - T&& release() { - std::lock_guard lock(mutex_); - return std::move(value_); - } - - Synchronized& operator=(const T& value) { - std::lock_guard lock(mutex_); - value_ = value; - return *this; - } - - Synchronized& operator=(T&& value) { - std::lock_guard lock(mutex_); - value_ = value; - return *this; - } - - private: - T value_; - mutable std::mutex mutex_; -}; From eef5976308bf911bdb4ab8a839b01adb35b64210 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 15 Jan 2026 14:37:54 +0800 Subject: [PATCH 10/16] fix --- lib/ConsumerImpl.h | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index c739aa11..a1738a8a 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -39,7 +39,6 @@ #include "MapCache.h" #include "MessageIdImpl.h" #include "NegativeAcksTracker.h" -#include "Synchronized.h" #include "TestUtil.h" #include "TimeUtils.h" #include "UnboundedBlockingQueue.h" From be871576a69046c457e6f08ce85c10f461858fc5 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 15 Jan 2026 14:40:03 +0800 Subject: [PATCH 11/16] remove unused operator<< --- lib/ConsumerImpl.h | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index a1738a8a..6df26aee 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -223,16 +223,6 @@ class ConsumerImpl : public ConsumerImplBase { const BrokerGetLastMessageIdCallback& callback); void clearReceiveQueue(); - friend std::ostream& operator<<(std::ostream& os, const SeekArg& seekArg) { - if (std::holds_alternative(seekArg)) { - os << std::get(seekArg); - } else if (std::holds_alternative(seekArg)) { - os << std::get(seekArg); - } else { - os << "(empty)"; - } - return os; - } void seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg, ResultCallback&& callback); From 9cdf6164d3e7f67b8d76a9d956e9021f0f2b0164 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 15 Jan 2026 18:12:15 +0800 Subject: [PATCH 12/16] fix testSeekInProgress --- lib/ConsumerImpl.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 95e64760..b02ae88c 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -1755,6 +1755,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c if (seekStatus_ != SeekStatus::NOT_STARTED) { hasPendingSeek = true; } else { + seekStatus_ = SeekStatus::IN_PROGRESS; if (seekCallback_.has_value()) { // This should never happen LOG_ERROR(getName() << "Previous seek callback is not triggered unexpectedly"); From 4596ed526beec48e91fb07ad257156b07926066b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 15 Jan 2026 18:28:52 +0800 Subject: [PATCH 13/16] fix seek arg check --- lib/ConsumerImpl.cc | 25 ++++++++++++++++--------- lib/ConsumerImpl.h | 2 +- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index b02ae88c..0da56192 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -1139,12 +1139,16 @@ void ConsumerImpl::clearReceiveQueue() { if (seekStatus_ != SeekStatus::NOT_STARTED) { // Flush the pending ACKs in case newly arrived messages are filtered out by the previous pending ACKs ackGroupingTrackerPtr_->flushAndClean(); - if (std::holds_alternative(lastSeekArg_)) { - startMessageId_ = std::get(lastSeekArg_); + if (lastSeekArg_.has_value()) { + if (std::holds_alternative(lastSeekArg_.value())) { + startMessageId_ = std::get(lastSeekArg_.value()); + } else { + // Invalidate startMessageId_ so that `isPrior` checks will be skipped, and + // `hasMessageAvailableAsync` won't use `startMessageId_` in compare. + startMessageId_ = std::nullopt; + } } else { - // Invalidate startMessageId_ so that `isPrior` checks will be skipped, and - // `hasMessageAvailableAsync` won't use `startMessageId_` in compare. - startMessageId_ = std::nullopt; + LOG_ERROR(getName() << "SeekStatus is not NOT_STARTED but lastSeekArg_ is not set"); } return; } else if (subscriptionMode_ == Commands::SubscriptionModeDurable) { @@ -1597,7 +1601,7 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c (startMessageId_.value_or(MessageId::earliest()) == MessageId::latest() || // If there is a previous seek operation by timestamp, the start message id will be incorrect, so // we cannot compare the start position with the last position. - std::holds_alternative(lastSeekArg_)); + (lastSeekArg_.has_value() && std::holds_alternative(lastSeekArg_.value()))); } if (compareMarkDeletePosition) { auto self = get_shared_this_ptr(); @@ -1621,7 +1625,10 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c bool lastSeekIsByTimestamp = false; { LockGuard lock{self->mutex_}; - lastSeekIsByTimestamp = std::holds_alternative(self->lastSeekArg_); + if (self->lastSeekArg_.has_value() && + std::holds_alternative(self->lastSeekArg_.value())) { + lastSeekIsByTimestamp = true; + } } if (self->config_.isStartMessageIdInclusive() && !lastSeekIsByTimestamp) { self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) { @@ -1798,8 +1805,8 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c LOG_INFO(getName() << "Seek successfully"); ackGroupingTrackerPtr_->flushAndClean(); incomingMessages_.clear(); - if (std::holds_alternative(lastSeekArg_)) { - startMessageId_ = std::get(lastSeekArg_); + if (lastSeekArg_.has_value() && std::holds_alternative(lastSeekArg_.value())) { + startMessageId_ = std::get(lastSeekArg_.value()); } if (!seekCallback_.has_value()) { LOG_ERROR(getName() << "Seek callback is not set"); diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 6df26aee..0da82a2d 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -270,7 +270,7 @@ class ConsumerImpl : public ConsumerImplBase { SeekStatus seekStatus_{SeekStatus::NOT_STARTED}; optional seekCallback_; - SeekArg lastSeekArg_; + optional lastSeekArg_; class ChunkedMessageCtx { public: From 98b06ec8f99d8856d80619a8cc90869ab8ed4de4 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 15 Jan 2026 19:05:55 +0800 Subject: [PATCH 14/16] address comments --- lib/ConsumerImpl.cc | 9 ++++----- lib/MockServer.h | 1 + 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 0da56192..a645f580 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -235,8 +235,6 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c return promise.getFuture(); } - // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after - // sending the subscribe request. optional subscribeMessageId; { LockGuard lock{mutex_}; @@ -266,11 +264,11 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c cnx->sendRequestWithId(cmd, requestId, "SUBSCRIBE") .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { Result handleResult = handleCreateConsumer(cnx, result); - if (handleResult == ResultOk) { - promise.setSuccess(); - } else { + if (handleResult != ResultOk) { promise.setFailed(handleResult); + return; } + promise.setSuccess(); // Complete the seek callback after completing `promise`, otherwise `reconnectionPending_` will // still be true when the seek operation is done. LockGuard lock{mutex_}; @@ -1821,6 +1819,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c } else { LOG_ERROR(getName() << "Failed to seek: " << result); LockGuard lock{mutex_}; + seekStatus_ = SeekStatus::NOT_STARTED; lastSeekArg_ = previousLastSeekArg; executor_->postWork([self, callback{std::exchange(seekCallback_, std::nullopt).value()}]() { callback(ResultOk); diff --git a/lib/MockServer.h b/lib/MockServer.h index e3db551c..971116c1 100644 --- a/lib/MockServer.h +++ b/lib/MockServer.h @@ -115,6 +115,7 @@ class MockServer : public std::enable_shared_from_this { LOG_WARN("Failed to cancel timer for " << kv.first); } } + pendingTimers_.clear(); return result; } From 9159bb55340cacc33695c95245210b670c450c01 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 15 Jan 2026 19:15:21 +0800 Subject: [PATCH 15/16] improve testReconnectionSlow --- lib/MockServer.h | 14 ++++++++++++-- tests/ConsumerSeekTest.cc | 5 +++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/lib/MockServer.h b/lib/MockServer.h index 971116c1..ae207ddd 100644 --- a/lib/MockServer.h +++ b/lib/MockServer.h @@ -36,7 +36,9 @@ class MockServer : public std::enable_shared_from_this { public: using RequestDelayType = std::unordered_map; - MockServer(const ClientConnectionPtr& connection) : connection_(connection) {} + MockServer(const ClientConnectionPtr& connection) : connection_(connection) { + requestDelays_["CLOSE_CONSUMER"] = 1; + } void setRequestDelay(std::initializer_list delays) { std::lock_guard lock(mutex_); @@ -54,7 +56,15 @@ class MockServer : public std::enable_shared_from_this { if (auto iter = requestDelays_.find(request); iter != requestDelays_.end()) { // Mock the `CLOSE_CONSUMER` command sent by broker, for simplicity, disconnect all consumers if (request == "SEEK") { - connection->executor_->postWork([connection] { + auto closeConsumerDelayMs = requestDelays_["CLOSE_CONSUMER"]; + auto timer = connection->executor_->createDeadlineTimer(); + pendingTimers_["CLOSE_CONSUMER" + std::to_string(requestId)] = timer; + timer->expires_from_now(std::chrono::milliseconds(closeConsumerDelayMs)); + timer->async_wait([connection](const auto& ec) { + if (ec) { + LOG_INFO("Timer cancelled for CLOSE_CONSUMER"); + return; + } std::vector consumerIds; { std::lock_guard lock{connection->mutex_}; diff --git a/tests/ConsumerSeekTest.cc b/tests/ConsumerSeekTest.cc index ea8a78be..f66c27d7 100644 --- a/tests/ConsumerSeekTest.cc +++ b/tests/ConsumerSeekTest.cc @@ -250,10 +250,11 @@ TEST_F(ConsumerSeekTest, testReconnectionSlow) { connection->attachMockServer(mockServer); // Make seek response received before `connectionOpened` is called - mockServer->setRequestDelay({{"SEEK", 100}}); + mockServer->setRequestDelay({{"SEEK", 500}, {"CLOSE_CONSUMER", 1000}}); assertSeekWithTimeout(consumer); - ASSERT_EQ(mockServer->close(), 0); + // The CLOSE_CONSUMER request is in still flight + ASSERT_EQ(mockServer->close(), 1); client.close(); } From 038c683277c156cf322f7bd352a070f26bc5120a Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 15 Jan 2026 21:24:30 +0800 Subject: [PATCH 16/16] fix timer for CLOSE_CONSUMER not removed --- lib/MockServer.h | 90 +++++++++++++++++++++++------------------------- 1 file changed, 44 insertions(+), 46 deletions(-) diff --git a/lib/MockServer.h b/lib/MockServer.h index ae207ddd..8e4a2136 100644 --- a/lib/MockServer.h +++ b/lib/MockServer.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -56,53 +57,25 @@ class MockServer : public std::enable_shared_from_this { if (auto iter = requestDelays_.find(request); iter != requestDelays_.end()) { // Mock the `CLOSE_CONSUMER` command sent by broker, for simplicity, disconnect all consumers if (request == "SEEK") { - auto closeConsumerDelayMs = requestDelays_["CLOSE_CONSUMER"]; - auto timer = connection->executor_->createDeadlineTimer(); - pendingTimers_["CLOSE_CONSUMER" + std::to_string(requestId)] = timer; - timer->expires_from_now(std::chrono::milliseconds(closeConsumerDelayMs)); - timer->async_wait([connection](const auto& ec) { - if (ec) { - LOG_INFO("Timer cancelled for CLOSE_CONSUMER"); - return; - } - std::vector consumerIds; - { - std::lock_guard lock{connection->mutex_}; - for (auto&& kv : connection->consumers_) { - if (auto consumer = kv.second.lock()) { - consumerIds.push_back(consumer->getConsumerId()); - } - } - } - for (auto consumerId : consumerIds) { - proto::CommandCloseConsumer closeConsumerCmd; - closeConsumerCmd.set_consumer_id(consumerId); - connection->handleCloseConsumer(closeConsumerCmd); - } - }); + schedule(connection, "CLOSE_CONSUMER" + std::to_string(requestId), + requestDelays_["CLOSE_CONSUMER"], [connection] { + std::vector consumerIds; + { + std::lock_guard lock{connection->mutex_}; + for (auto&& kv : connection->consumers_) { + if (auto consumer = kv.second.lock()) { + consumerIds.push_back(consumer->getConsumerId()); + } + } + } + for (auto consumerId : consumerIds) { + proto::CommandCloseConsumer closeConsumerCmd; + closeConsumerCmd.set_consumer_id(consumerId); + connection->handleCloseConsumer(closeConsumerCmd); + } + }); } - long delayMs = iter->second; - auto timer = connection->executor_->createDeadlineTimer(); - auto key = request + std::to_string(requestId); - pendingTimers_[key] = timer; - timer->expires_from_now(std::chrono::milliseconds(delayMs)); - - LOG_INFO("Mock sending request " << key << " with delay " << delayMs << " ms"); - auto self = shared_from_this(); - timer->async_wait([this, self, key, connection, requestId, timer](const auto& ec) { - { - std::lock_guard lock(mutex_); - pendingTimers_.erase(key); - } - if (ec) { - LOG_INFO("Timer cancelled for request " << key); - return; - } - if (connection->isClosed()) { - LOG_INFO("Connection is closed, not completing request " << key); - return; - } - LOG_INFO("Completing delayed request " << key); + schedule(connection, request + std::to_string(requestId), iter->second, [connection, requestId] { proto::CommandSuccess success; success.set_request_id(requestId); connection->handleSuccess(success); @@ -135,6 +108,31 @@ class MockServer : public std::enable_shared_from_this { std::unordered_map pendingTimers_; ClientConnectionWeakPtr connection_; + void schedule(ClientConnectionPtr& connection, const std::string& key, long delayMs, + std::function&& task) { + auto timer = connection->executor_->createDeadlineTimer(); + pendingTimers_[key] = timer; + timer->expires_from_now(std::chrono::milliseconds(delayMs)); + LOG_INFO("Mock scheduling " << key << " with delay " << delayMs << " ms"); + auto self = shared_from_this(); + timer->async_wait([this, self, key, connection, task{std::move(task)}](const auto& ec) { + { + std::lock_guard lock(mutex_); + pendingTimers_.erase(key); + } + if (ec) { + LOG_INFO("Timer cancelled for " << key); + return; + } + if (connection->isClosed()) { + LOG_INFO("Connection is closed, not completing request " << key); + return; + } + LOG_INFO("Completing delayed request " << key); + task(); + }); + } + DECLARE_LOG_OBJECT() };