Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
.setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
compactScannerSizeLimit)
.build();
.setKeepBlockProgress(false).build();
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ public class ScannerContext {
boolean keepProgress;
private static boolean DEFAULT_KEEP_PROGRESS = false;

/**
* For some operations (specifically Compactions) we use the block size limit to avoid next()
* calls taking too much time. In those cases the entire region is handled with a single
* ScannerContext, and we do want to reset the block progress after the next() call returns.
*/
boolean keepBlockProgress;
private static boolean DEFAULT_KEEP_BLOCK_PROGRESS = true;

/**
* Allows temporarily ignoring limits and skipping tracking of batch and size progress. Used when
* skipping to the next row, in which case all processed cells are thrown away so should not count
Expand All @@ -129,6 +137,11 @@ public class ScannerContext {

ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics,
ServerSideScanMetrics scanMetrics) {
this(keepProgress, DEFAULT_KEEP_BLOCK_PROGRESS, limitsToCopy, trackMetrics, scanMetrics);
}

ScannerContext(boolean keepProgress, boolean keepBlockProgress, LimitFields limitsToCopy,
boolean trackMetrics, ServerSideScanMetrics scanMetrics) {
this.limits = new LimitFields();
if (limitsToCopy != null) {
this.limits.copy(limitsToCopy);
Expand All @@ -138,6 +151,7 @@ public class ScannerContext {
progress = new ProgressFields(0, 0, 0, 0);

this.keepProgress = keepProgress;
this.keepBlockProgress = keepBlockProgress;
this.scannerState = DEFAULT_STATE;
this.metrics =
trackMetrics ? (scanMetrics != null ? scanMetrics : new ServerSideScanMetrics()) : null;
Expand Down Expand Up @@ -291,11 +305,15 @@ void setTimeProgress(long timeProgress) {
/**
* Clear away any progress that has been made so far. All progress fields are reset to initial
* values. Only clears progress that should reset between rows. {@link #getBlockSizeProgress()} is
* not reset because it increments for all blocks scanned whether the result is included or
* filtered.
* not reset by default because it increments for all blocks scanned whether the result is
* included or filtered.
*/
void clearProgress() {
progress.setFields(0, 0, 0, getBlockSizeProgress());
if (keepBlockProgress) {
progress.setFields(0, 0, 0, getBlockSizeProgress());
} else {
progress.setFields(0, 0, 0, 0);
}
}

/**
Expand Down Expand Up @@ -453,6 +471,7 @@ public static Builder newBuilder(boolean keepProgress) {

public static final class Builder {
boolean keepProgress = DEFAULT_KEEP_PROGRESS;
boolean keepBlockProgress = DEFAULT_KEEP_BLOCK_PROGRESS;
boolean trackMetrics = false;
LimitFields limits = new LimitFields();
ServerSideScanMetrics scanMetrics = null;
Expand All @@ -469,6 +488,11 @@ public Builder setKeepProgress(boolean keepProgress) {
return this;
}

public Builder setKeepBlockProgress(boolean keepBlockProgress) {
this.keepBlockProgress = keepBlockProgress;
return this;
}

public Builder setTrackMetrics(boolean trackMetrics) {
this.trackMetrics = trackMetrics;
return this;
Expand Down Expand Up @@ -500,7 +524,7 @@ public Builder setScanMetrics(ServerSideScanMetrics scanMetrics) {
}

public ScannerContext build() {
return new ScannerContext(keepProgress, limits, trackMetrics, scanMetrics);
return new ScannerContext(keepProgress, keepBlockProgress, limits, trackMetrics, scanMetrics);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
.setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
compactScannerSizeLimit)
.build();
.setKeepBlockProgress(false).build();

throughputController.start(compactionName);
Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null;
Expand Down