Skip to content

Commit 02d8131

Browse files
committed
HBASE-29742 Compaction scan returns single cells instead of rows after 10MB
1 parent 6d342cc commit 02d8131

File tree

3 files changed

+30
-6
lines changed

3 files changed

+30
-6
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
348348
ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
349349
.setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
350350
compactScannerSizeLimit)
351-
.build();
351+
.setKeepBlockProgress(false).build();
352352
throughputController.start(compactionName);
353353
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
354354
long shippedCallSizeLimit =

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,14 @@ public class ScannerContext {
103103
boolean keepProgress;
104104
private static boolean DEFAULT_KEEP_PROGRESS = false;
105105

106+
/**
107+
* For some operations (specifically Compactions) we use the block size limit to avoid next()
108+
* calls taking too much time. In those cases the entire region is handled with a single
109+
* ScannerContext, and we do want to reset the block progress after the next() call returns.
110+
*/
111+
boolean keepBlockProgress;
112+
private static boolean DEFAULT_KEEP_BLOCK_PROGRESS = true;
113+
106114
/**
107115
* Allows temporarily ignoring limits and skipping tracking of batch and size progress. Used when
108116
* skipping to the next row, in which case all processed cells are thrown away so should not count
@@ -129,6 +137,11 @@ public class ScannerContext {
129137

130138
ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics,
131139
ServerSideScanMetrics scanMetrics) {
140+
this(keepProgress, DEFAULT_KEEP_BLOCK_PROGRESS, limitsToCopy, trackMetrics, scanMetrics);
141+
}
142+
143+
ScannerContext(boolean keepProgress, boolean keepBlockProgress, LimitFields limitsToCopy,
144+
boolean trackMetrics, ServerSideScanMetrics scanMetrics) {
132145
this.limits = new LimitFields();
133146
if (limitsToCopy != null) {
134147
this.limits.copy(limitsToCopy);
@@ -138,6 +151,7 @@ public class ScannerContext {
138151
progress = new ProgressFields(0, 0, 0, 0);
139152

140153
this.keepProgress = keepProgress;
154+
this.keepBlockProgress = keepBlockProgress;
141155
this.scannerState = DEFAULT_STATE;
142156
this.metrics =
143157
trackMetrics ? (scanMetrics != null ? scanMetrics : new ServerSideScanMetrics()) : null;
@@ -259,11 +273,15 @@ void setBatchProgress(int batchProgress) {
259273
/**
260274
* Clear away any progress that has been made so far. All progress fields are reset to initial
261275
* values. Only clears progress that should reset between rows. {@link #getBlockSizeProgress()} is
262-
* not reset because it increments for all blocks scanned whether the result is included or
263-
* filtered.
276+
* not reset by default because it increments for all blocks scanned whether the result is
277+
* included or filtered.
264278
*/
265279
void clearProgress() {
266-
progress.setFields(0, 0, 0, getBlockSizeProgress());
280+
if (keepBlockProgress) {
281+
progress.setFields(0, 0, 0, getBlockSizeProgress());
282+
} else {
283+
progress.setFields(0, 0, 0, 0);
284+
}
267285
}
268286

269287
/**
@@ -421,6 +439,7 @@ public static Builder newBuilder(boolean keepProgress) {
421439

422440
public static final class Builder {
423441
boolean keepProgress = DEFAULT_KEEP_PROGRESS;
442+
boolean keepBlockProgress = DEFAULT_KEEP_BLOCK_PROGRESS;
424443
boolean trackMetrics = false;
425444
LimitFields limits = new LimitFields();
426445
ServerSideScanMetrics scanMetrics = null;
@@ -437,6 +456,11 @@ public Builder setKeepProgress(boolean keepProgress) {
437456
return this;
438457
}
439458

459+
public Builder setKeepBlockProgress(boolean keepBlockProgress) {
460+
this.keepBlockProgress = keepBlockProgress;
461+
return this;
462+
}
463+
440464
public Builder setTrackMetrics(boolean trackMetrics) {
441465
this.trackMetrics = trackMetrics;
442466
return this;
@@ -468,7 +492,7 @@ public Builder setScanMetrics(ServerSideScanMetrics scanMetrics) {
468492
}
469493

470494
public ScannerContext build() {
471-
return new ScannerContext(keepProgress, limits, trackMetrics, scanMetrics);
495+
return new ScannerContext(keepProgress, keepBlockProgress, limits, trackMetrics, scanMetrics);
472496
}
473497
}
474498

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
441441
ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
442442
.setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
443443
compactScannerSizeLimit)
444-
.build();
444+
.setKeepBlockProgress(false).build();
445445

446446
throughputController.start(compactionName);
447447
Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null;

0 commit comments

Comments
 (0)