diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index ef203eda31a..8d81c0d6c4e 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -619,7 +619,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( ROCKSDB_MEMTABLE_PROTECTION_BYTES_PER_KEY, 0 ); if ( randomize && BUGGIFY ) ROCKSDB_MEMTABLE_PROTECTION_BYTES_PER_KEY = 8; // Default: 0 (disabled). Supported values: 0, 1, 2, 4, 8. // Block cache key-value checksum. Checksum is validated during read, so has non-trivial impact on read performance. init( ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY, 0 ); if ( randomize && BUGGIFY ) ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY = 8; // Default: 0 (disabled). Supported values: 0, 1, 2, 4, 8. - init( ROCKSDB_ENABLE_NONDETERMINISM, false ); + init( ROCKSDB_ENABLE_NONDETERMINISM, false ); init( SHARDED_ROCKSDB_ALLOW_WRITE_STALL_ON_FLUSH, false ); init( SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO, 0.01 ); if (isSimulated) SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO = deterministicRandom()->random01(); init( SHARD_METADATA_SCAN_BYTES_LIMIT, 10485760 ); // 10MB @@ -1410,4 +1410,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi (5.0 * VERSIONS_PER_SECOND); clientKnobs->INIT_MID_SHARD_BYTES = MIN_SHARD_BYTES; } + + init(BULK_LOAD_USE_SST_INGEST, true); // Enable SST ingestion by default + if (isSimulated) { + BULK_LOAD_USE_SST_INGEST = deterministicRandom()->coinflip(); + } } diff --git a/fdbclient/include/fdbclient/IKeyValueStore.actor.h b/fdbclient/include/fdbclient/IKeyValueStore.actor.h index e76bc040cee..9ad9a428fce 100644 --- a/fdbclient/include/fdbclient/IKeyValueStore.actor.h +++ b/fdbclient/include/fdbclient/IKeyValueStore.actor.h @@ -62,6 +62,8 @@ class IKeyValueStore : public IClosable { // Returns true if the KV store supports shards, i.e., implements addRange(), removeRange(), and // persistRangeMapping(). virtual bool shardAware() const { return false; } + // Returns true if the store supports external SST file ingestion. + virtual bool supportsSstIngestion() const { return false; } virtual void set(KeyValueRef keyValue, const Arena* arena = nullptr) = 0; virtual void clear(KeyRangeRef range, const Arena* arena = nullptr) = 0; virtual Future canCommit() { return Void(); } @@ -157,6 +159,11 @@ class IKeyValueStore : public IClosable { // Obtain the encryption mode of the storage. The encryption mode needs to match the encryption mode of the cluster. virtual Future encryptionMode() = 0; + virtual Future ingestSSTFiles(std::string bulkLoadLocalDir, + std::shared_ptr localFileSets) { + throw not_implemented(); + } + protected: virtual ~IKeyValueStore() {} }; diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index 094d0aca73f..1ca6e3457bd 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -1456,6 +1456,8 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl >(tee "${output}") \ 2> >(tee "${output}" >&2) diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp index d901e9c1197..3e05d7affa8 100644 --- a/fdbserver/KeyValueStoreRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp @@ -61,6 +61,7 @@ #include #include #include +#include #endif // WITH_ROCKSDB @@ -2141,6 +2142,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { void close() override { doClose(this, false); } KeyValueStoreType getType() const override { return KeyValueStoreType(KeyValueStoreType::SSD_ROCKSDB_V1); } + bool supportsSstIngestion() const override { return true; } Future init() override { if (openFuture.isValid()) { @@ -2492,6 +2494,37 @@ struct RocksDBKeyValueStore : IKeyValueStore { return EncryptionAtRestMode(EncryptionAtRestMode::DISABLED); } + Future ingestSSTFiles(std::string bulkLoadLocalDir, + std::shared_ptr localFileSets) override { + // Create a list of SST files to ingest + std::vector sstFiles; + for (const auto& [range, fileSet] : *localFileSets) { + if (fileSet.hasDataFile()) { + sstFiles.push_back(fileSet.getDataFileFullPath()); + } + } + + if (sstFiles.empty()) { + TraceEvent(SevInfo, "RocksDBIngestSSTFilesNoFiles", id); + return Void(); // Nothing to ingest + } + + // Configure ingestion options + rocksdb::IngestExternalFileOptions options; + options.move_files = true; + options.verify_checksums_before_ingest = true; + + // Ingest the SST files + // The default column family parameter is necessary here; w/o it the ingested keyvalues are unreadable + rocksdb::Status status = db->IngestExternalFile(defaultFdbCF, sstFiles, options); + if (!status.ok()) { + TraceEvent(SevError, "RocksDBIngestSSTFilesError", id).detail("Error", status.ToString()); + throw internal_error(); + } + + return Void(); + } + DB db = nullptr; std::shared_ptr sharedState; std::shared_ptr perfContextMetrics; @@ -2963,6 +2996,36 @@ TEST_CASE("noSim/RocksDB/RangeClear") { wait(closed); return Void(); } -} // namespace +TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/IngestSSTFileVisibility") { + state std::string testDir = "test_ingest_sst_visibility"; + state UID testStoreID = deterministicRandom()->randomUniqueID(); + state RocksDBKeyValueStore* kvStore = new RocksDBKeyValueStore(testDir, testStoreID); + + // Initialize the store + wait(kvStore->init()); + + // Create and ingest an SST file + state std::string sstFile = testDir + "/test.sst"; + rocksdb::SstFileWriter sstWriter(rocksdb::EnvOptions(), kvStore->sharedState->getOptions()); + ASSERT(sstWriter.Open(sstFile).ok()); + ASSERT(sstWriter.Put("test_key", "test_value").ok()); + ASSERT(sstWriter.Finish().ok()); + + // Ingest the SST file + wait(kvStore->ingestSSTFiles(testDir, std::make_shared())); + + // Verify the key is visible + Optional value = wait(kvStore->readValue("test_key"_sr, Optional())); + ASSERT(value.present()); + ASSERT(value.get() == "test_value"_sr); + + // Clean up + kvStore->dispose(); + platform::eraseDirectoryRecursive(testDir); + + return Void(); +} + +} // namespace #endif // WITH_ROCKSDB diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index a709b012f73..895039a945f 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -8892,6 +8893,85 @@ ACTOR Future tryGetRangeForBulkLoad(PromiseStream results, } } +// Utility function to process sample files during bulk load +ACTOR static Future processSampleFiles(StorageServer* data, + std::string bulkLoadLocalDir, + std::shared_ptr localFileSets) { + state BulkLoadFileSetKeyMap::const_iterator iter = localFileSets->begin(); + state BulkLoadFileSetKeyMap::const_iterator end = localFileSets->end(); + state std::vector rawSamples; + state std::unique_ptr reader; + + while (iter != end) { + const auto& [range, fileSet] = *iter; + if (fileSet.hasByteSampleFile()) { + state std::string sampleFilePath = fileSet.getBytesSampleFileFullPath(); + state int retryCount = 0; + state int maxRetries = 10; // Consider making this a KNOB + state Error lastError; + + // This outer loop retries reading the entire file if errors occur during opening/reading + while (retryCount < maxRetries) { + try { + // Read all samples from the SST file into memory first + // Store as KeyValueRef to keep the original encoded size value + rawSamples.clear(); + reader = newRocksDBSstFileReader(); + reader->open(abspath(sampleFilePath)); + + TraceEvent(SevInfo, "StorageServerProcessingSampleFile", data->thisServerID) + .detail("File", sampleFilePath); + + // Read all samples + while (reader->hasNext()) { + KeyValue kv = reader->next(); + // Store the raw KeyValueRef + rawSamples.push_back(KeyValueRef(kv.key, kv.value)); + } + + // Now apply all read samples to the in-memory set and update metrics + for (const auto& kv : rawSamples) { + const KeyRef& key = kv.key; + int64_t size = BinaryReader::fromStringRef(kv.value, Unversioned()); + data->metrics.byteSample.sample.insert(key, size); + data->metrics.notifyBytes(key, size); + data->addMutationToMutationLogOrStorage( + invalidVersion, + MutationRef(MutationRef::SetValue, key.withPrefix(persistByteSampleKeys.begin), kv.value)); + } + + // If we get here, processing was successful for this file + break; // Exit the retry loop + + } catch (Error& e) { + lastError = e; + retryCount++; + TraceEvent(retryCount < maxRetries ? SevWarn : SevError, + "StorageServerSampleFileProcessingError", + data->thisServerID) + .error(e) // Log the actual error 'e' + .detail("File", sampleFilePath) + // REMOVED: .detail("Key", kv.key).detail("Value", kv.value) as 'kv' is out of scope + .detail("RetryCount", retryCount) + .detail("MaxRetries", maxRetries); + + // No need to check/close reader here, unique_ptr handles it. + + if (retryCount < maxRetries) { + // Wait before retrying, with exponential backoff + wait(delay(0.1 * pow(2, retryCount))); // Consider adding jitter + continue; // Retry reading the file + } + // On final retry failure, throw the last error encountered + throw lastError; + } + } // end retry loop + } + ++iter; + } // end file iteration loop + return Void(); +} + ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { state const UID fetchKeysID = deterministicRandom()->randomUniqueID(); state TraceInterval interval("FetchKeys"); @@ -9135,9 +9215,36 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchKey", data->thisServerID) .detail("DataMoveId", dataMoveId.toString()) .detail("Range", keys) + .detail("Knobs", SERVER_KNOBS->BULK_LOAD_USE_SST_INGEST) + .detail("SupportsSstIngestion", data->storage.getKeyValueStore()->supportsSstIngestion()) .detail("Phase", "File download"); - hold = tryGetRangeForBulkLoad(results, keys, localBulkLoadFileSets); - rangeEnd = keys.end; + // Attempt SST ingestion... + if (SERVER_KNOBS->BULK_LOAD_USE_SST_INGEST && + data->storage.getKeyValueStore()->supportsSstIngestion()) { + TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchKey", data->thisServerID) + .detail("DataMoveId", dataMoveId.toString()) + .detail("Range", keys) + .detail("Phase", "SST ingestion"); + // Verify ranges... + for (const auto& [range, fileSet] : *localBulkLoadFileSets) { + ASSERT(keys.contains(range)); + } + // Clear the key range before ingestion. This mirrors the replaceRange done in the case were we do + // not ingest SST files. + // data->storage.getKeyValueStore()->clear(keys); + // TODO: this is a blocking call. We should make this as async call. + wait(data->storage.getKeyValueStore()->ingestSSTFiles(bulkLoadLocalDir, localBulkLoadFileSets)); + + // Process sample files after SST ingestion + wait(processSampleFiles(data, bulkLoadLocalDir, localBulkLoadFileSets)); + + // NOTICE: We break the 'fetchKeys' loop here if we successfully ingest the SST files. + // EARLY EXIT FROM 'fetchKeys' LOOP!!! + break; + } else { + hold = tryGetRangeForBulkLoad(results, keys, localBulkLoadFileSets); + rangeEnd = keys.end; + } } else { hold = tryGetRange(results, &tr, keys); rangeEnd = keys.end; @@ -9219,6 +9326,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { for (; kvItr != this_block.end(); ++kvItr) { data->byteSampleApplySet(*kvItr, invalidVersion); } + if (this_block.more) { blockBegin = this_block.getReadThrough(); } else { @@ -9307,7 +9415,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { } break; } - } + } // fetchKeys loop. // We have completed the fetch and write of the data, now we wait for MVCC window to pass. // As we have finished this work, we will allow more work to start...