-
Notifications
You must be signed in to change notification settings - Fork 1.5k
AMQ-9829 Track prefetched messages for duplicate suppression during failover #1616
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AMQ-9829 Track prefetched messages for duplicate suppression during failover #1616
Conversation
4e32721 to
df1285a
Compare
|
The change to ActiveMQMessageConsumer needs some discussion, add'l unit tests and perhaps some changes. A couple things:
ref: 354e764 |
|
@mattrpav I did not change the logic for the map, but I did make sure it was cleared out. So I'm not sure what you mean. Can you elaborate? As far as I can see, the map gets cleared on every transaction boundary (commit/rollback) and on consumer close, so there's no memory leak possible, right? What do you mean by why do we need a new class? |
|
@jeanouii My initial git commit made it appear that the PreviouslyDeliveredMap was newly added. After digging deeper, I see that your change makes modifications and the map is in fact cleared via doClose() call chain. Pulling this over into a separate PR will help make the review easier, and if it is a bug fix we will for sure want to back-port it. Looks like a good find! |
|
Of course, I'll cherry pick the other commit that went to another PR and create a PR to revert the commit on main |
2456684 to
55b8867
Compare
| if (!isAutoAcknowledgeBatch()) { | ||
| synchronized(deliveredMessages) { | ||
| deliveredMessages.addFirst(md); | ||
| if (session.isTransacted()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you expand on how this is only impacted by transacted sessions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The bug (prefetched messages not redelivered after transaction failure) was related to transactions only. So I did not wanted to impact other scenarios. But there is outside of this a rational.
The previouslyDeliveredMessages map (and prefetchedOnly) is only used to track rollback/redelivery behavior, which only exists for transacted sessions. In a non‑transacted session there is no rollback; delivery state is driven by the ack mode (AUTO/CLIENT/DUPS_OK).
|
I plan to take a look at this tomorrow |
cshannon
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jeanouii - This looks really good, thanks for the fix! The tests are really well done as well.
The only comment I had I made inline, I noticed we were iterating over the list twice so I made a suggestion to be more efficient to just iterate once. Otherwise this looks pretty good to me to merge. See what you think of my suggestion.
| List<MessageDispatch> list = unconsumedMessages.removeAll(); | ||
| if (!this.info.isBrowser()) { | ||
| if (session.isTransacted()) { | ||
| capturePrefetchedMessagesForDuplicateSuppression(list); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method iterates over the list and then right below we iterate again to rollback duplicates. Im curious if we can somehow refactor to just iterate once. Maybe something like:
for (MessageDispatch old : list) {
if (session.isTransacted()) {
capturePrefetchedMessagesForDuplicateSuppression(old);
}
session.connection.rollbackDuplicate(this, old.getMessage());
}private void capturePrefetchedMessagesForDuplicateSuppression(final MessageDispatch pending) {
if (previouslyDeliveredMessages == null) {
previouslyDeliveredMessages = new PreviouslyDeliveredMap<>(session.getTransactionContext().getTransactionId());
}
if (pending.getMessage() != null) {
previouslyDeliveredMessages.put(pending.getMessage().getMessageId(), new PreviouslyDelivered(pending, false, true));
}
LOG.trace("{} tracking existing transacted {} prefetched ({})", getConsumerId(), previouslyDeliveredMessages.transactionId, pending);
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cshannon you are totally correct. There is no need to iterate twice. Good catch!
Your proposal works fully.
We could probably even simply the method like this
private void capturePrefetchedMessagesForDuplicateSuppression(final MessageDispatch pending) {
if (pending.getMessage() == null) return; // nothing to do
if (previouslyDeliveredMessages == null) {
previouslyDeliveredMessages = new PreviouslyDeliveredMap<>(session.getTransactionContext().getTransactionId());
}
previouslyDeliveredMessages.put(pending.getMessage().getMessageId(), new PreviouslyDelivered(pending, false, true));
LOG.trace("{} tracking existing transacted {} prefetched ({})", getConsumerId(), previouslyDeliveredMessages.transactionId, pending);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good catch from @cshannon . I think we can avoid dual looping here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
…ailover During failover in transacted sessions with async dispatch (MessageListener), prefetched messages sitting in the unconsumedMessages buffer were not being tracked in previouslyDeliveredMessages. This caused them to be incorrectly identified as duplicates on redelivery and poison-acked to the DLQ.
0adfd82 to
f938d06
Compare
cshannon
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
@jbonofre - i think this is good to merge unless you have anything else or any other comments? i suppose we could try running the tests again after rebasing against the latest test fixes but it's probably also just fine to merge it to main and let the tests run on main |
jbonofre
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change looks good to me. I think the CI "fixes" are incremental, so let me merge this one and move forward to identify new flaky tests.
During failover in transacted sessions with async dispatch (MessageListener),
prefetched messages sitting in the unconsumedMessages buffer were not being
tracked in previouslyDeliveredMessages. This caused them to be incorrectly
identified as duplicates on redelivery and poison-acked to the DLQ.