Conversation

@jeanouii
Contributor

During failover in transacted sessions with async dispatch (MessageListener),
prefetched messages sitting in the unconsumedMessages buffer were not being
tracked in previouslyDeliveredMessages. This caused them to be incorrectly
identified as duplicates on redelivery and poison-acked to the DLQ.
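
For context, here is a minimal sketch of the client-side setup that exercises this path. It assumes an ActiveMQ 6.x (jakarta.jms) client, hypothetical broker URLs, and a hypothetical queue name; it is only an illustration of the affected combination (failover transport, transacted session, async MessageListener), not code from this PR.

import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TransactedListenerFailoverExample {

    public static void main(String[] args) throws Exception {
        // Failover transport across two (hypothetical) brokers; prefetch left at its default.
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                "failover:(tcp://broker1:61616,tcp://broker2:61616)");

        Connection connection = factory.createConnection();
        connection.start();

        // Transacted session with async dispatch -- the combination affected by this bug.
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageConsumer consumer = session.createConsumer(session.createQueue("TEST.QUEUE"));

        consumer.setMessageListener(message -> {
            try {
                // ... process the message ...
                session.commit();
            } catch (Exception e) {
                try {
                    session.rollback();
                } catch (Exception rollbackFailed) {
                    // ignore in this sketch
                }
            }
        });

        // If the transport fails over while messages are still sitting in the
        // consumer's prefetch buffer (unconsumedMessages), those messages were not
        // tracked in previouslyDeliveredMessages before this fix and could be
        // poison-acked to the DLQ on redelivery.
    }
}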

@jeanouii jeanouii force-pushed the fix/AMQ-9829-Track-prefetched-messages-for-duplicate-suppression-during-failover branch 2 times, most recently from 4e32721 to df1285a Compare January 16, 2026 14:21
@mattrpav mattrpav self-requested a review January 16, 2026 15:05
@mattrpav
Contributor

The change to ActiveMQMessageConsumer needs some discussion, additional unit tests, and perhaps some changes.

A couple things:

  1. The map isn't cleared on close()
  2. Consumers are cached a lot, so we need to make sure there aren't memory leaks or issues with re-using consumers -- at a minimum we need some unit testing to check for leaks of that new map (a leak-check sketch follows below).
  3. If the map is keyed off of transactionId and only one transaction can be opened per-consumer at a time -- why do we need a new class?

ref: 354e764
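
As an illustration of the kind of leak check point 2 asks for, here is a minimal test sketch. It assumes an ActiveMQ 6.x client (jakarta.jms), an embedded vm:// broker, and JUnit 5, and it peeks at the package-private previouslyDeliveredMessages field via reflection; the "null or empty after close()" expectation is an assumption about the doClose() behavior discussed below, not an established project test.

import java.lang.reflect.Field;
import java.util.Map;

import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertTrue;

public class PreviouslyDeliveredMapLeakCheck {

    @Test
    public void mapIsReleasedOnConsumerClose() throws Exception {
        // Embedded, non-persistent broker via the vm:// transport.
        ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        try (Connection connection = factory.createConnection()) {
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer consumer = session.createConsumer(session.createQueue("TEST.LEAK"));

            // ... drive transacted consumption / a failover here ...

            consumer.close();

            // Peek at the package-private tracking map via reflection; after the
            // doClose() call chain it should be null or empty, otherwise the
            // consumer would retain tracked message ids across reuse.
            Field field = consumer.getClass().getDeclaredField("previouslyDeliveredMessages");
            field.setAccessible(true);
            Map<?, ?> tracked = (Map<?, ?>) field.get(consumer);
            assertTrue(tracked == null || tracked.isEmpty());
        }
    }
}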

@jeanouii
Contributor Author

@mattrpav I did not change the logic for the map, but I did make sure it was cleared out. So I'm not sure what you mean. Can you elaborate?

As far as I can see, the map gets cleared on every transaction boundary (commit/rollback) and on consumer close, so there's no memory leak possible, right?

What do you mean by "why do we need a new class"?

@mattrpav
Contributor

mattrpav commented Jan 16, 2026

@jeanouii My initial read of the git commit made it appear that the PreviouslyDeliveredMap was newly added. After digging deeper, I see that your change only modifies it and the map is in fact cleared via the doClose() call chain.

Pulling this over into a separate PR will help make the review easier, and if it is a bug fix we will for sure want to back-port it.

Looks like a good find!

@jeanouii
Contributor Author

Of course. I'll cherry-pick the other commit that went to another PR and create a PR to revert the commit on main.

@jeanouii jeanouii force-pushed the fix/AMQ-9829-Track-prefetched-messages-for-duplicate-suppression-during-failover branch 3 times, most recently from 2456684 to 55b8867 Compare January 19, 2026 10:28
if (!isAutoAcknowledgeBatch()) {
    synchronized(deliveredMessages) {
        deliveredMessages.addFirst(md);
        if (session.isTransacted()) {
Contributor

Can you expand on how this is only impacted by transacted sessions?

Contributor Author

The bug (prefetched messages not redelivered after transaction failure) relates to transactions only, so I did not want to impact other scenarios. But beyond that, there is a rationale.

The previouslyDeliveredMessages map (and prefetchedOnly) is only used to track rollback/redelivery behavior, which only exists for transacted sessions. In a non‑transacted session there is no rollback; delivery state is driven by the ack mode (AUTO/CLIENT/DUPS_OK).
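
To make that concrete, here is a small sketch of the two session types, assuming an existing jakarta.jms Connection; the class and method names are illustrative only.

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Session;

public final class SessionModeSketch {

    // Transacted session: redelivery only ever happens through rollback(),
    // which is the path previouslyDeliveredMessages has to track.
    static void transacted(Connection connection) throws JMSException {
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        // ... consume messages ...
        session.rollback();   // uncommitted messages are redelivered
    }

    // Non-transacted session: there is no rollback(); delivery state is
    // governed entirely by the ack mode (AUTO / CLIENT / DUPS_OK).
    static void nonTransacted(Connection connection) throws JMSException {
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        // ... consume messages, then call message.acknowledge() when done ...
    }
}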

@cshannon
Contributor

I plan to take a look at this tomorrow

Contributor

@cshannon cshannon left a comment

@jeanouii - This looks really good, thanks for the fix! The tests are really well done as well.

The only comment I had is inline: I noticed we were iterating over the list twice, so I suggested a more efficient refactor that iterates only once. Otherwise this looks pretty good to me to merge. See what you think of my suggestion.

List<MessageDispatch> list = unconsumedMessages.removeAll();
if (!this.info.isBrowser()) {
    if (session.isTransacted()) {
        capturePrefetchedMessagesForDuplicateSuppression(list);
Contributor

This method iterates over the list, and then right below we iterate again to roll back duplicates. I'm curious if we can somehow refactor to just iterate once. Maybe something like:

for (MessageDispatch old : list) {
    if (session.isTransacted()) {
        capturePrefetchedMessagesForDuplicateSuppression(old);
    }
    session.connection.rollbackDuplicate(this, old.getMessage());
}
private void capturePrefetchedMessagesForDuplicateSuppression(final MessageDispatch pending) {
    if (previouslyDeliveredMessages == null) {
        previouslyDeliveredMessages = new PreviouslyDeliveredMap<>(session.getTransactionContext().getTransactionId());
    }
    if (pending.getMessage() != null) {
        previouslyDeliveredMessages.put(pending.getMessage().getMessageId(), new PreviouslyDelivered(pending, false, true));
    }
    LOG.trace("{} tracking existing transacted {} prefetched ({})", getConsumerId(), previouslyDeliveredMessages.transactionId, pending);
}

Contributor Author

@cshannon you are totally correct. There is no need to iterate twice. Good catch!

Your proposal works perfectly.
We could probably even simplify the method like this:

private void capturePrefetchedMessagesForDuplicateSuppression(final MessageDispatch pending) {
    if (pending.getMessage() == null) return; // nothing to do
    if (previouslyDeliveredMessages == null) {
        previouslyDeliveredMessages = new PreviouslyDeliveredMap<>(session.getTransactionContext().getTransactionId());
    }
    previouslyDeliveredMessages.put(pending.getMessage().getMessageId(), new PreviouslyDelivered(pending, false, true));
    LOG.trace("{} tracking existing transacted {} prefetched ({})", getConsumerId(), previouslyDeliveredMessages.transactionId, pending);
}

Member

That's a good catch from @cshannon. I think we can avoid the double loop here.

Contributor Author

All pushed; @cshannon and @jbonofre, you can double-check that you are happy with it. I squashed the commits, so if you are, it can be merged directly.

@jeanouii jeanouii force-pushed the fix/AMQ-9829-Track-prefetched-messages-for-duplicate-suppression-during-failover branch from 0adfd82 to f938d06 Compare January 20, 2026 17:53
Contributor

@cshannon cshannon left a comment

LGTM

@cshannon
Contributor

cshannon commented Jan 21, 2026

@jbonofre - I think this is good to merge unless you have anything else or any other comments. I suppose we could try running the tests again after rebasing against the latest test fixes, but it's probably also just fine to merge it to main and let the tests run there.

Member

@jbonofre jbonofre left a comment

This change looks good to me. I think the CI "fixes" are incremental, so let me merge this one and move forward to identify new flaky tests.

@jbonofre jbonofre merged commit 1149a9f into apache:main Jan 22, 2026
6 of 7 checks passed