Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hadoop-hdds/container-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>${rocksdb.version}</version>
<classifier>${rocksdbjni.classifier}</classifier>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
2 changes: 2 additions & 0 deletions hadoop-hdds/framework/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>${rocksdb.version}</version>
<classifier>${rocksdbjni.classifier}</classifier>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public ManagedBlockBasedTableConfig getBlockBasedTableConfig() {
ManagedBlockBasedTableConfig config = new ManagedBlockBasedTableConfig();
config.setBlockCache(new ManagedLRUCache(blockCacheSize))
.setBlockSize(blockSize)
.setFormatVersion(BLOCK_BASED_TABLE_FORMAT_VERSION)
.setPinL0FilterAndIndexBlocksInCache(true)
.setFilterPolicy(new ManagedBloomFilter());
return config;
Expand Down Expand Up @@ -145,6 +146,9 @@ public ManagedBlockBasedTableConfig getBlockBasedTableConfig() {
}
};

// Keep SST/block-based table format stable across RocksDB upgrades.
private static final int BLOCK_BASED_TABLE_FORMAT_VERSION = 5;

public static long toLong(double value) {
BigDecimal temp = BigDecimal.valueOf(value);
return temp.longValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,12 @@ public void deleteFilesNotMatchingPrefix(TablePrefixInfo prefixInfo) throws Rock
boolean isKeyWithPrefixPresent = RocksDiffUtils.isKeyWithPrefixPresent(
prefixForColumnFamily, firstDbKey, lastDbKey);
if (!isKeyWithPrefixPresent) {
ColumnFamilyHandle handle = getColumnFamilyHandle(sstFileColumnFamily);
if (handle == null) {
LOG.warn("Skipping sst file deletion for {}: no handle found for column family {}",
liveFileMetaData.fileName(), sstFileColumnFamily);
continue;
}
LOG.info("Deleting sst file: {} with start key: {} and end key: {} "
+ "corresponding to column family {} from db: {}. "
+ "Prefix for the column family: {}.",
Expand All @@ -887,7 +893,7 @@ public void deleteFilesNotMatchingPrefix(TablePrefixInfo prefixInfo) throws Rock
StringUtils.bytes2String(liveFileMetaData.columnFamilyName()),
db.get().getName(),
prefixForColumnFamily);
db.deleteFile(liveFileMetaData);
db.deleteFile(handle, liveFileMetaData);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ Answer<Integer> newAnswer(String name, byte... b) {
public void testForEachRemaining() throws Exception {
when(rocksIteratorMock.isValid())
.thenReturn(true, true, true, true, true, true, true, false);
when(rocksIteratorMock.key(any()))
when(rocksIteratorMock.key(any(ByteBuffer.class)))
.then(newAnswerInt("key1", 0x00))
.then(newAnswerInt("key2", 0x00))
.then(newAnswerInt("key3", 0x01))
.then(newAnswerInt("key4", 0x02))
.thenThrow(new NoSuchElementException());
when(rocksIteratorMock.value(any()))
when(rocksIteratorMock.value(any(ByteBuffer.class)))
.then(newAnswerInt("val1", 0x7f))
.then(newAnswerInt("val2", 0x7f))
.then(newAnswerInt("val3", 0x7e))
Expand Down Expand Up @@ -152,8 +152,8 @@ public void testNextCallsIsValidThenGetsTheValueAndStepsToNext()
}

verifier.verify(rocksIteratorMock).isValid();
verifier.verify(rocksIteratorMock).key(any());
verifier.verify(rocksIteratorMock).value(any());
verifier.verify(rocksIteratorMock).key(any(ByteBuffer.class));
verifier.verify(rocksIteratorMock).value(any(ByteBuffer.class));
verifier.verify(rocksIteratorMock).next();

CodecTestUtil.gc();
Expand Down Expand Up @@ -192,9 +192,9 @@ public void testSeekToLastSeeks() throws Exception {
@Test
public void testSeekReturnsTheActualKey() throws Exception {
when(rocksIteratorMock.isValid()).thenReturn(true);
when(rocksIteratorMock.key(any()))
when(rocksIteratorMock.key(any(ByteBuffer.class)))
.then(newAnswerInt("key1", 0x00));
when(rocksIteratorMock.value(any()))
when(rocksIteratorMock.value(any(ByteBuffer.class)))
.then(newAnswerInt("val1", 0x7f));

try (RDBStoreCodecBufferIterator i = newIterator();
Expand All @@ -208,8 +208,8 @@ public void testSeekReturnsTheActualKey() throws Exception {
verifier.verify(rocksIteratorMock, times(1))
.seek(any(ByteBuffer.class));
verifier.verify(rocksIteratorMock, times(1)).isValid();
verifier.verify(rocksIteratorMock, times(1)).key(any());
verifier.verify(rocksIteratorMock, times(1)).value(any());
verifier.verify(rocksIteratorMock, times(1)).key(any(ByteBuffer.class));
verifier.verify(rocksIteratorMock, times(1)).value(any(ByteBuffer.class));
assertArrayEquals(new byte[]{0x00}, val.getKey().getArray());
assertArrayEquals(new byte[]{0x7f}, val.getValue().getArray());
}
Expand All @@ -220,7 +220,7 @@ public void testSeekReturnsTheActualKey() throws Exception {
@Test
public void testGettingTheKeyIfIteratorIsValid() throws Exception {
when(rocksIteratorMock.isValid()).thenReturn(true);
when(rocksIteratorMock.key(any()))
when(rocksIteratorMock.key(any(ByteBuffer.class)))
.then(newAnswerInt("key1", 0x00));

byte[] key = null;
Expand All @@ -233,7 +233,7 @@ public void testGettingTheKeyIfIteratorIsValid() throws Exception {
InOrder verifier = inOrder(rocksIteratorMock);

verifier.verify(rocksIteratorMock, times(1)).isValid();
verifier.verify(rocksIteratorMock, times(1)).key(any());
verifier.verify(rocksIteratorMock, times(1)).key(any(ByteBuffer.class));
assertArrayEquals(new byte[]{0x00}, key);

CodecTestUtil.gc();
Expand All @@ -242,9 +242,9 @@ public void testGettingTheKeyIfIteratorIsValid() throws Exception {
@Test
public void testGettingTheValueIfIteratorIsValid() throws Exception {
when(rocksIteratorMock.isValid()).thenReturn(true);
when(rocksIteratorMock.key(any()))
when(rocksIteratorMock.key(any(ByteBuffer.class)))
.then(newAnswerInt("key1", 0x00));
when(rocksIteratorMock.value(any()))
when(rocksIteratorMock.value(any(ByteBuffer.class)))
.then(newAnswerInt("val1", 0x7f));

byte[] key = null;
Expand All @@ -260,7 +260,7 @@ public void testGettingTheValueIfIteratorIsValid() throws Exception {
InOrder verifier = inOrder(rocksIteratorMock);

verifier.verify(rocksIteratorMock, times(1)).isValid();
verifier.verify(rocksIteratorMock, times(1)).key(any());
verifier.verify(rocksIteratorMock, times(1)).key(any(ByteBuffer.class));
assertArrayEquals(new byte[]{0x00}, key);
assertArrayEquals(new byte[]{0x7f}, value);

Expand All @@ -272,7 +272,7 @@ public void testRemovingFromDBActuallyDeletesFromTable() throws Exception {
final byte[] testKey = new byte[10];
ThreadLocalRandom.current().nextBytes(testKey);
when(rocksIteratorMock.isValid()).thenReturn(true);
when(rocksIteratorMock.key(any()))
when(rocksIteratorMock.key(any(ByteBuffer.class)))
.then(newAnswer("key1", testKey));

try (RDBStoreCodecBufferIterator i = newIterator(null)) {
Expand Down Expand Up @@ -320,7 +320,7 @@ public void testNullPrefixedIterator() throws Exception {
when(rocksIteratorMock.isValid()).thenReturn(true);
assertTrue(i.hasNext());
verify(rocksIteratorMock, times(1)).isValid();
verify(rocksIteratorMock, times(0)).key(any());
verify(rocksIteratorMock, times(0)).key(any(ByteBuffer.class));

i.seekToLast();
verify(rocksIteratorMock, times(1)).seekToLast();
Expand All @@ -343,11 +343,11 @@ public void testNormalPrefixedIterator() throws Exception {
clearInvocations(rocksIteratorMock);

when(rocksIteratorMock.isValid()).thenReturn(true);
when(rocksIteratorMock.key(any()))
when(rocksIteratorMock.key(any(ByteBuffer.class)))
.then(newAnswer("key1", prefixBytes));
assertTrue(i.hasNext());
verify(rocksIteratorMock, times(1)).isValid();
verify(rocksIteratorMock, times(1)).key(any());
verify(rocksIteratorMock, times(1)).key(any(ByteBuffer.class));

Exception e =
assertThrows(Exception.class, () -> i.seekToLast(), "Prefixed iterator does not support seekToLast");
Expand Down
2 changes: 2 additions & 0 deletions hadoop-hdds/managed-rocksdb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>${rocksdb.version}</version>
<classifier>${rocksdbjni.classifier}</classifier>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@
public class ManagedBloomFilter extends BloomFilter {
private final UncheckedAutoCloseable leakTracker = track(this);

@Override
public boolean equals(Object obj) {
return super.equals(obj);
}

@Override
public int hashCode() {
return super.hashCode();
}

@Override
public void close() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public class ManagedDBOptions extends DBOptions {
private final UncheckedAutoCloseable leakTracker = track(this);
private final AtomicReference<Logger> loggerRef = new AtomicReference<>();

@Override
public DBOptions setLogger(Logger logger) {
IOUtils.close(LOG, loggerRef.getAndSet(logger));
return super.setLogger(logger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -112,18 +113,23 @@ public static ManagedRocksDB openWithLatestOptions(
}

/**
* Delete liveMetaDataFile from rocks db using RocksDB#deleteFile Api.
* This function makes the RocksDB#deleteFile Api synchronized by waiting
* for the deletes to happen.
* @param fileToBeDeleted File to be deleted.
* Delete the SST file range from rocks db and wait for file deletion.
* @param columnFamilyHandle column family of the target sst file.
* @param fileToBeDeleted file metadata to be deleted.
* @throws RocksDatabaseException if the underlying db throws an exception
* or the file is not deleted within a time limit.
*/
public void deleteFile(LiveFileMetaData fileToBeDeleted) throws RocksDatabaseException {
String sstFileName = fileToBeDeleted.fileName();
public void deleteFile(
ColumnFamilyHandle columnFamilyHandle,
LiveFileMetaData fileToBeDeleted) throws RocksDatabaseException {
File file = new File(fileToBeDeleted.path(), fileToBeDeleted.fileName());
final byte[] smallestKey = fileToBeDeleted.smallestKey();
final byte[] largestKey = fileToBeDeleted.largestKey();
try {
get().deleteFile(sstFileName);
get().deleteFilesInRanges(
columnFamilyHandle,
Arrays.asList(smallestKey, largestKey),
true);
} catch (RocksDBException e) {
throw new RocksDatabaseException("Failed to delete " + file, e);
}
Expand Down
4 changes: 4 additions & 0 deletions hadoop-hdds/rocks-native/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>${rocksdb.version}</version>
<classifier>${rocksdbjni.classifier}</classifier>
<!-- <scope>test</scope> but transitive via hdds-managed-rocksdb -->
</dependency>
<dependency>
Expand Down Expand Up @@ -173,6 +175,8 @@
<artifactItem>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>${rocksdb.version}</version>
<classifier>${rocksdbjni.classifier}</classifier>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/rocksdbjni</outputDirectory>
Expand Down
29 changes: 14 additions & 15 deletions hadoop-hdds/rocks-native/src/main/patches/rocks-native.patch
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ new file mode 100644
index 000000000..5ba8a82ee
--- /dev/null
+++ b/tools/raw_sst_file_reader.cc
@@ -0,0 +1,272 @@
@@ -0,0 +1,271 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
Expand Down Expand Up @@ -380,9 +380,9 @@ index 000000000..5ba8a82ee
+
+ rep_->file_.reset(new RandomAccessFileReader(std::move(file), file_path));
+
+ FilePrefetchBuffer prefetch_buffer(
+ 0 /* readahead_size */, 0 /* max_readahead_size */, true /* enable */,
+ false /* track_min_offset */);
+ FilePrefetchBuffer prefetch_buffer(ReadaheadParams(),
+ !fopts.use_mmap_reads /* enable */,
+ false /* track_min_offset */);
+ if (s.ok()) {
+ const uint64_t kSstDumpTailPrefetchSize = 512 * 1024;
+ uint64_t prefetch_size = (file_size > kSstDumpTailPrefetchSize)
Expand All @@ -391,11 +391,10 @@ index 000000000..5ba8a82ee
+ uint64_t prefetch_off = file_size - prefetch_size;
+ IOOptions opts;
+ s = prefetch_buffer.Prefetch(opts, rep_->file_.get(), prefetch_off,
+ static_cast<size_t>(prefetch_size),
+ Env::IO_TOTAL /* rate_limiter_priority */);
+ static_cast<size_t>(prefetch_size));
+
+ s = ReadFooterFromFile(opts, rep_->file_.get(), &prefetch_buffer, file_size,
+ &footer);
+ s = ReadFooterFromFile(opts, rep_->file_.get(), *fs, &prefetch_buffer,
+ file_size, &footer);
+ }
+ if (s.ok()) {
+ magic_number = footer.table_magic_number();
Expand All @@ -411,10 +410,9 @@ index 000000000..5ba8a82ee
+ }
+
+ s = ROCKSDB_NAMESPACE::ReadTableProperties(
+ rep_->file_.get(), file_size, magic_number, rep_->ioptions_, &(rep_->table_properties_),
+ /* memory_allocator= */ nullptr, (magic_number == kBlockBasedTableMagicNumber)
+ ? &prefetch_buffer
+ : nullptr);
+ rep_->file_.get(), file_size, magic_number, rep_->ioptions_, rep_->read_options_,
+ &(rep_->table_properties_), /* memory_allocator= */ nullptr,
+ (magic_number == kBlockBasedTableMagicNumber) ? &prefetch_buffer : nullptr);
+ // For old sst format, ReadTableProperties might fail but file can be read
+ if (s.ok()) {
+ s = SetTableOptionsByMagicNumber(magic_number);
Expand Down Expand Up @@ -448,9 +446,10 @@ index 000000000..5ba8a82ee
+
+Status RawSstFileReader::NewTableReader(uint64_t file_size) {
+ auto t_opt =
+ TableReaderOptions(rep_->ioptions_, rep_->moptions_.prefix_extractor, rep_->soptions_,
+ rep_->internal_comparator_, false /* skip_filters */,
+ false /* imortal */, true /* force_direct_prefetch */);
+ TableReaderOptions(rep_->ioptions_, rep_->moptions_.prefix_extractor,
+ rep_->moptions_.compression_manager.get(), rep_->soptions_,
+ rep_->internal_comparator_, 0 /* block_protection_bytes_per_key */,
+ false /* skip_filters */, false /* immortal */, true /* force_direct_prefetch */);
+ // Allow open file with global sequence number for backward compatibility.
+ t_opt.largest_seqno = kMaxSequenceNumber;
+
Expand Down
2 changes: 2 additions & 0 deletions hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>${rocksdb.version}</version>
<classifier>${rocksdbjni.classifier}</classifier>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Loading