Skip to content

Commit 439bc30

Browse files
committed
Add Compressor interface
1 parent 9e2b4d3 commit 439bc30

File tree

88 files changed

+3281
-1952
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

88 files changed

+3281
-1952
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -921,6 +921,7 @@ set(SOURCES
921921
util/comparator.cc
922922
util/compression.cc
923923
util/compression_context_cache.cc
924+
util/compressor.cc
924925
util/concurrent_task_limiter_impl.cc
925926
util/crc32c.cc
926927
util/data_structure.cc
@@ -1528,6 +1529,7 @@ if(WITH_TESTS)
15281529
util/autovector_test.cc
15291530
util/bloom_test.cc
15301531
util/coding_test.cc
1532+
util/compression_test.cc
15311533
util/crc32c_test.cc
15321534
util/defer_test.cc
15331535
util/dynamic_bloom_test.cc

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1283,6 +1283,9 @@ cache_test: $(OBJ_DIR)/cache/cache_test.o $(TEST_LIBRARY) $(LIBRARY)
12831283
coding_test: $(OBJ_DIR)/util/coding_test.o $(TEST_LIBRARY) $(LIBRARY)
12841284
$(AM_LINK)
12851285

1286+
compression_test: $(OBJ_DIR)/util/compression_test.o $(TEST_LIBRARY) $(LIBRARY)
1287+
$(AM_LINK)
1288+
12861289
hash_test: $(OBJ_DIR)/util/hash_test.o $(TEST_LIBRARY) $(LIBRARY)
12871290
$(AM_LINK)
12881291

TARGETS

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
251251
"util/comparator.cc",
252252
"util/compression.cc",
253253
"util/compression_context_cache.cc",
254+
"util/compressor.cc",
254255
"util/concurrent_task_limiter_impl.cc",
255256
"util/crc32c.cc",
256257
"util/crc32c_arm64.cc",
@@ -603,6 +604,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
603604
"util/comparator.cc",
604605
"util/compression.cc",
605606
"util/compression_context_cache.cc",
607+
"util/compressor.cc",
606608
"util/concurrent_task_limiter_impl.cc",
607609
"util/crc32c.cc",
608610
"util/crc32c_arm64.cc",
@@ -5015,6 +5017,12 @@ cpp_unittest_wrapper(name="compressed_secondary_cache_test",
50155017
extra_compiler_flags=[])
50165018

50175019

5020+
cpp_unittest_wrapper(name="compression_test",
5021+
srcs=["util/compression_test.cc"],
5022+
deps=[":rocksdb_test_lib"],
5023+
extra_compiler_flags=[])
5024+
5025+
50185026
cpp_unittest_wrapper(name="configurable_test",
50195027
srcs=["options/configurable_test.cc"],
50205028
deps=[":rocksdb_test_lib"],

cache/compressed_secondary_cache.cc

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -77,15 +77,16 @@ std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
7777
s = helper->create_cb(Slice(ptr->get(), handle_value_charge),
7878
create_context, allocator, &value, &charge);
7979
} else {
80-
UncompressionContext uncompression_context(cache_options_.compression_type);
81-
UncompressionInfo uncompression_info(uncompression_context,
82-
UncompressionDict::GetEmptyDict(),
83-
cache_options_.compression_type);
80+
auto compressor =
81+
BuiltinCompressor::GetCompressor(cache_options_.compression_type);
82+
UncompressionInfo uncompression_info(UncompressionDict::GetEmptyDict(),
83+
cache_options_.compress_format_version,
84+
allocator);
8485

8586
size_t uncompressed_size{0};
86-
CacheAllocationPtr uncompressed = UncompressData(
87-
uncompression_info, (char*)ptr->get(), handle_value_charge,
88-
&uncompressed_size, cache_options_.compress_format_version, allocator);
87+
CacheAllocationPtr uncompressed = uncompression_info.UncompressData(
88+
compressor.get(), (char*)ptr->get(), handle_value_charge,
89+
&uncompressed_size);
8990

9091
if (!uncompressed) {
9192
cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
@@ -148,16 +149,15 @@ Status CompressedSecondaryCache::Insert(const Slice& key,
148149
if (cache_options_.compression_type != kNoCompression &&
149150
!cache_options_.do_not_compress_roles.Contains(helper->role)) {
150151
PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes, size);
151-
CompressionOptions compression_opts;
152-
CompressionContext compression_context(cache_options_.compression_type);
152+
auto compressor =
153+
BuiltinCompressor::GetCompressor(cache_options_.compression_type);
153154
uint64_t sample_for_compression{0};
154-
CompressionInfo compression_info(
155-
compression_opts, compression_context, CompressionDict::GetEmptyDict(),
156-
cache_options_.compression_type, sample_for_compression);
155+
CompressionInfo compression_info(CompressionDict::GetEmptyDict(),
156+
cache_options_.compress_format_version,
157+
sample_for_compression);
157158

158159
bool success =
159-
CompressData(val, compression_info,
160-
cache_options_.compress_format_version, &compressed_val);
160+
compression_info.CompressData(compressor.get(), val, &compressed_val);
161161

162162
if (!success) {
163163
return Status::Corruption("Error compressing value.");

db/arena_wrapped_db_iter.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "rocksdb/options.h"
1616
#include "table/internal_iterator.h"
1717
#include "table/iterator_wrapper.h"
18+
#include "util/string_util.h"
1819
#include "util/user_comparator_wrapper.h"
1920

2021
namespace ROCKSDB_NAMESPACE {

db/blob/blob_file_builder.cc

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ BlobFileBuilder::BlobFileBuilder(
6666
immutable_options_(immutable_options),
6767
min_blob_size_(mutable_cf_options->min_blob_size),
6868
blob_file_size_(mutable_cf_options->blob_file_size),
69-
blob_compression_type_(mutable_cf_options->blob_compression_type),
69+
blob_compressor_(mutable_cf_options->blob_compressor),
7070
prepopulate_blob_cache_(mutable_cf_options->prepopulate_blob_cache),
7171
file_options_(file_options),
7272
db_id_(std::move(db_id)),
@@ -91,6 +91,10 @@ BlobFileBuilder::BlobFileBuilder(
9191
assert(blob_file_paths_->empty());
9292
assert(blob_file_additions_);
9393
assert(blob_file_additions_->empty());
94+
95+
if (blob_compressor_ == nullptr) {
96+
blob_compressor_ = BuiltinCompressor::GetCompressor(kNoCompression);
97+
}
9498
}
9599

96100
BlobFileBuilder::~BlobFileBuilder() = default;
@@ -150,7 +154,7 @@ Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
150154
}
151155

152156
BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(),
153-
blob_compression_type_);
157+
blob_compressor_->GetCompressionType());
154158

155159
return Status::OK();
156160
}
@@ -227,7 +231,8 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
227231
constexpr bool has_ttl = false;
228232
constexpr ExpirationRange expiration_range;
229233

230-
BlobLogHeader header(column_family_id_, blob_compression_type_, has_ttl,
234+
BlobLogHeader header(column_family_id_,
235+
blob_compressor_->GetCompressionType(), has_ttl,
231236
expiration_range);
232237

233238
{
@@ -255,26 +260,18 @@ Status BlobFileBuilder::CompressBlobIfNeeded(
255260
assert(compressed_blob->empty());
256261
assert(immutable_options_);
257262

258-
if (blob_compression_type_ == kNoCompression) {
263+
if (blob_compressor_->GetCompressionType() == kNoCompression) {
259264
return Status::OK();
260265
}
261266

262-
CompressionOptions opts;
263-
CompressionContext context(blob_compression_type_);
264-
constexpr uint64_t sample_for_compression = 0;
265-
266-
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
267-
blob_compression_type_, sample_for_compression);
268-
269-
constexpr uint32_t compression_format_version = 2;
267+
CompressionInfo info;
270268

271269
bool success = false;
272270

273271
{
274272
StopWatch stop_watch(immutable_options_->clock, immutable_options_->stats,
275273
BLOB_DB_COMPRESSION_MICROS);
276-
success =
277-
CompressData(*blob, info, compression_format_version, compressed_blob);
274+
success = info.CompressData(blob_compressor_.get(), *blob, compressed_blob);
278275
}
279276

280277
if (!success) {

db/blob/blob_file_builder.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "rocksdb/env.h"
1616
#include "rocksdb/rocksdb_namespace.h"
1717
#include "rocksdb/types.h"
18+
#include "util/compressor.h"
1819

1920
namespace ROCKSDB_NAMESPACE {
2021

@@ -89,7 +90,7 @@ class BlobFileBuilder {
8990
const ImmutableOptions* immutable_options_;
9091
uint64_t min_blob_size_;
9192
uint64_t blob_file_size_;
92-
CompressionType blob_compression_type_;
93+
std::shared_ptr<Compressor> blob_compressor_;
9394
PrepopulateBlobCache prepopulate_blob_cache_;
9495
const FileOptions* file_options_;
9596
const std::string db_id_;

db/blob/blob_file_builder_test.cc

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -406,15 +406,12 @@ TEST_F(BlobFileBuilderTest, Compression) {
406406
ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1);
407407

408408
CompressionOptions opts;
409-
CompressionContext context(kSnappyCompression);
410-
constexpr uint64_t sample_for_compression = 0;
411-
412-
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
413-
kSnappyCompression, sample_for_compression);
409+
auto compressor = BuiltinCompressor::GetCompressor(kSnappyCompression);
410+
ASSERT_NE(compressor, nullptr);
414411

415412
std::string compressed_value;
416-
ASSERT_TRUE(Snappy_Compress(info, uncompressed_value.data(),
417-
uncompressed_value.size(), &compressed_value));
413+
ASSERT_OK(compressor->Compress(CompressionInfo(), uncompressed_value,
414+
&compressed_value));
418415

419416
ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(),
420417
BlobLogRecord::kHeaderSize + key_size + compressed_value.size());

db/blob/blob_file_reader.cc

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ Status BlobFileReader::Create(
4949

5050
Statistics* const statistics = immutable_options.stats;
5151

52-
CompressionType compression_type = kNoCompression;
52+
std::shared_ptr<Compressor> compressor;
5353

5454
{
5555
const Status s = ReadHeader(file_reader.get(), column_family_id, statistics,
56-
&compression_type);
56+
&compressor);
5757
if (!s.ok()) {
5858
return s;
5959
}
@@ -67,7 +67,7 @@ Status BlobFileReader::Create(
6767
}
6868

6969
blob_file_reader->reset(
70-
new BlobFileReader(std::move(file_reader), file_size, compression_type,
70+
new BlobFileReader(std::move(file_reader), file_size, compressor,
7171
immutable_options.clock, statistics));
7272

7373
return Status::OK();
@@ -136,9 +136,9 @@ Status BlobFileReader::OpenFile(
136136
Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
137137
uint32_t column_family_id,
138138
Statistics* statistics,
139-
CompressionType* compression_type) {
139+
std::shared_ptr<Compressor>* compressor) {
140140
assert(file_reader);
141-
assert(compression_type);
141+
assert(compressor);
142142

143143
Slice header_slice;
144144
Buffer buf;
@@ -181,7 +181,7 @@ Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
181181
return Status::Corruption("Column family ID mismatch");
182182
}
183183

184-
*compression_type = header.compression;
184+
*compressor = BuiltinCompressor::GetCompressor(header.compression);
185185

186186
return Status::OK();
187187
}
@@ -272,11 +272,11 @@ Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,
272272

273273
BlobFileReader::BlobFileReader(
274274
std::unique_ptr<RandomAccessFileReader>&& file_reader, uint64_t file_size,
275-
CompressionType compression_type, SystemClock* clock,
275+
const std::shared_ptr<Compressor>& compressor, SystemClock* clock,
276276
Statistics* statistics)
277277
: file_reader_(std::move(file_reader)),
278278
file_size_(file_size),
279-
compression_type_(compression_type),
279+
compressor_(compressor),
280280
clock_(clock),
281281
statistics_(statistics) {
282282
assert(file_reader_);
@@ -286,7 +286,7 @@ BlobFileReader::~BlobFileReader() = default;
286286

287287
Status BlobFileReader::GetBlob(
288288
const ReadOptions& read_options, const Slice& user_key, uint64_t offset,
289-
uint64_t value_size, CompressionType compression_type,
289+
uint64_t value_size, const std::shared_ptr<Compressor>& compressor,
290290
FilePrefetchBuffer* prefetch_buffer, MemoryAllocator* allocator,
291291
std::unique_ptr<BlobContents>* result, uint64_t* bytes_read) const {
292292
assert(result);
@@ -297,7 +297,7 @@ Status BlobFileReader::GetBlob(
297297
return Status::Corruption("Invalid blob offset");
298298
}
299299

300-
if (compression_type != compression_type_) {
300+
if (compressor->GetCompressionType() != compressor_->GetCompressionType()) {
301301
return Status::Corruption("Compression type mismatch when reading blob");
302302
}
303303

@@ -361,7 +361,7 @@ Status BlobFileReader::GetBlob(
361361

362362
{
363363
const Status s = UncompressBlobIfNeeded(
364-
value_slice, compression_type, allocator, clock_, statistics_, result);
364+
value_slice, compressor.get(), allocator, clock_, statistics_, result);
365365
if (!s.ok()) {
366366
return s;
367367
}
@@ -407,7 +407,8 @@ void BlobFileReader::MultiGetBlob(
407407
*req->status = Status::Corruption("Invalid blob offset");
408408
continue;
409409
}
410-
if (req->compression != compression_type_) {
410+
if (req->compressor->GetCompressionType() !=
411+
compressor_->GetCompressionType()) {
411412
*req->status =
412413
Status::Corruption("Compression type mismatch when reading a blob");
413414
continue;
@@ -506,7 +507,7 @@ void BlobFileReader::MultiGetBlob(
506507
// Uncompress blob if needed
507508
Slice value_slice(record_slice.data() + adjustments[i], req->len);
508509
*req->status =
509-
UncompressBlobIfNeeded(value_slice, compression_type_, allocator,
510+
UncompressBlobIfNeeded(value_slice, compressor_.get(), allocator,
510511
clock_, statistics_, &blob_reqs[i].second);
511512
if (req->status->ok()) {
512513
total_bytes += record_slice.size();
@@ -563,31 +564,28 @@ Status BlobFileReader::VerifyBlob(const Slice& record_slice,
563564
}
564565

565566
Status BlobFileReader::UncompressBlobIfNeeded(
566-
const Slice& value_slice, CompressionType compression_type,
567+
const Slice& value_slice, Compressor* compressor,
567568
MemoryAllocator* allocator, SystemClock* clock, Statistics* statistics,
568569
std::unique_ptr<BlobContents>* result) {
570+
assert(compressor);
569571
assert(result);
570572

571-
if (compression_type == kNoCompression) {
573+
if (compressor->GetCompressionType() == kNoCompression) {
572574
BlobContentsCreator::Create(result, nullptr, value_slice, allocator);
573575
return Status::OK();
574576
}
575577

576-
UncompressionContext context(compression_type);
577-
UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
578-
compression_type);
578+
UncompressionInfo info;
579579

580580
size_t uncompressed_size = 0;
581-
constexpr uint32_t compression_format_version = 2;
582581

583582
CacheAllocationPtr output;
584583

585584
{
586585
PERF_TIMER_GUARD(blob_decompress_time);
587586
StopWatch stop_watch(clock, statistics, BLOB_DB_DECOMPRESSION_MICROS);
588-
output = UncompressData(info, value_slice.data(), value_slice.size(),
589-
&uncompressed_size, compression_format_version,
590-
allocator);
587+
output = info.UncompressData(compressor, value_slice.data(),
588+
value_slice.size(), &uncompressed_size);
591589
}
592590

593591
TEST_SYNC_POINT_CALLBACK(

0 commit comments

Comments
 (0)