Inject new task count calculation during the rollover #18860
kfaraz
left a comment
Thanks for the follow up, @Fly-Style !
Left some initial drive-by comments. Will do a more thorough review later today/tomorrow.
@SuppressWarnings("resource")
@Test
@Timeout(300)
void test_scaleDownDuringTaskRollover()
Can this test not be moved to the CostBasedAutoScalerIntegrationTest itself?
I decided to keep it separate because we're testing abstract autoscaler actions during the task rollover, not specifically the cost-based one. We might later create, let's say, a CPU-based autoscaler that is also required to scale down during the rollover.
"because we're testing abstract autoscaler actions during the task rollover, not specifically the cost-based one."
I am not sure if this is entirely true, since the cost-based auto-scaler is the only one that supports task count change on rollover right now.
The new test should be in the existing CostBasedAutoScalerIntegrationTest since it uses the same cluster setup (AFAICT) and verifies an important aspect of the cost-based auto-scaler.
In the future, when we add more auto-scalers, we can add separate tests for them.
if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
  int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
  if (rolloverTaskCount > 0 && rolloverTaskCount < ioConfig.getTaskCount()) {
Should we also allow scale up on task rollover?
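For readers following the thread, the guard in the snippet above can be sketched in isolation roughly as follows. This is a minimal, hypothetical illustration, not the actual SeekableStreamSupervisor code: only the method name computeTaskCountForRollover and the two conditions come from the diff. The TaskAutoScaler interface shape, the class name, the boolean parameter standing in for activelyReadingTaskGroups.isEmpty(), and the OptionalInt return type are all assumptions made for the example.

```java
import java.util.OptionalInt;

public class RolloverScalingSketch {

    /** Hypothetical stand-in for the auto-scaler; only the method name is taken from the diff. */
    interface TaskAutoScaler {
        int computeTaskCountForRollover();
    }

    /**
     * Returns the task count to apply at rollover, or empty to keep the current count.
     * Mirrors the scale-down-only guard from the diff excerpt (assumed semantics).
     */
    static OptionalInt maybeScaleDownOnRollover(
            TaskAutoScaler autoScaler,
            boolean hasActivelyReadingTaskGroups,
            int currentTaskCount
    ) {
        // No auto-scaler configured, or some task groups are still reading: do nothing.
        if (autoScaler == null || hasActivelyReadingTaskGroups) {
            return OptionalInt.empty();
        }
        int rolloverTaskCount = autoScaler.computeTaskCountForRollover();
        // Accept only a positive count strictly below the current one (scale-down only).
        if (rolloverTaskCount > 0 && rolloverTaskCount < currentTaskCount) {
            return OptionalInt.of(rolloverTaskCount);
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        // Proposed count 2 with 4 tasks running: scale-down is accepted.
        System.out.println(maybeScaleDownOnRollover(() -> 2, false, 4));
        // Proposed count 6 with 4 tasks running: ignored, since scale-up is not handled here.
        System.out.println(maybeScaleDownOnRollover(() -> 6, false, 4));
    }
}
```

As written, a proposed count equal to or above the current task count is ignored, which is exactly what the scale-up question is about.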
kfaraz
left a comment
LGTM. +1 after CI passes.
@Override
protected OrderedSequenceNumber<String> makeSequenceNumber(String seq, boolean isExclusive)
{
  return new OrderedSequenceNumber<>(seq, isExclusive)
Code scanning / CodeQL warning (test): Inconsistent compareTo
EasyMock.expect(spec.isSuspended()).andReturn(true).anyTimes();
EasyMock.replay(spec);

EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
Code scanning / CodeQL notice (test): Deprecated method or constructor invocation (SeekableStreamSupervisorSpec.getDataSchema)
EasyMock.replay(spec);

EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
Code scanning / CodeQL notice (test): Deprecated method or constructor invocation (SeekableStreamSupervisorSpec.getDataSchema)
{
  EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
  EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
  EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
Code scanning / CodeQL notice (test): Deprecated method or constructor invocation (SeekableStreamSupervisorSpec.getDataSchema)
}

@Test
public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale()
Can we keep these test methods in the SupervisorSpecTest itself? It seems we have copied over a lot of boilerplate code that could have been avoided by keeping these methods there.
Alternatively, have these two tests extend a common base test class or use common test fixtures.
Not really; I don't want to break the encapsulation of the maybeScaleDuringTaskRollover method for that purpose.
That was the point of introducing this test class: it lives under the same package name.
Oh, that should be easy to fix. The old SeekableStreamSupervisorSpecTest is itself in the wrong package. 😂
Please move the existing test class to org.apache.druid.indexing.seekablestream.supervisor where both SeekableStreamSupervisor and SeekableStreamSupervisorSpec already live. Would this address your issue?
This is a follow-up PR for #18819. The patch fixes the temporary behaviour in which scale-down happens in the same manner as scale-up, by injecting the new task count calculation logic at task rollover time.
This PR has: