POC to avoid usage of ReplicationResult #7528
base: HBASE-28957
Conversation
🎊 +1 overall
This message was automatically generated.

💔 -1 overall
This message was automatically generated.
@ankitsol You might want to raise the PR against the HBASE-28957_rebased branch, but otherwise it looks good to me.
replicateContext.entries.stream().map(WAL.Entry::getEdit).flatMap(e -> e.getCells().stream())
  .forEach(this::checkCell);
return ReplicationResult.COMMITTED;
getReplicationSource().cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries());
Why do we need to call this here?
// replicate the batches to sink side.
parallelReplicate(replicateContext, batches);
return ReplicationResult.COMMITTED;
getReplicationSource().cleanupHFileRefsAndPersistOffsets(replicateContext.getEntries());
OK, I think I get why you call this method here: at this point we can be sure the WAL entries have been persisted, so it is OK for us to persist the offset. But I would prefer to follow the old way and call this in ReplicationSourceShipper.
But I would prefer to follow the old way and call this in ReplicationSourceShipper.
I think it's doable. @ankitsol ?
/**
 * Replicate the given set of entries (in the context) to the other cluster. Can block until all
 * the given entries are replicated. Once this method returns, all entries that were passed in
 * the context are assumed to be persisted in the target cluster.
What I mean is that we should add a method, maybe called beforePersistingReplicationOffset, and call it before we call updateLogPosition in ReplicationSourceShipper. For the old implementation we simply do nothing, as we can already be sure that everything is persisted; for an S3 based endpoint, we close the writer to persist the data on S3.
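A minimal sketch of the shape such a hook could take, assuming a default no-op on the endpoint base class and an S3-backed endpoint that overrides it. The class names, the field, and the method name beforePersistingReplicationOffset are taken from this discussion or invented for illustration; none of this is existing HBase API.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical base class and hook sketched from this discussion; the method name
// beforePersistingReplicationOffset is an assumption, not an existing HBase API.
abstract class SketchReplicationEndpoint {

  /**
   * Called by the shipper right before it persists the replication offset.
   * Once this method returns, all data replicated so far must be durable.
   */
  void beforePersistingReplicationOffset() throws IOException {
    // Default: no-op. Endpoints that persist data synchronously on every
    // replicate() call have nothing extra to flush here.
  }
}

// An S3-backed endpoint would override the hook to close its current output
// file, since closing the file is what makes the shipped entries durable.
class SketchS3BackedEndpoint extends SketchReplicationEndpoint {
  private Closeable currentWalBackupFile; // assumed handle to the open backup file

  @Override
  void beforePersistingReplicationOffset() throws IOException {
    if (currentWalBackupFile != null) {
      currentWalBackupFile.close(); // flush and persist the data on S3
      currentWalBackupFile = null;  // a new file is opened on the next shipment
    }
  }
}
```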
You cannot close the writer every time something is shipped, because closing and re-opening the same stream is a costly operation, if it is supported at all. We have to wait until enough data has been shipped (file size limit) or the configured time has elapsed (time limit) before closing the current stream and opening a new one. This is controlled by the replication endpoint itself.
So you also need to change the logic in ReplicationSourceShipper to not always record the offset after shipping. And I do not think this can 'ONLY' be controlled by the replication endpoint: in ReplicationSourceShipper you know the size of the WAL entries and how much time has elapsed since the last recording, so it should be easy to implement the logic there.
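A sketch of the bookkeeping the shipper could keep to make that decision; the class, field, and method names below are assumptions for illustration, not existing code.

```java
import java.util.concurrent.TimeUnit;

// Illustrative bookkeeping the shipper could use to decide when to record the
// offset; the class, field, and method names are assumptions, not existing code.
class OffsetRecordingPolicy {
  private final long sizeLimitBytes;  // configured size threshold
  private final long timeLimitNanos;  // configured time threshold

  private long bytesSinceLastRecord = 0;
  private long lastRecordTimeNanos = System.nanoTime();

  OffsetRecordingPolicy(long sizeLimitBytes, long timeLimitMillis) {
    this.sizeLimitBytes = sizeLimitBytes;
    this.timeLimitNanos = TimeUnit.MILLISECONDS.toNanos(timeLimitMillis);
  }

  /** Called after each shipment with the size of the entries just shipped. */
  boolean shouldRecordOffset(long shippedBytes) {
    bytesSinceLastRecord += shippedBytes;
    long elapsedNanos = System.nanoTime() - lastRecordTimeNanos;
    return bytesSinceLastRecord >= sizeLimitBytes || elapsedNanos >= timeLimitNanos;
  }

  /** Called once the offset has actually been recorded. */
  void onOffsetRecorded() {
    bytesSinceLastRecord = 0;
    lastRecordTimeNanos = System.nanoTime();
  }
}
```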
That's an interesting idea. @vinayakphegde @ankitsol wdyt?
@Apache9 Let's say the ReplicationSourceShipper controls when to record the offset. How would it know which kind of replication endpoint it is working with? Does it need to record the offset after every shipment, or use a time/size limit? Shall we make this a new attribute of the endpoints?
I prefer we control it by a time/size limit.
Even if the endpoint can persist the data after every shipment, we do not need to record the offset every time, right? We just need to make sure that once the ReplicationSourceShipper wants to record the offset, all the data before that offset has been persisted. So we can introduce a beforePersistingReplicationOffset method on the replication endpoint: if you persist the data after every shipment, you simply do nothing; if it is an S3 based endpoint, we close the output file to persist the data.
In this way, the ReplicationSourceShipper does not need to know whether the endpoint can persist the data after every shipment. And in the future, for HBaseInterClusterReplicationEndpoint, we could also introduce some asynchronous mechanism to increase performance.
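Putting the two pieces together, the shipper-side flow could look roughly like the sketch below, reusing the hypothetical types from the earlier sketches. updateLogPosition stands in for logic that already exists in ReplicationSourceShipper; the policy object and the beforePersistingReplicationOffset hook are assumed additions from this thread, not current code.

```java
import java.io.IOException;

// Rough shape of the shipper-side flow under this proposal; the policy object and
// the beforePersistingReplicationOffset hook are assumed additions, not current code.
class ShipperFlowSketch {

  void afterShipment(SketchReplicationEndpoint endpoint, OffsetRecordingPolicy policy,
      long shippedBytes) throws IOException {
    // Entries were already shipped via endpoint.replicate(...) by the caller.
    // Decide whether the size or time threshold has been reached.
    if (policy.shouldRecordOffset(shippedBytes)) {
      // Let the endpoint make everything durable first: a no-op for the
      // inter-cluster endpoint, closing the current backup file for the S3 one.
      endpoint.beforePersistingReplicationOffset();
      // Only now is it safe to persist the replication offset.
      updateLogPosition();
      policy.onOffsetRecorded();
    }
  }

  // Stand-in for the existing ReplicationSourceShipper#updateLogPosition logic.
  private void updateLogPosition() {
    // persist the WAL offset to the replication queue storage
  }
}
```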
@Apache9 This seems like a good approach to me. We also want to move ahead with both options, time and size based.
For implementing the time based approach, I need one clarification: should this counter run in a separate thread, since it would be independent of the shipEdits() logic?
Another question: I need to save this time/size context somewhere; do you recommend keeping it in ReplicationSourceShipper or ReplicationSource?
Even if the endpoint can persist the data after every shipment, we do not need to record the offset every time, right?
Are we talking about modifying the behaviour of existing replication endpoints?
Currently both data and offsets are persisted at every shipment. Would you like to change this to be controlled by time and size limits generally?
@Apache9 This seems like a good approach. We also want it to be both time and size based.
I have two questions regarding the time based approach:
- Should this time based count run on a separate thread? Currently in ContinuousBackupReplicationEndpoint we implemented it as a separate thread.
- Where should we save the time/size based context, ReplicationSourceShipper or ReplicationSource, considering ReplicationSourceShipper is itself a thread?
  }
}

private ReplicationResult getReplicationResult() {
As said above, we should keep these methods here, and before calling them, call out to the method on the ReplicationEndpoint to flush the data out.
No description provided.