Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion fdbclient/ServerKnobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_MEMTABLE_PROTECTION_BYTES_PER_KEY, 0 ); if ( randomize && BUGGIFY ) ROCKSDB_MEMTABLE_PROTECTION_BYTES_PER_KEY = 8; // Default: 0 (disabled). Supported values: 0, 1, 2, 4, 8.
// Block cache key-value checksum. Checksum is validated during read, so has non-trivial impact on read performance.
init( ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY, 0 ); if ( randomize && BUGGIFY ) ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY = 8; // Default: 0 (disabled). Supported values: 0, 1, 2, 4, 8.
init( ROCKSDB_ENABLE_NONDETERMINISM, false );
init( ROCKSDB_ENABLE_NONDETERMINISM, false );
init( SHARDED_ROCKSDB_ALLOW_WRITE_STALL_ON_FLUSH, false );
init( SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO, 0.01 ); if (isSimulated) SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO = deterministicRandom()->random01();
init( SHARD_METADATA_SCAN_BYTES_LIMIT, 10485760 ); // 10MB
Expand Down Expand Up @@ -1410,4 +1410,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
(5.0 * VERSIONS_PER_SECOND);
clientKnobs->INIT_MID_SHARD_BYTES = MIN_SHARD_BYTES;
}

init(BULK_LOAD_USE_SST_INGEST, true); // Enable SST ingestion by default
if (isSimulated) {
BULK_LOAD_USE_SST_INGEST = deterministicRandom()->coinflip();
}
}
7 changes: 7 additions & 0 deletions fdbclient/include/fdbclient/IKeyValueStore.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class IKeyValueStore : public IClosable {
// Returns true if the KV store supports shards, i.e., implements addRange(), removeRange(), and
// persistRangeMapping().
virtual bool shardAware() const { return false; }
// Returns true if the store supports external SST file ingestion.
virtual bool supportsSstIngestion() const { return false; }
virtual void set(KeyValueRef keyValue, const Arena* arena = nullptr) = 0;
virtual void clear(KeyRangeRef range, const Arena* arena = nullptr) = 0;
virtual Future<Void> canCommit() { return Void(); }
Expand Down Expand Up @@ -157,6 +159,11 @@ class IKeyValueStore : public IClosable {
// Obtain the encryption mode of the storage. The encryption mode needs to match the encryption mode of the cluster.
virtual Future<EncryptionAtRestMode> encryptionMode() = 0;

// Ingests externally produced SST files, described by localFileSets, into this store.
// bulkLoadLocalDir is the local bulk-load directory supplied by the caller.
// The default implementation does no work and always throws not_implemented(); presumably only
// stores whose supportsSstIngestion() returns true override it — confirm against implementers.
virtual Future<Void> ingestSSTFiles(std::string bulkLoadLocalDir,
std::shared_ptr<BulkLoadFileSetKeyMap> localFileSets) {
throw not_implemented();
}

protected:
virtual ~IKeyValueStore() {}
};
Expand Down
2 changes: 2 additions & 0 deletions fdbclient/include/fdbclient/ServerKnobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -1456,6 +1456,8 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl<ServerKno
// Swift: Enable the Swift runtime hooks and use Swift implementations where possible
bool FLOW_WITH_SWIFT;

bool BULK_LOAD_USE_SST_INGEST; // Enable direct SST file ingestion for RocksDB storage engines

ServerKnobs(Randomize, ClientKnobs*, IsSimulated);
void initialize(Randomize, ClientKnobs*, IsSimulated);
};
2 changes: 1 addition & 1 deletion fdbclient/tests/fdb_cluster_fixture.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ function start_fdb_cluster {
"${local_build_dir}" \
--knobs "${knobs}" \
--stateless_count 1 --replication_count 1 --logs_count 1 \
--storage_count "${ss_count}" --storage_type ssd \
--storage_count "${ss_count}" --storage_type ssd-rocksdb-v1 \
--dump_pids on \
> >(tee "${output}") \
2> >(tee "${output}" >&2)
Expand Down
65 changes: 64 additions & 1 deletion fdbserver/KeyValueStoreRocksDB.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
#include <memory>
#include <tuple>
#include <vector>
#include <fstream>

#endif // WITH_ROCKSDB

Expand Down Expand Up @@ -2141,6 +2142,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
void close() override { doClose(this, false); }

KeyValueStoreType getType() const override { return KeyValueStoreType(KeyValueStoreType::SSD_ROCKSDB_V1); }
// This store overrides ingestSSTFiles(), so it advertises support for external SST file ingestion.
bool supportsSstIngestion() const override { return true; }

Future<Void> init() override {
if (openFuture.isValid()) {
Expand Down Expand Up @@ -2492,6 +2494,37 @@ struct RocksDBKeyValueStore : IKeyValueStore {
return EncryptionAtRestMode(EncryptionAtRestMode::DISABLED);
}

// Ingests the data file of every file set in localFileSets into the default column family.
//
// File sets without a data file are skipped. If no data file is found at all, an informational trace
// event is emitted and the call completes immediately. Ingestion runs synchronously on the calling
// thread; on failure a SevError trace event carrying the RocksDB status is logged and
// internal_error() is thrown. bulkLoadLocalDir is not read by this implementation.
Future<Void> ingestSSTFiles(std::string bulkLoadLocalDir,
                            std::shared_ptr<BulkLoadFileSetKeyMap> localFileSets) override {
	// Gather the full path of each data file that is present.
	std::vector<std::string> dataFilePaths;
	for (const auto& [range, fileSet] : *localFileSets) {
		if (!fileSet.hasDataFile()) {
			continue;
		}
		dataFilePaths.push_back(fileSet.getDataFileFullPath());
	}

	// Nothing to ingest.
	if (dataFilePaths.empty()) {
		TraceEvent(SevInfo, "RocksDBIngestSSTFilesNoFiles", id);
		return Void();
	}

	// Move the files into the database rather than copying them, and verify their checksums first.
	rocksdb::IngestExternalFileOptions ingestOptions;
	ingestOptions.move_files = true;
	ingestOptions.verify_checksums_before_ingest = true;

	// The column family must be passed explicitly; without it the ingested key-values are unreadable.
	const rocksdb::Status ingestStatus = db->IngestExternalFile(defaultFdbCF, dataFilePaths, ingestOptions);
	if (ingestStatus.ok()) {
		return Void();
	}

	TraceEvent(SevError, "RocksDBIngestSSTFilesError", id).detail("Error", ingestStatus.ToString());
	throw internal_error();
}

DB db = nullptr;
std::shared_ptr<SharedRocksDBState> sharedState;
std::shared_ptr<PerfContextMetrics> perfContextMetrics;
Expand Down Expand Up @@ -2963,6 +2996,36 @@ TEST_CASE("noSim/RocksDB/RangeClear") {
wait(closed);
return Void();
}
} // namespace

// Unit test: writes a single key/value pair to an external SST file, asks the store to ingest it, and
// asserts that the key can then be read back through readValue().
//
// NOTE(review): the file-set map handed to ingestSSTFiles() below is empty, so the SST file written by
// this test is never named in the call. If ingestSSTFiles() only ingests files listed in the map,
// nothing is ingested and ASSERT(value.present()) would fail — confirm, and populate the map with a
// file set that points at sstFile.
TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/IngestSSTFileVisibility") {
state std::string testDir = "test_ingest_sst_visibility";
state UID testStoreID = deterministicRandom()->randomUniqueID();
state RocksDBKeyValueStore* kvStore = new RocksDBKeyValueStore(testDir, testStoreID);

// Initialize the store
wait(kvStore->init());

// Create an SST file inside the store directory holding one key/value pair, written with the
// store's own RocksDB options.
state std::string sstFile = testDir + "/test.sst";
rocksdb::SstFileWriter sstWriter(rocksdb::EnvOptions(), kvStore->sharedState->getOptions());
ASSERT(sstWriter.Open(sstFile).ok());
ASSERT(sstWriter.Put("test_key", "test_value").ok());
ASSERT(sstWriter.Finish().ok());

// Ingest the SST file
// NOTE(review): a freshly constructed (empty) BulkLoadFileSetKeyMap is passed here; sstFile is not
// referenced by this call.
wait(kvStore->ingestSSTFiles(testDir, std::make_shared<BulkLoadFileSetKeyMap>()));

// Verify the key is visible
Optional<Value> value = wait(kvStore->readValue("test_key"_sr, Optional<ReadOptions>()));
ASSERT(value.present());
ASSERT(value.get() == "test_value"_sr);

// Clean up: dispose of the store, then remove the test directory.
// NOTE(review): the preceding test waits on a close future before returning; presumably this test
// should also wait for the store to finish closing before erasing its directory — confirm.
kvStore->dispose();
platform::eraseDirectoryRecursive(testDir);

return Void();
}

} // namespace
#endif // WITH_ROCKSDB
114 changes: 111 additions & 3 deletions fdbserver/storageserver.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <cinttypes>
#include <functional>
#include <fstream>
#include <iterator>
#include <limits>
#include <memory>
Expand Down Expand Up @@ -8892,6 +8893,85 @@ ACTOR Future<Void> tryGetRangeForBulkLoad(PromiseStream<RangeResult> results,
}
}

// Loads the byte-sample files of a bulk load into the storage server's byte sample.
//
// For every entry of localFileSets that reports hasByteSampleFile(), the sample SST file is opened with
// newRocksDBSstFileReader(), all of its key/value pairs are read into memory, and each pair is then
//   - inserted into data->metrics.byteSample.sample (value decoded as an int64_t size),
//   - reported through data->metrics.notifyBytes(), and
//   - written as a SetValue mutation under persistByteSampleKeys.begin at invalidVersion.
//
// Reading and applying a file is attempted up to maxRetries times, waiting 0.1 * 2^retryCount seconds
// between attempts; after the last failed attempt the most recent error is rethrown.
//
// bulkLoadLocalDir is not read by this function.
//
// NOTE(review): the apply loop sits inside the retried try block, so an error thrown part-way through
// applying a file causes the samples already applied to be applied again on the next attempt — confirm
// that repeating insert()/notifyBytes()/the mutation is harmless, or narrow the retry to the read phase.
ACTOR static Future<Void> processSampleFiles(StorageServer* data,
std::string bulkLoadLocalDir,
std::shared_ptr<BulkLoadFileSetKeyMap> localFileSets) {
state BulkLoadFileSetKeyMap::const_iterator iter = localFileSets->begin();
state BulkLoadFileSetKeyMap::const_iterator end = localFileSets->end();
// Samples of the file currently being processed; cleared at the start of every attempt.
state std::vector<KeyValue> rawSamples;
// Reader for the current attempt; reassigned (replacing the previous reader) on every attempt.
state std::unique_ptr<IRocksDBSstFileReader> reader;

while (iter != end) {
// `range` and `fileSet` are only used before the first wait() of this iteration.
const auto& [range, fileSet] = *iter;
if (fileSet.hasByteSampleFile()) {
state std::string sampleFilePath = fileSet.getBytesSampleFileFullPath();
state int retryCount = 0;
state int maxRetries = 10; // Consider making this a KNOB
state Error lastError;

// This outer loop retries reading the entire file if errors occur during opening/reading
while (retryCount < maxRetries) {
try {
// Read all samples from the SST file into memory first.
// The value bytes are kept exactly as read so they can be persisted unchanged below.
rawSamples.clear();
reader = newRocksDBSstFileReader();
reader->open(abspath(sampleFilePath));

TraceEvent(SevInfo, "StorageServerProcessingSampleFile", data->thisServerID)
.detail("File", sampleFilePath);

// Read all samples
while (reader->hasNext()) {
KeyValue kv = reader->next();
// Store the key and the still-encoded value
rawSamples.push_back(KeyValueRef(kv.key, kv.value));
}

// Now apply all read samples to the in-memory set and update metrics
for (const auto& kv : rawSamples) {
const KeyRef& key = kv.key;
// The value holds the sampled size, decoded here as an unversioned int64_t.
int64_t size = BinaryReader::fromStringRef<int64_t>(kv.value, Unversioned());
data->metrics.byteSample.sample.insert(key, size);
data->metrics.notifyBytes(key, size);
// Persist the sample under the byte-sample key prefix, keeping the encoded value as read.
data->addMutationToMutationLogOrStorage(
invalidVersion,
MutationRef(MutationRef::SetValue, key.withPrefix(persistByteSampleKeys.begin), kv.value));
}

// If we get here, processing was successful for this file
break; // Exit the retry loop

} catch (Error& e) {
lastError = e;
retryCount++;
// Intermediate failures are warnings; only the final failed attempt is logged as an error.
TraceEvent(retryCount < maxRetries ? SevWarn : SevError,
"StorageServerSampleFileProcessingError",
data->thisServerID)
.error(e) // Log the actual error 'e'
.detail("File", sampleFilePath)
// The individual sample key/value are not logged: they are not in scope in this handler.
.detail("RetryCount", retryCount)
.detail("MaxRetries", maxRetries);

// The reader is not closed explicitly here; the next attempt reassigns `reader`, which
// releases the previous instance.

if (retryCount < maxRetries) {
// Wait before retrying, with exponential backoff
wait(delay(0.1 * pow(2, retryCount))); // Consider adding jitter
continue; // Retry reading the file
}
// On final retry failure, throw the last error encountered
throw lastError;
}
} // end retry loop
}
++iter;
} // end file iteration loop
return Void();
}

ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
state const UID fetchKeysID = deterministicRandom()->randomUniqueID();
state TraceInterval interval("FetchKeys");
Expand Down Expand Up @@ -9135,9 +9215,36 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchKey", data->thisServerID)
.detail("DataMoveId", dataMoveId.toString())
.detail("Range", keys)
.detail("Knobs", SERVER_KNOBS->BULK_LOAD_USE_SST_INGEST)
.detail("SupportsSstIngestion", data->storage.getKeyValueStore()->supportsSstIngestion())
.detail("Phase", "File download");
hold = tryGetRangeForBulkLoad(results, keys, localBulkLoadFileSets);
rangeEnd = keys.end;
// Attempt SST ingestion...
if (SERVER_KNOBS->BULK_LOAD_USE_SST_INGEST &&
data->storage.getKeyValueStore()->supportsSstIngestion()) {
TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchKey", data->thisServerID)
.detail("DataMoveId", dataMoveId.toString())
.detail("Range", keys)
.detail("Phase", "SST ingestion");
// Verify ranges...
for (const auto& [range, fileSet] : *localBulkLoadFileSets) {
ASSERT(keys.contains(range));
}
// Clear the key range before ingestion. This mirrors the replaceRange done in the case where we do
// not ingest SST files. NOTE: the clear itself is currently commented out on the next line.
// data->storage.getKeyValueStore()->clear(keys);
// TODO: this is a blocking call. We should make this as async call.
wait(data->storage.getKeyValueStore()->ingestSSTFiles(bulkLoadLocalDir, localBulkLoadFileSets));

// Process sample files after SST ingestion
wait(processSampleFiles(data, bulkLoadLocalDir, localBulkLoadFileSets));

// NOTICE: We break the 'fetchKeys' loop here if we successfully ingest the SST files.
// EARLY EXIT FROM 'fetchKeys' LOOP!!!
break;
} else {
hold = tryGetRangeForBulkLoad(results, keys, localBulkLoadFileSets);
rangeEnd = keys.end;
}
} else {
hold = tryGetRange(results, &tr, keys);
rangeEnd = keys.end;
Expand Down Expand Up @@ -9219,6 +9326,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
for (; kvItr != this_block.end(); ++kvItr) {
data->byteSampleApplySet(*kvItr, invalidVersion);
}

if (this_block.more) {
blockBegin = this_block.getReadThrough();
} else {
Expand Down Expand Up @@ -9307,7 +9415,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
}
break;
}
}
} // fetchKeys loop.

// We have completed the fetch and write of the data, now we wait for MVCC window to pass.
// As we have finished this work, we will allow more work to start...
Expand Down