Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fdbclient/ClientKnobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( BLOBSTORE_MAX_SEND_BYTES_PER_SECOND, 1e9 );
init( BLOBSTORE_MAX_RECV_BYTES_PER_SECOND, 1e9 );

init( BLOBSTORE_MAX_DELAY_RETRYABLE_ERROR, 60 );
init( BLOBSTORE_MAX_DELAY_RETRYABLE_ERROR, 20 ); // Align with AWS SDK best practices (was 60s)
init( BLOBSTORE_MAX_DELAY_CONNECTION_FAILED, 10 );
init (BLOBSTORE_ENABLE_OBJECT_INTEGRITY_CHECK,true );

Expand Down
16 changes: 14 additions & 2 deletions fdbclient/S3Client.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,6 @@ ACTOR static Future<Void> copyDownFile(Reference<S3BlobStoreEndpoint> endpoint,
std::string objectName,
std::string filepath,
PartConfig config = PartConfig()) {
state double startTime = now();
state Reference<IAsyncFile> file;
state std::vector<PartState> parts;
state int64_t fileSize = 0;
Expand All @@ -840,7 +839,14 @@ ACTOR static Future<Void> copyDownFile(Reference<S3BlobStoreEndpoint> endpoint,
.detail("FilePath", filepath)
.detail("Attempt", retries);

TraceEvent(SevDebug, "S3ClientCopyDownFileBeforeObjectSize")
.detail("Bucket", bucket)
.detail("Object", objectName);
int64_t s = wait(endpoint->objectSize(bucket, objectName));
TraceEvent(SevDebug, "S3ClientCopyDownFileAfterObjectSize")
.detail("Bucket", bucket)
.detail("Object", objectName)
.detail("Size", s);
if (s <= 0) {
TraceEvent(SevWarnAlways, "S3ClientCopyDownFileEmptyFile")
.detail("Bucket", bucket)
Expand Down Expand Up @@ -929,7 +935,6 @@ ACTOR static Future<Void> copyDownFile(Reference<S3BlobStoreEndpoint> endpoint,
.detail("Bucket", bucket)
.detail("ObjectName", objectName)
.detail("FileSize", fileSize)
.detail("Duration", now() - startTime)
.detail("Checksum", expectedChecksum)
.detail("Parts", parts.size());

Expand Down Expand Up @@ -994,10 +999,17 @@ ACTOR static Future<Void> copyDownFile(Reference<S3BlobStoreEndpoint> endpoint,
}

// Convenience wrapper: downloads the object named by `s3url` to the local path `filepath`.
// Parses the URL into an endpoint + resource + parameters via getEndpoint(), then delegates
// to the endpoint-based copyDownFile overload. Returns Void on success; errors from
// getEndpoint() or the underlying download propagate to the caller.
// NOTE(review): parameters["bucket"] uses map operator[], which default-inserts an empty
// string if "bucket" is absent — presumably getEndpoint() always populates it; TODO confirm.
// The SevDebug TraceEvents bracket each step purely for diagnosing hangs/failures in the
// download path; they carry no behavioral significance.
ACTOR Future<Void> copyDownFile(std::string s3url, std::string filepath) {
TraceEvent(SevDebug, "S3ClientCopyDownFileWrapperStart").detail("S3URL", s3url).detail("FilePath", filepath);
std::string resource;
S3BlobStoreEndpoint::ParametersT parameters;
TraceEvent(SevDebug, "S3ClientCopyDownFileBeforeGetEndpoint").detail("S3URL", s3url);
// getEndpoint parses s3url, filling in `resource` (object path) and `parameters` (e.g. bucket).
Reference<S3BlobStoreEndpoint> endpoint = getEndpoint(s3url, resource, parameters);
TraceEvent(SevDebug, "S3ClientCopyDownFileAfterGetEndpoint")
.detail("S3URL", s3url)
.detail("Resource", resource)
.detail("Bucket", parameters["bucket"]);
// Delegate to the endpoint-based overload, which performs the actual (possibly multipart) download.
wait(copyDownFile(endpoint, parameters["bucket"], resource, filepath));
TraceEvent(SevDebug, "S3ClientCopyDownFileWrapperEnd").detail("S3URL", s3url).detail("FilePath", filepath);
return Void();
}

Expand Down
2 changes: 2 additions & 0 deletions fdbclient/ServerKnobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi

// BulkLoading
init( BULKLOAD_FILE_BYTES_MAX, 1*1024*1024*1024 ); // 1GB
init( BULKLOAD_DOWNLOAD_RETRY_DELAY, 2.0 ); // Retry delay for bulk load file downloads - reasonable for both S3 and simulation
init( BULKLOAD_DOWNLOAD_MAX_RETRIES, 20 ); // Maximum retries for bulk load downloads - 20 retries × 2s = 40s total
init( BULKLOAD_BYTE_SAMPLE_BATCH_KEY_COUNT, 10000 ); if( randomize && BUGGIFY ) BULKLOAD_BYTE_SAMPLE_BATCH_KEY_COUNT = deterministicRandom()->randomInt(2, 1000);
init( DD_BULKLOAD_SHARD_BOUNDARY_CHANGE_DELAY_SEC, 60.0 ); if( randomize && BUGGIFY ) DD_BULKLOAD_SHARD_BOUNDARY_CHANGE_DELAY_SEC = deterministicRandom()->random01() * 10 + 1;
init( DD_BULKLOAD_TASK_METADATA_READ_SIZE, 100 ); if( randomize && BUGGIFY ) DD_BULKLOAD_TASK_METADATA_READ_SIZE = deterministicRandom()->randomInt(2, 100);
Expand Down
2 changes: 2 additions & 0 deletions fdbclient/include/fdbclient/ServerKnobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl<ServerKno
double DD_FIX_WRONG_REPLICAS_DELAY; // the amount of time between attempts to increase the replication factor of
// under replicated shards
int BULKLOAD_FILE_BYTES_MAX; // the maximum bytes of files to inject by bulk loading
double BULKLOAD_DOWNLOAD_RETRY_DELAY; // seconds to wait between retries when downloading bulk load files
int BULKLOAD_DOWNLOAD_MAX_RETRIES; // maximum number of retries when downloading bulk load files
int BULKLOAD_BYTE_SAMPLE_BATCH_KEY_COUNT; // the maximum key count that can be successively sampled when bulkload
double DD_BULKLOAD_SHARD_BOUNDARY_CHANGE_DELAY_SEC; // seconds to delay shard boundary change when blocked by bulk
// loading
Expand Down
70 changes: 66 additions & 4 deletions fdbserver/BulkLoadUtil.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,15 +267,33 @@ ACTOR Future<Void> bulkLoadTransportBlobstore_impl(BulkLoadFileSet fromRemoteFil
UID logId) {
// Clear existing local folder
resetFileFolder(abspath(toLocalFileSet.getFolder()));
TraceEvent(SevDebug, "BulkLoadBlobstoreTransportStart", logId)
.detail("FromRemote", fromRemoteFileSet.toString())
.detail("ToLocal", toLocalFileSet.toString())
.detail("HasDataFile", fromRemoteFileSet.hasDataFile());
// TODO(BulkLoad): Make use of fileBytesMax
// TODO: File-at-a-time costs because we make connection for each.
wait(copyDownFile(fromRemoteFileSet.getDataFileFullPath(), abspath(toLocalFileSet.getDataFileFullPath())));
// Skip data file download if the range is empty (no data file)
if (fromRemoteFileSet.hasDataFile()) {
TraceEvent(SevDebug, "BulkLoadBlobstoreBeforeCopyDataFile", logId)
.detail("FromPath", fromRemoteFileSet.getDataFileFullPath())
.detail("ToPath", abspath(toLocalFileSet.getDataFileFullPath()));
wait(copyDownFile(fromRemoteFileSet.getDataFileFullPath(), abspath(toLocalFileSet.getDataFileFullPath())));
TraceEvent(SevDebug, "BulkLoadBlobstoreAfterCopyDataFile", logId);
} else {
TraceEvent("BulkLoadBlobstoreSkipEmptyRange", logId).detail("Reason", "No data file for empty range");
}
// Copy byte sample file if exists
if (fromRemoteFileSet.hasByteSampleFile()) {
TraceEvent(SevDebug, "BulkLoadBlobstoreBeforeCopySampleFile", logId)
.detail("FromPath", fromRemoteFileSet.getBytesSampleFileFullPath())
.detail("ToPath", abspath(toLocalFileSet.getBytesSampleFileFullPath()));
wait(copyDownFile(fromRemoteFileSet.getBytesSampleFileFullPath(),
abspath(toLocalFileSet.getBytesSampleFileFullPath())));
TraceEvent(SevDebug, "BulkLoadBlobstoreAfterCopySampleFile", logId);
}
// TODO(BulkLoad): Throw error if the date/bytesample file does not exist while the filename is not empty
TraceEvent(SevDebug, "BulkLoadBlobstoreTransportEnd", logId);
return Void();
}

Expand All @@ -286,8 +304,15 @@ ACTOR Future<BulkLoadFileSet> bulkLoadDownloadTaskFileSet(BulkLoadTransportMetho
state int retryCount = 0;
state double startTime = now();
ASSERT(transportMethod != BulkLoadTransportMethod::Invalid);
TraceEvent(SevDebug, "BulkLoadDownloadTaskFileSetStart", logId)
.detail("FromRemoteFileSet", fromRemoteFileSet.toString())
.detail("ToLocalRoot", toLocalRoot)
.detail("TransportMethod", transportMethod);
loop {
try {
TraceEvent(SevDebug, "BulkLoadDownloadTaskFileSetAttempt", logId)
.detail("RetryCount", retryCount)
.detail("Elapsed", now() - startTime);
// Step 1: Generate local file set based on remote file set by replacing the remote root to the local root.
state BulkLoadFileSet toLocalFileSet(toLocalRoot,
fromRemoteFileSet.getRelativePath(),
Expand All @@ -299,15 +324,19 @@ ACTOR Future<BulkLoadFileSet> bulkLoadDownloadTaskFileSet(BulkLoadTransportMetho
// Step 2: Download remote file set to local folder
if (transportMethod == BulkLoadTransportMethod::CP) {
ASSERT(fromRemoteFileSet.hasDataFile());
TraceEvent(SevDebug, "BulkLoadDownloadBeforeCP", logId).detail("Elapsed", now() - startTime);
// Copy the data file and the sample file from remote folder to a local folder specified by
// fromRemoteFileSet.
wait(bulkLoadTransportCP_impl(
fromRemoteFileSet, toLocalFileSet, SERVER_KNOBS->BULKLOAD_FILE_BYTES_MAX, logId));
TraceEvent(SevDebug, "BulkLoadDownloadAfterCP", logId).detail("Elapsed", now() - startTime);
} else if (transportMethod == BulkLoadTransportMethod::BLOBSTORE) {
TraceEvent(SevDebug, "BulkLoadDownloadBeforeBlobstore", logId).detail("Elapsed", now() - startTime);
// Copy the data file and the sample file from remote folder to a local folder specified by
// fromRemoteFileSet.
wait(bulkLoadTransportBlobstore_impl(
fromRemoteFileSet, toLocalFileSet, SERVER_KNOBS->BULKLOAD_FILE_BYTES_MAX, logId));
TraceEvent(SevDebug, "BulkLoadDownloadAfterBlobstore", logId).detail("Elapsed", now() - startTime);
} else {
UNREACHABLE();
}
Expand All @@ -330,7 +359,21 @@ ACTOR Future<BulkLoadFileSet> bulkLoadDownloadTaskFileSet(BulkLoadTransportMetho
.detail("Duration", now() - startTime)
.detail("RetryCount", retryCount);
retryCount++;
wait(delay(5.0));
if (retryCount > SERVER_KNOBS->BULKLOAD_DOWNLOAD_MAX_RETRIES) {
TraceEvent(SevError, "SSBulkLoadTaskDownloadFileSetMaxRetriesExceeded", logId)
.errorUnsuppressed(e)
.detail("FromRemoteFileSet", fromRemoteFileSet.toString())
.detail("ToLocalRoot", toLocalRoot)
.detail("Duration", now() - startTime)
.detail("RetryCount", retryCount)
.detail("MaxRetries", SERVER_KNOBS->BULKLOAD_DOWNLOAD_MAX_RETRIES)
.detail("OriginalError", e.code())
.detail("OriginalErrorName", e.name());
// Throw bulkload_task_failed to signal fetchKeys that this is a retryable bulk load error
// This allows the data movement to be retried at the DD level instead of killing the SS
throw bulkload_task_failed();
}
wait(delay(SERVER_KNOBS->BULKLOAD_DOWNLOAD_RETRY_DELAY));
}
}
}
Expand All @@ -345,7 +388,19 @@ ACTOR Future<Void> bulkLoadDownloadTaskFileSets(BulkLoadTransportMethod transpor
for (; iter != fromRemoteFileSets->end(); iter++) {
keys = iter->first;
if (!iter->second.hasDataFile()) {
// Ignore the remote fileSet if it does not have data file
// For empty ranges (no data file), create an empty local fileSet entry so FetchKeys knows this range was
// processed
TraceEvent("BulkLoadDownloadSkipEmptyRange", logId)
.detail("Keys", keys)
.detail("Reason", "No data file for empty range");
// Create a local fileSet with the same structure but no data/sample files (empty range marker)
BulkLoadFileSet emptyLocalFileSet(toLocalRoot,
iter->second.getRelativePath(),
iter->second.getManifestFileName(),
"", // Empty data file name
"", // Empty sample file name
BulkLoadChecksum());
localFileSets->push_back(std::make_pair(keys, emptyLocalFileSet));
continue;
}
BulkLoadFileSet localFileSet =
Expand All @@ -361,8 +416,15 @@ ACTOR Future<Void> downloadManifestFile(BulkLoadTransportMethod transportMethod,
UID logId) {
state int retryCount = 0;
state double startTime = now();
TraceEvent(SevDebug, "BulkLoadDownloadManifestStart", logId)
.detail("FromRemotePath", fromRemotePath)
.detail("ToLocalPath", toLocalPath)
.detail("TransportMethod", transportMethod);
loop {
try {
TraceEvent(SevDebug, "BulkLoadDownloadManifestAttempt", logId)
.detail("RetryCount", retryCount)
.detail("Elapsed", now() - startTime);
if (transportMethod == BulkLoadTransportMethod::CP) {
wait(
copyBulkFile(abspath(fromRemotePath), abspath(toLocalPath), SERVER_KNOBS->BULKLOAD_FILE_BYTES_MAX));
Expand Down Expand Up @@ -397,7 +459,7 @@ ACTOR Future<Void> downloadManifestFile(BulkLoadTransportMethod transportMethod,
if (retryCount > 10) {
throw e;
}
wait(delay(5.0));
wait(delay(SERVER_KNOBS->BULKLOAD_DOWNLOAD_RETRY_DELAY));
}
}
return Void();
Expand Down
8 changes: 7 additions & 1 deletion fdbserver/KnobProtectiveGroups.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,13 @@ void KnobProtectiveGroup::assignKnobs(const KnobKeyValuePairs& overrideKnobs) {

for (const auto& [name, value] : overrideKnobs.getKnobs()) {
Standalone<KnobValueRef> valueRef = KnobValueRef::create(value);
ASSERT(mutableServerKnobs.trySetKnob(name, valueRef));
bool success = mutableServerKnobs.trySetKnob(name, valueRef);
if (!success) {
TraceEvent(SevError, "FailedToAssignKnob")
.detail("KnobName", name)
.detail("KnobValue", valueRef.toString());
}
ASSERT(success);
TraceEvent(SevInfo, "AssignKnobValue").detail("KnobName", name).detail("KnobValue", valueRef.toString());
}
}
8 changes: 5 additions & 3 deletions fdbserver/MockS3Server.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -775,15 +775,17 @@ class MockS3ServerImpl {
state std::string uploadId;
if (!existingUploadId.empty()) {
uploadId = existingUploadId;
// No need to persist - already exists and was persisted on first creation
} else {
MultipartUpload upload(bucket, object);
uploadId = upload.uploadId;
getGlobalStorage().multipartUploads[uploadId] = std::move(upload);
TraceEvent("MockS3MultipartStarted").detail("UploadId", uploadId);
}

if (getGlobalStorage().persistenceEnabled) {
wait(persistMultipartState(uploadId));
// Persist only the newly created upload
if (getGlobalStorage().persistenceEnabled) {
wait(persistMultipartState(uploadId));
}
}

// Generate XML response
Expand Down
12 changes: 8 additions & 4 deletions fdbserver/QuietDatabase.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -966,8 +966,10 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
int64_t maxStorageServerQueueGate = 5e6,
int64_t maxDataDistributionQueueSize = 0,
int64_t maxPoppedVersionLag = 30e6,
int64_t maxVersionOffset = 1e6) {
state QuietDatabaseChecker checker(isGeneralBuggifyEnabled() ? 4000.0 : 1500.0);
int64_t maxVersionOffset = 1e6,
double maxDDRunTime = 0) {
// Use provided maxDDRunTime, or fallback to default (1500s or 4000s with buggify)
state QuietDatabaseChecker checker(maxDDRunTime > 0 ? maxDDRunTime : (isGeneralBuggifyEnabled() ? 4000.0 : 1500.0));
state Future<Void> reconfig =
reconfigureAfter(cx, 100 + (deterministicRandom()->random01() * 100), dbInfo, "QuietDatabase");
state Future<int64_t> dataInFlight;
Expand Down Expand Up @@ -1124,7 +1126,8 @@ Future<Void> quietDatabase(Database const& cx,
int64_t maxStorageServerQueueGate,
int64_t maxDataDistributionQueueSize,
int64_t maxPoppedVersionLag,
int64_t maxVersionOffset) {
int64_t maxVersionOffset,
double maxDDRunTime) {
return waitForQuietDatabase(cx,
dbInfo,
phase,
Expand All @@ -1133,5 +1136,6 @@ Future<Void> quietDatabase(Database const& cx,
maxStorageServerQueueGate,
maxDataDistributionQueueSize,
maxPoppedVersionLag,
maxVersionOffset);
maxVersionOffset,
maxDDRunTime);
}
4 changes: 3 additions & 1 deletion fdbserver/include/fdbserver/workloads/workloads.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ class TestSpec {
Standalone<VectorRef<VectorRef<KeyValueRef>>> options;
int timeout;
double databasePingDelay;
double maxDDRunTime = 0; // Maximum Data Distributor run time before considered stuck (0 = use default)
bool runConsistencyCheck;
bool runConsistencyCheckOnCache;
bool runConsistencyCheckOnTSS;
Expand Down Expand Up @@ -385,7 +386,8 @@ Future<Void> quietDatabase(Database const& cx,
int64_t maxStorageServerQueueGate = 5e6,
int64_t maxDataDistributionQueueSize = 0,
int64_t maxPoppedVersionLag = 30e6,
int64_t maxVersionOffset = 1e6);
int64_t maxVersionOffset = 1e6,
double maxDDRunTime = 0);

/**
* A utility function for testing error situations. It succeeds if the given test
Expand Down
Loading