Skip to content

Ingest sst files rather than their keyvalue content. #12079

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
55687a1
Ingest sst files rather than their keyvalue content.
Apr 7, 2025
b64d216
Address kakaiu review
Apr 8, 2025
8c700f5
Try to make fetchKeys smaller
Apr 8, 2025
138d142
Reset these files
Apr 9, 2025
c411dca
Fix ingest issue
Apr 10, 2025
4fb4764
Address review by kakaiu
Apr 15, 2025
549e205
Bigger notice on early loop break
Apr 15, 2025
ac0307f
Address Kakaiu review feedback
Apr 15, 2025
647c689
See if this fixes joshua failing tests
Apr 15, 2025
1756790
Formatting
Apr 15, 2025
7f648d5
Refactor how we read bulkdump sample files
Apr 16, 2025
5e8ca3f
Fix mispush
Apr 16, 2025
4921ce0
try fix byte estimate wrong
kakaiu Apr 17, 2025
7eb4094
Minor fix in processSampleFiles
Apr 18, 2025
ad0ed1a
Remove another KeyValueRef
Apr 18, 2025
51b7d34
* fdbserver/KeyValueStoreRocksDB.actor.cpp
Apr 18, 2025
915bb7d
* fdbclient/include/fdbclient/IKeyValueStore.actor.h
Apr 18, 2025
a782b5b
Fix compile breakage
Apr 18, 2025
a70af03
Try and fix the cicd failure:
Apr 18, 2025
982f6b3
Force c++ linker for perf tool
Apr 18, 2025
825e824
Add a wait on version change after clear before moving to ingest.
Apr 20, 2025
ade5ae5
Add async calling of rocksdb ingestSST and CompactRange.
Apr 22, 2025
1ad37f7
* fdbserver/KeyValueStoreRocksDB.actor.cpp
Apr 22, 2025
596c432
Add sst ingested bytes, files, and duration metrics.
Apr 22, 2025
2e43909
Undo a bunch of the metrics machinery after discussion. Instead we'll
Apr 23, 2025
c285b92
Restore old MockGlobalState
Apr 24, 2025
a117e10
Rebase. Cleanup.
Apr 24, 2025
494c25d
gcc on centos7 crashes complain about replacerange. Add a wait on commit
Apr 24, 2025
46368f1
Fix this showing up in bulkload logs when we look for severity=40
Apr 24, 2025
bb7901f
Unrelated ctest crashing in bugfify setup. Make sure of network befor…
Apr 24, 2025
3770148
Disable ingest to see if tests pass then
Apr 24, 2025
ddd9099
Revert "Disable ingest to see if tests pass then"
Apr 25, 2025
c910bfc
Make it so ENABLE_MUTATION_CHECKSUM ENABLE_ACCUMULATIVE_CHECKSUM enba…
Apr 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions fdbclient/ClientKnobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,8 @@ void ClientKnobs::initialize(Randomize randomize) {
init( REST_KMS_ALLOW_NOT_SECURE_CONNECTION, false ); if ( randomize && BUGGIFY ) REST_KMS_ALLOW_NOT_SECURE_CONNECTION = !REST_KMS_ALLOW_NOT_SECURE_CONNECTION;
init( SIM_KMS_VAULT_MAX_KEYS, 4096 );

init( ENABLE_MUTATION_CHECKSUM, false ); if ( randomize && BUGGIFY ) ENABLE_MUTATION_CHECKSUM = deterministicRandom()->coinflip();
init( ENABLE_ACCUMULATIVE_CHECKSUM, false ); if ( randomize && BUGGIFY ) ENABLE_ACCUMULATIVE_CHECKSUM = deterministicRandom()->coinflip();
init( ENABLE_MUTATION_CHECKSUM, false ); if ( randomize && BUGGIFY && g_network->isSimulated()) ENABLE_MUTATION_CHECKSUM = deterministicRandom()->coinflip();
init( ENABLE_ACCUMULATIVE_CHECKSUM, false ); if ( randomize && BUGGIFY && g_network->isSimulated()) ENABLE_ACCUMULATIVE_CHECKSUM = deterministicRandom()->coinflip();
init( ENABLE_ACCUMULATIVE_CHECKSUM_LOGGING, false );
// clang-format on
}
Expand Down
11 changes: 8 additions & 3 deletions fdbclient/ServerKnobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -617,10 +617,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
// Writebatch key-value checksum
init( ROCKSDB_WRITEBATCH_PROTECTION_BYTES_PER_KEY, 0 ); if ( randomize && BUGGIFY ) ROCKSDB_WRITEBATCH_PROTECTION_BYTES_PER_KEY = 8; // Default: 0 (disabled). Supported values: 0, 8
// Memtable key-value checksum
init( ROCKSDB_MEMTABLE_PROTECTION_BYTES_PER_KEY, 0 ); if ( randomize && BUGGIFY ) ROCKSDB_MEMTABLE_PROTECTION_BYTES_PER_KEY = 8; // Default: 0 (disabled). Supported values: 0, 1, 2, 4, 8.
init( ROCKSDB_MEMTABLE_PROTECTION_BYTES_PER_KEY, 0 ); if ( randomize && BUGGIFY && g_network->isSimulated()) ROCKSDB_MEMTABLE_PROTECTION_BYTES_PER_KEY = 8; // Default: 0 (disabled). Supported values: 0, 1, 2, 4, 8.
// Block cache key-value checksum. Checksum is validated during read, so has non-trivial impact on read performance.
init( ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY, 0 ); if ( randomize && BUGGIFY ) ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY = 8; // Default: 0 (disabled). Supported values: 0, 1, 2, 4, 8.
init( ROCKSDB_ENABLE_NONDETERMINISM, false );
init( ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY, 0 ); if ( randomize && BUGGIFY && g_network->isSimulated()) ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY = 8; // Default: 0 (disabled). Supported values: 0, 1, 2, 4, 8.
init( ROCKSDB_ENABLE_NONDETERMINISM, false );
init( SHARDED_ROCKSDB_ALLOW_WRITE_STALL_ON_FLUSH, false );
init( SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO, 0.01 ); if (isSimulated) SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO = deterministicRandom()->random01();
init( SHARD_METADATA_SCAN_BYTES_LIMIT, 10485760 ); // 10MB
Expand Down Expand Up @@ -1411,4 +1411,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
(5.0 * VERSIONS_PER_SECOND);
clientKnobs->INIT_MID_SHARD_BYTES = MIN_SHARD_BYTES;
}

init(BULK_LOAD_USE_SST_INGEST, true); // Enable SST ingestion by default
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you do if (isSimulated) BULK_LOAD_USE_SST_INGEST = deterministicRandom()->coinflip() to improve the test coverage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if (isSimulated) {
BULK_LOAD_USE_SST_INGEST = deterministicRandom()->coinflip();
}
}
56 changes: 47 additions & 9 deletions fdbclient/include/fdbclient/IKeyValueStore.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class IKeyValueStore : public IClosable {
// Returns true if the KV store supports shards, i.e., implements addRange(), removeRange(), and
// persistRangeMapping().
virtual bool shardAware() const { return false; }
// Returns true if the store supports external SST file ingestion.
virtual bool supportsSstIngestion() const { return false; }
virtual void set(KeyValueRef keyValue, const Arena* arena = nullptr) = 0;
virtual void clear(KeyRangeRef range, const Arena* arena = nullptr) = 0;
virtual Future<Void> canCommit() { return Void(); }
Expand Down Expand Up @@ -134,6 +136,9 @@ class IKeyValueStore : public IClosable {
// Delete a checkpoint.
virtual Future<Void> deleteCheckpoint(const CheckpointMetaData& checkpoint) { throw not_implemented(); }

// Compact a range of keys in the store
virtual Future<Void> compactRange(KeyRangeRef range) { throw not_implemented(); }

/*
Concurrency contract
Causal consistency:
Expand All @@ -157,6 +162,13 @@ class IKeyValueStore : public IClosable {
// Obtain the encryption mode of the storage. The encryption mode needs to match the encryption mode of the cluster.
virtual Future<EncryptionAtRestMode> encryptionMode() = 0;

// the files in localFileSets.
// Throws an error if the store does not support SST ingestion or if ingestion fails.
// It is the responsibility of the caller to ensure the directory exists and the fileSetMap is valid.
virtual Future<Void> ingestSSTFiles(std::shared_ptr<BulkLoadFileSetKeyMap> localFileSets) {
throw not_implemented();
}

protected:
virtual ~IKeyValueStore() {}
};
Expand All @@ -167,18 +179,44 @@ ACTOR static Future<Void> replaceRange_impl(IKeyValueStore* self,
state int sinceYield = 0;
state const KeyValueRef* kvItr = data.begin();
state KeyRangeRef rangeRef = range;
if (rangeRef.empty()) {
state bool errorOccurred = false;

try {
if (rangeRef.empty()) {
return Void();
}

// Clear the range first
self->clear(rangeRef);

// Set the new data
for (; kvItr != data.end(); kvItr++) {
try {
self->set(*kvItr);
if (++sinceYield > 1000) {
wait(yield());
sinceYield = 0;
}
} catch (Error& e) {
errorOccurred = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we want to change this? Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only real change here is the addition of a wait on commit (later) before we progress. The rest is trying to get better logging when error.

In the ctest runs there was crashing doing replace range (centos7, gcc, only). I added commit to emulate what we do over in ingest where we wait on version change after clear... . Was thinking this crash like what we were seeing before waiting on version change after clear over on ingest... It looks to have helped. I can try removing... after the centos7/gcc build stabilizes.

ActorSingleCallback<(anonymous namespace)::ServeChangeFeedVersionUpdateRequestsActor, 0, ChangeFeedVersionUpdateRequest>::error(Error) at /root/build_output/fdbserver/storageserver.actor.g.cpp:65360
 (inlined by) error at /root/src/foundationdb/flow/include/flow/flow.h:1564
Callback<Void>::remove() at /root/src/foundationdb/flow/include/flow/flow.h:519
 (inlined by) a_exitChoose13 at /root/build_output/fdbserver/storageserver.actor.g.cpp:51501
 (inlined by) a_callback_fire at /root/build_output/fdbserver/storageserver.actor.g.cpp:51511
 (inlined by) fire at /root/src/foundationdb/flow/include/flow/flow.h:1536
(anonymous namespace)::ReplaceRange_implActorState<(anonymous namespace)::ReplaceRange_implActor>::a_body1loopBody1(int) at /root/build_output/fdbclient/include/fdbclient/IKeyValueStore.actor.g.h:269
KeyRangeRef::empty() const at /root/src/foundationdb/fdbclient/include/fdbclient/FDBTypes.h:338
 (inlined by) a_body1 at /root/src/foundationdb/fdbclient/include/fdbclient/IKeyValueStore.actor.h:182
 (inlined by) ReplaceRange_implActor at /root/build_output/fdbclient/include/fdbclient/IKeyValueStore.actor.g.h:465
 (inlined by) replaceRange_impl at /root/src/foundationdb/fdbclient/include/fdbclient/IKeyValueStore.actor.h:176
(anonymous namespace)::UpdateActorState<(anonymous namespace)::UpdateActor>::a_body1Catch2(Error const&, int) [clone .constprop.0] [clone .isra.0] at /root/build_output/fdbserver/storageserver.actor.g.cpp:49005
 (inlined by) a_body1Catch2cont1 at /root/src/foundationdb/fdbserver/storageserver.actor.cpp:12604
 (inlined by) a_body1Catch2 at /root/build_output/fdbserver/storageserver.actor.g.cpp:49022
logLongByteSampleRecovery(Future<Void> const&) at /root/build_output/fdbserver/storageserver.actor.g.cpp:61352
 (inlined by) a_body1Catch1 at /root/build_output/fdbserver/storageserver.actor.g.cpp:61384
 (inlined by) a_body1 at /root/build_output/fdbserver/storageserver.actor.g.cpp:61361
 (inlined by) LogLongByteSampleRecoveryActor at /root/build_output/fdbserver/storageserver.actor.g.cpp:61592
 (inlined by) logLongByteSampleRecovery(Future<Void> const&) at /root/src/foundationdb/fdbserver/storageserver.actor.cpp:14363
ReplyPromise<WatchValueReply>::operator=(ReplyPromise<WatchValueReply>&&) at /root/src/foundationdb/fdbrpc/include/fdbrpc/fdbrpc.h:181
 (inlined by) WatchValueRequest::operator=(WatchValueRequest&&) at /root/src/foundationdb/fdbclient/include/fdbclient/StorageServerInterface.h:330
 (inlined by) void detail::LoadMember<LoadContext<ArenaObjectReader> >::operator()<WatchValueRequest>(WatchValueRequest&) at /root/src/foundationdb/flow/include/flow/flat_buffers.h:1045
 (inlined by) void detail::for_each<detail::LoadMember<LoadContext<ArenaObjectReader> >, WatchValueRequest&>(detail::LoadMember<LoadContext<ArenaObjectReader> >&&, WatchValueRequest&) at /root/src/foundationdb/flow/include/flow/flat_buffers.h:685
 (inlined by) void detail::LoadSaveHelper<detail::FakeRoot<WatchValueRequest>, LoadContext<ArenaObjectReader> >::SerializeFun::operator()<WatchValueRequest>(WatchValueRequest&) at /root/src/foundationdb/flow/include/flow/flat_buffers.h:1121
 (inlined by) std::enable_if<is_fb_function<detail::LoadSaveHelper<detail::FakeRoot<WatchValueRequest>, LoadContext<ArenaObjectReader> >::SerializeFun>, void>::type serializer<detail::LoadSaveHelper<detail::FakeRoot<WatchValueRequest>, LoadContext<ArenaObjectReader> >::SerializeFun, WatchValueRequest>(detail::LoadSaveHelper<detail::FakeRoot<WatchValueRequest>, LoadContext<ArenaObjectReader> >::SerializeFun&, WatchValueRequest&) at /root/src/foundationdb/flow/include/flow/ObjectSerializerTraits.h:90
 (inlined by) void detail::FakeRoot<WatchValueRequest>::serialize_impl<detail::LoadSaveHelper<detail::FakeRoot<WatchValueRequest>, LoadContext<ArenaObjectReader> >::SerializeFun, 0ul>(detail::LoadSaveHelper<detail::FakeRoot<WatchValueRequest>, LoadContext<ArenaObjectReader> >::SerializeFun&, std::integer_sequence<unsigned long, 0ul>) at /root/src/foundationdb/flow/include/flow/flat_buffers.h:1301
 (inlined by) void detail::FakeRoot<WatchValueRequest>::serialize<detail::LoadSaveHelper<detail::FakeRoot<WatchValueRequest>, LoadContext<ArenaObjectReader> >::SerializeFun>(detail::LoadSaveHelper<detail::FakeRoot<WatchValueRequest>, LoadContext<ArenaObjectReader> >::SerializeFun&) at /root/src/foundationdb/flow/include/flow/flat_buffers.h:1295
 (inlined by) std::enable_if<expect_serialize_member<detail::FakeRoot<WatchValueRequest> >, void>::type detail::LoadSaveHelper<detail::FakeRoot<WatchValueRequest>, LoadContext<ArenaObjectReader> >::load<detail::FakeRoot<WatchValueRequest> >(detail::FakeRoot<WatchValueRequest>&, unsigned char const*) at /root/src/foundationdb/flow/include/flow/flat_buffers.h:1132
 (inlined by) void detail::load_helper<detail::FakeRoot<WatchValueRequest>, LoadContext<ArenaObjectReader> >(detail::FakeRoot<WatchValueRequest>&, unsigned char const*, LoadContext<ArenaObjectReader> const&) at /root/src/foundationdb/flow/include/flow/flat_buffers.h:1274
 (inlined by) void detail::load<detail::FakeRoot<WatchValueRequest>, LoadContext<ArenaObjectReader> >(detail::FakeRoot<WatchValueRequest>&, unsigned char const*, LoadContext<ArenaObjectReader>&) at /root/src/foundationdb/flow/include/flow/flat_buffers.h:1326
 (inlined by) void load_members<LoadContext<ArenaObjectReader>, WatchValueRequest>(unsigned char const*, LoadContext<ArenaObjectReader>&, WatchValueRequest&) at /root/src/foundationdb/flow/include/flow/flat_buffers.h:1347
 (inlined by) void _ObjectReader<ArenaObjectReader>::deserialize<WatchValueRequest>(unsigned int, WatchValueRequest&) at /root/src/foundationdb/flow/include/flow/ObjectSerializer.h:87
ActorCallback<(anonymous namespace)::NetworkSenderActor<EKPGetLatestBaseCipherKeysReply>, 0, EKPGetLatestBaseCipherKeysReply>::error(Error) at /root/build_output/fdbrpc/include/fdbrpc/networksender.actor.g.h:206
 (inlined by) error at /root/src/foundationdb/flow/include/flow/flow.h:1542
std::__cxx11::moneypunct<wchar_t, false>::_M_initialize_moneypunct(__locale_struct*, char const*) at ??:?
std::pair<MessageType const, std::string>::pair<MessageType, char const (&) [22], true>(MessageType&&, char const (&) [22]) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/stl_pair.h:353
 (inlined by) __static_initialization_and_destruction_0 at /root/src/foundationdb/fdbclient/include/fdbclient/Status.h:96

TraceEvent(SevError, "ReplaceRangeError")
.error(e)
.detail("Key", kvItr->key)
.detail("ValueSize", kvItr->value.size());
throw;
}
}

// Wait for the changes to be durable
wait(self->commit());
return Void();
}
self->clear(rangeRef);
for (; kvItr != data.end(); kvItr++) {
self->set(*kvItr);
if (++sinceYield > 1000) {
wait(yield());
sinceYield = 0;
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
TraceEvent(SevError, "ReplaceRangeFailed").error(e).detail("Range", rangeRef).detail("DataSize", data.size());
throw;
}
return Void();
}

#include "flow/unactorcompiler.h"
Expand Down
2 changes: 2 additions & 0 deletions fdbclient/include/fdbclient/ServerKnobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,8 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl<ServerKno
// Swift: Enable the Swift runtime hooks and use Swift implementations where possible
bool FLOW_WITH_SWIFT;

bool BULK_LOAD_USE_SST_INGEST; // Enable direct SST file ingestion for RocksDB storage engines

ServerKnobs(Randomize, ClientKnobs*, IsSimulated);
void initialize(Randomize, ClientKnobs*, IsSimulated);
};
2 changes: 1 addition & 1 deletion fdbclient/tests/fdb_cluster_fixture.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ function start_fdb_cluster {
"${local_build_dir}" \
--knobs "${knobs}" \
--stateless_count 1 --replication_count 1 --logs_count 1 \
--storage_count "${ss_count}" --storage_type ssd \
--storage_count "${ss_count}" --storage_type ssd-rocksdb-v1 \
Copy link
Member

@kakaiu kakaiu Apr 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to randomly choose between sqlite and rocksdb? for testing coverage

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dunno. rocksdb is the standard, not sqllite.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want bulkload feature to stably work for both sqlite and rocksdb

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me make a new PR for this that allows setting of storage engine to use in ctest.

--dump_pids on \
> >(tee "${output}") \
2> >(tee "${output}" >&2)
Expand Down
142 changes: 141 additions & 1 deletion fdbserver/KeyValueStoreRocksDB.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
#include <memory>
#include <tuple>
#include <vector>
#include <fstream>

#endif // WITH_ROCKSDB

Expand Down Expand Up @@ -1318,6 +1319,82 @@ struct RocksDBKeyValueStore : IKeyValueStore {

void init() override {}

struct IngestSSTFilesAction : TypedAction<Writer, IngestSSTFilesAction> {
IngestSSTFilesAction(std::shared_ptr<BulkLoadFileSetKeyMap> localFileSets) : localFileSets(localFileSets) {}

double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }

std::shared_ptr<BulkLoadFileSetKeyMap> localFileSets;
ThreadReturnPromise<Void> done;
};

void action(IngestSSTFilesAction& a) {
// Create a list of SST files to ingest
std::vector<std::string> sstFiles;
for (const auto& [range, fileSet] : *a.localFileSets) {
if (fileSet.hasDataFile()) {
sstFiles.push_back(fileSet.getDataFileFullPath());
}
}

if (sstFiles.empty()) {
TraceEvent(SevInfo, "RocksDBIngestSSTFilesNoFiles", id);
a.done.send(Void()); // Nothing to ingest
return;
}

// Configure ingestion options
rocksdb::IngestExternalFileOptions options;
options.move_files = true;
options.verify_checksums_before_ingest = true;
options.allow_blocking_flush = true;
// write_global_seqno is default true which means on ingest the SST file is rewritten w/ seqno injected for
// each KV.

// Ingest the SST files
// The default column family parameter is necessary here; w/o it the ingested keyvalues are unreadable
rocksdb::Status status = db->IngestExternalFile(cf, sstFiles, options);

if (!status.ok()) {
logRocksDBError(id, status, "IngestSSTFiles");
a.done.sendError(statusToError(status));
return;
}

a.done.send(Void());
}

struct CompactRangeAction : TypedAction<Writer, CompactRangeAction> {
CompactRangeAction(KeyRangeRef range) : range(range) {}

double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }

const KeyRange range;
ThreadReturnPromise<Void> done;
};

void action(CompactRangeAction& a) {
// Configure compaction options
rocksdb::CompactRangeOptions options;
// Force RocksDB to rewrite file to last level
options.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForceOptimized;

// Convert key range to slices
auto begin = toSlice(a.range.begin);
auto end = toSlice(a.range.end);

// Perform the compaction
rocksdb::Status status = db->CompactRange(options, cf, &begin, &end);

if (!status.ok()) {
logRocksDBError(id, status, "CompactRange");
a.done.sendError(statusToError(status));
return;
}

a.done.send(Void());
}

struct OpenAction : TypedAction<Writer, OpenAction> {
std::string path;
ThreadReturnPromise<Void> done;
Expand Down Expand Up @@ -2141,6 +2218,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
void close() override { doClose(this, false); }

KeyValueStoreType getType() const override { return KeyValueStoreType(KeyValueStoreType::SSD_ROCKSDB_V1); }
bool supportsSstIngestion() const override { return true; }

Future<Void> init() override {
if (openFuture.isValid()) {
Expand Down Expand Up @@ -2492,6 +2570,20 @@ struct RocksDBKeyValueStore : IKeyValueStore {
return EncryptionAtRestMode(EncryptionAtRestMode::DISABLED);
}

Future<Void> ingestSSTFiles(std::shared_ptr<BulkLoadFileSetKeyMap> localFileSets) override {
auto a = new Writer::IngestSSTFilesAction(localFileSets);
auto res = a->done.getFuture();
writeThread->post(a);
return res;
}

Future<Void> compactRange(KeyRangeRef range) override {
auto a = new Writer::CompactRangeAction(range);
auto res = a->done.getFuture();
writeThread->post(a);
return res;
}

DB db = nullptr;
std::shared_ptr<SharedRocksDBState> sharedState;
std::shared_ptr<PerfContextMetrics> perfContextMetrics;
Expand Down Expand Up @@ -2963,6 +3055,54 @@ TEST_CASE("noSim/RocksDB/RangeClear") {
wait(closed);
return Void();
}
} // namespace

TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/IngestSSTFileVisibility") {
state std::string testDir = "test_ingest_sst_visibility";
state UID testStoreID = deterministicRandom()->randomUniqueID();
state RocksDBKeyValueStore* kvStore = new RocksDBKeyValueStore(testDir, testStoreID);

// Initialize the store
wait(kvStore->init());

// Create an SST file
state std::string sstFilename = "test.sst"; // Base filename
state std::string sstFileFullPath = joinPath(testDir, sstFilename); // Full path for writer
rocksdb::SstFileWriter sstWriter(rocksdb::EnvOptions(), kvStore->sharedState->getOptions());
ASSERT(sstWriter.Open(sstFileFullPath).ok()); // Use full path here
ASSERT(sstWriter.Put("test_key", "test_value").ok());
ASSERT(sstWriter.Finish().ok());

// Create and populate the file set map (which is a vector)
state std::shared_ptr<BulkLoadFileSetKeyMap> fileSetMap = std::make_shared<BulkLoadFileSetKeyMap>();
state std::string dummyManifestFile = "dummy_manifest.txt"; // Dummy filename for validation

// Create the BulkLoadFileSet using its constructor.
// Pass the test directory, dummy manifest, and the base SST filename.
BulkLoadFileSet fileSet(testDir, // rootPath
/*relativePath=*/"",
dummyManifestFile, // manifestFileName
sstFilename, // dataFileName (use base name)
/*byteSampleFileName=*/"",
BulkLoadChecksum()); // checksum

fileSetMap->emplace_back(allKeys, fileSet); // Use emplace_back for std::vector

// Ingest the SST file using the populated map
wait(kvStore->ingestSSTFiles(fileSetMap));

// Verify the key is visible
Optional<Value> value = wait(kvStore->readValue("test_key"_sr, Optional<ReadOptions>()));
ASSERT(value.present());
ASSERT(value.get() == "test_value"_sr);

// Clean up
Future<Void> closed = kvStore->onClosed(); // Get future before dispose
kvStore->dispose();
wait(closed); // Wait for close completion
platform::eraseDirectoryRecursive(testDir);

return Void();
}

} // namespace
#endif // WITH_ROCKSDB
Loading