diff --git a/SConstruct b/SConstruct index f3aee00a55b1a..202ba301e8c71 100644 --- a/SConstruct +++ b/SConstruct @@ -296,6 +296,12 @@ add_option( nargs=0, ) +add_option( + 'enable-fcbis', + help='Enable file copy-based initial sync', + nargs=0, +) + add_option( 'full-featured', help='Enable all optional features', @@ -2704,6 +2710,10 @@ if has_option('enable-fipsmode') or has_option('full-featured'): env.SetConfigHeaderDefine("PERCONA_FIPSMODE_ENABLED") env['PSMDB_PRO_FEATURES'].append('FIPSMode') +if has_option('enable-fcbis') or has_option('full-featured'): + env.SetConfigHeaderDefine("PERCONA_FCBIS_ENABLED") + env['PSMDB_PRO_FEATURES'].append('FCBIS') + env.Tool('forceincludes') # ---- other build setup ----- diff --git a/jstests/replsets/fcbis-replication.js b/jstests/replsets/fcbis-replication.js new file mode 100644 index 0000000000000..5b80ed1a02d46 --- /dev/null +++ b/jstests/replsets/fcbis-replication.js @@ -0,0 +1,76 @@ +/** + * Tests that a new node added via FCBIS (file copy-based initial sync) works correctly as + * primary: the node is added to the set, stepped up with replSetStepUp and must then accept + * an insert. + * + * @tags: [requires_wiredtiger] + */ +(function() { +'use strict'; + +load("jstests/replsets/rslib.js"); // For reconfig and isConfigCommitted. 
+ /** + * Appends a member {_id: nodeId, host: conn.host} to the config returned by + * rst.getReplSetConfigFromNode(), increments the config version and applies it with + * reconfig() from rslib.js. Waits until the primary reports the config as committed, then + * calls waitForConfigReplication() and awaitReplication(). Returns the new config object. + */ +let addNodeConfig = function(rst, nodeId, conn) { + var config = rst.getReplSetConfigFromNode(); + config.version += 1; + config.members.push({_id: nodeId, host: conn.host}); + reconfig(rst, config); + assert.soon(() => isConfigCommitted(rst.getPrimary())); + rst.waitForConfigReplication(rst.getPrimary()); + rst.awaitReplication(); + return config; +}; + +const basenodes = 1; // <= Test will not hang if nodes > 1 + + +var rsname = 'fcbis_replset'; +var rs = new ReplSetTest({ + name: rsname, + nodes: basenodes, + nodeOptions: {verbose: 2}, +}); + +rs.startSet({ }); +rs.initiate(); + +// do fsync before FCBIS +assert.commandWorked(rs.getPrimary().adminCommand({fsync: 1})); +// assert.commandWorked(rs.getSecondary().adminCommand({fsync: 1})); + +// Add a new member that will undergo initial sync +let newNode = rs.add({ + rsConfig: {priority: 10}, + setParameter: { + 'initialSyncMethod': 'fileCopyBased', + //'initialSyncSourceReadPreference': 'primary', + }, + verbose: 2, +}); + +// wait for user input to be able to attach gdb before initial sync +//jsTest.log("--XXXX-- newNode: " + newNode.pid); +//print("Press Enter to continue"); +//let psw = passwordPrompt(); + +addNodeConfig(rs, basenodes + 1, newNode); +rs.waitForState(newNode, ReplSetTest.State.SECONDARY); +rs.waitForAllNewlyAddedRemovals(); + +jsTest.log("--XXXX-- Added new member"); + +// Output serverStatus for reference +jsTest.log("--XXXX-- newNode serverStatus: " + tojson(newNode.adminCommand({'serverStatus': 1, repl: 1}))); + +// Make the new member become primary +assert.commandWorked(newNode.adminCommand({replSetStepUp: 1})); +jsTest.log("--XXXX-- After replSetStepUp"); + +rs.awaitNodesAgreeOnPrimary(undefined, undefined, newNode); +jsTest.log("--XXXX-- All nodes agree on newNode being primary"); + +// BUG: This insert would not return and test would hang because of PSMDB-1589. This only happens when using FCBIS. 
+assert.commandWorked(rs.getPrimary().getDB('test').getCollection('foo').insert({x: 1})); // <= This will fail! +jsTest.log("--XXXX-- After insert on new member"); + +rs.stopSet(); +})(); + diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 7a1e06c9d0742..7526a06cd0889 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -78,6 +78,7 @@ config_header_substs = ( ('@mongo_config_grpc@', 'MONGO_CONFIG_GRPC'), ('@mongo_config_have_basic_stringbuf_str_rvalue@', 'MONGO_CONFIG_HAVE_BASIC_STRINGBUF_STR_RVALUE'), ('@percona_fipsmode_enabled@', 'PERCONA_FIPSMODE_ENABLED'), + ('@percona_fcbis_enabled@', 'PERCONA_FCBIS_ENABLED'), ) diff --git a/src/mongo/config.h.in b/src/mongo/config.h.in index 1a4c581c5a52b..435a4d91147f6 100644 --- a/src/mongo/config.h.in +++ b/src/mongo/config.h.in @@ -113,5 +113,8 @@ // FIPSMode enabled @percona_fipsmode_enabled@ +// FCBIS enabled +@percona_fcbis_enabled@ + // Defined if basic_stringbuf::str()&& overload exists @mongo_config_have_basic_stringbuf_str_rvalue@ diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 7383c806e57e8..e4fed4da7a6a7 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -2533,6 +2533,7 @@ env.Library( 'query/stats/stats', 'repl/drop_pending_collection_reaper', 'repl/initial_syncer', + 'repl/initial_syncer_fcb' if has_option('enable-fcbis') or has_option('full-featured') else [], 'repl/repl_coordinator_impl', 'repl/replication_recovery', 'repl/serveronly_repl', diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index a523cfb93b986..4a05b280c9ad7 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -244,6 +244,7 @@ env.Library( source=[ 'document_source_backup_cursor.cpp', 'document_source_backup_cursor_extend.cpp', + 'document_source_backup_file.cpp', ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/server_base', diff --git a/src/mongo/db/pipeline/document_source_backup_cursor.h 
b/src/mongo/db/pipeline/document_source_backup_cursor.h index 0d112cc67121b..2fa35c3fa56d0 100644 --- a/src/mongo/db/pipeline/document_source_backup_cursor.h +++ b/src/mongo/db/pipeline/document_source_backup_cursor.h @@ -59,7 +59,12 @@ class DocumentSourceBackupCursor : public DocumentSource { return true; } - void assertSupportsMultiDocumentTransaction() const { + ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level, + bool isImplicitDefault) const final { + return onlyReadConcernLocalSupported(kStageName, level, isImplicitDefault); + } + + void assertSupportsMultiDocumentTransaction() const final { transactionNotSupported(kStageName); } }; diff --git a/src/mongo/db/pipeline/document_source_backup_cursor_extend.h b/src/mongo/db/pipeline/document_source_backup_cursor_extend.h index b92040d9d9fd3..1905a1bd5d940 100644 --- a/src/mongo/db/pipeline/document_source_backup_cursor_extend.h +++ b/src/mongo/db/pipeline/document_source_backup_cursor_extend.h @@ -59,7 +59,12 @@ class DocumentSourceBackupCursorExtend : public DocumentSource { return true; } - void assertSupportsMultiDocumentTransaction() const { + ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level, + bool isImplicitDefault) const final { + return onlyReadConcernLocalSupported(kStageName, level, isImplicitDefault); + } + + void assertSupportsMultiDocumentTransaction() const final { transactionNotSupported(kStageName); } }; diff --git a/src/mongo/db/pipeline/document_source_backup_file.cpp b/src/mongo/db/pipeline/document_source_backup_file.cpp new file mode 100644 index 0000000000000..a18dd12782d90 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_backup_file.cpp @@ -0,0 +1,187 @@ +/*====== +This file is part of Percona Server for MongoDB. + +Copyright (C) 2024-present Percona and/or its affiliates. All rights reserved. 
+ + This program is free software: you can redistribute it and/or modify + it under the terms of the Server Side Public License, version 1, + as published by MongoDB, Inc. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Server Side Public License for more details. + + You should have received a copy of the Server Side Public License + along with this program. If not, see + <http://www.mongodb.com/licensing/server-side-public-license>. + + As a special exception, the copyright holders give permission to link the + code of portions of this program with the OpenSSL library under certain + conditions as described in each individual source file and distribute + linked combinations including the program with the OpenSSL library. You + must comply with the Server Side Public License in all respects for + all of the code used other than as permitted herein. If you modify file(s) + with this exception, you may extend this exception to your version of the + file(s), but you are not obligated to do so. If you do not wish to do so, + delete this exception statement from your version. If you delete this + exception statement from all source files in the program, then also delete + it in the license file. 
+======= */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery + +#include "mongo/db/pipeline/document_source_backup_file.h" + +#include +#include +#include +#include + +#include "mongo/base/data_range.h" +#include "mongo/base/error_codes.h" +#include "mongo/base/string_data.h" +#include "mongo/bson/bsonmisc.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsontypes.h" +#include "mongo/db/exec/document_value/document.h" +#include "mongo/db/namespace_string.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/intrusive_counter.h" +#include "mongo/util/str.h" +#include "mongo/util/uuid.h" + +namespace mongo { + +namespace { +constexpr StringData kBackupId = "backupId"_sd; +constexpr StringData kFile = "file"_sd; +constexpr StringData kByteOffset = "byteOffset"_sd; + +// We only link this file into mongod so this stage doesn't exist in mongos +REGISTER_INTERNAL_DOCUMENT_SOURCE(_backupFile, + DocumentSourceBackupFile::LiteParsed::parse, + DocumentSourceBackupFile::createFromBson, + true); +} // namespace + +using boost::intrusive_ptr; + /** + * Builds the lite-parsed representation of the stage, passing on only the field name of spec; + * nss is not used. + */ +std::unique_ptr DocumentSourceBackupFile::LiteParsed::parse( + const NamespaceString& nss, const BSONElement& spec) { + + return std::make_unique(spec.fieldName()); +} + +const char* DocumentSourceBackupFile::getSourceName() const { + return kStageName.rawData(); +} + /** + * Serializes the stage as a document keyed by the stage name whose value holds the backupId, + * file and byteOffset given at construction. opts is not consulted. + */ +Value DocumentSourceBackupFile::serialize(const SerializationOptions& opts) const { + return Value{Document{{getSourceName(), + Document{{kBackupId, Value(_backupId)}, + {kFile, Value(_filePath)}, + {kByteOffset, Value(_byteOffset)}}}}}; +} + /** + * Produces the next chunk of the file as a document with the fields byteOffset (stream position + * before the read), data (up to kBlockSize bytes as BinDataGeneral) and endOfFile (whether the + * eof flag of the stream is set after this read). Once the eof flag is set, later calls return + * EOF. Throws FileStreamFailed if the stream is in the bad state after reading. + * + * NOTE(review): when the remaining size is an exact multiple of kBlockSize the eof flag is + * presumably only raised by one further read, so the last document would carry zero bytes of + * data together with endOfFile: true - confirm consumers accept an empty final chunk. + */ +DocumentSource::GetNextResult DocumentSourceBackupFile::doGetNext() { + if (_file.eof()) { + return GetNextResult::makeEOF(); + } + + auto byteOffset = _file.tellg(); + _file.read(_dataBuf.data(), kBlockSize); + uassert(ErrorCodes::FileStreamFailed, + str::stream() << "Error reading file " << _filePath << " at offset " << byteOffset, + !_file.bad()); + auto bytesRead = _file.gcount(); + 
auto eof = _file.eof(); + + Document doc; + doc = Document{{"byteOffset"_sd, static_cast(byteOffset)}, + {"data"_sd, BSONBinData(_dataBuf.data(), bytesRead, BinDataGeneral)}, + {"endOfFile"_sd, eof}}; + + return doc; +} + /** + * Parses the stage specification and opens the requested file. + * + * spec must be an object. Recognized fields are backupId (BinData, parsed as UUID), file + * (string) and byteOffset (NumberLong, must not be negative). Absent fields keep their initial + * values: a backupId built from a value-initialized array, an empty file path and offset 0. + * The file is opened for binary reading and positioned at byteOffset before the stage object + * is created. + * + * Throws FailedToParse for a non-object spec or an unrecognized field, TypeMismatch for a + * field of the wrong type, InvalidOptions for a negative offset and FileOpenFailed if the file + * cannot be opened or positioned. + * + * NOTE(review): nothing in this function checks backupId against an open backup cursor or + * restricts file to files belonging to that backup - verify that this is enforced elsewhere. + */ +intrusive_ptr DocumentSourceBackupFile::createFromBson( + BSONElement spec, const intrusive_ptr& pExpCtx) { + // This cursor is non-tailable so we don't touch pExpCtx->tailableMode here + + uassert(ErrorCodes::FailedToParse, + str::stream() << kStageName << " parameters must be specified in an object, but found: " + << typeName(spec.type()), + spec.type() == Object); + + auto backupId = UUID::fromCDR(std::array{}); + std::string filePath; + long long byteOffset = 0; + + for (auto&& elem : spec.embeddedObject()) { + const auto fieldName = elem.fieldNameStringData(); + + if (fieldName == kBackupId) { + uassert(ErrorCodes::TypeMismatch, + str::stream() << "The '" << fieldName << "' parameter of the " << kStageName + << " stage must be a binary data value, but found: " + << typeName(elem.type()), + elem.type() == BSONType::BinData); + backupId = uassertStatusOK(UUID::parse(elem)); + } else if (fieldName == kFile) { + uassert(ErrorCodes::TypeMismatch, + str::stream() << "The '" << fieldName << "' parameter of the " << kStageName + << " stage must be a string value, but found: " + << typeName(elem.type()), + elem.type() == BSONType::String); + filePath = elem.String(); + } else if (fieldName == kByteOffset) { + uassert(ErrorCodes::TypeMismatch, + str::stream() << "The '" << fieldName << "' parameter of the " << kStageName + << " stage must be a long integer value, but found: " + << typeName(elem.type()), + elem.type() == BSONType::NumberLong); + byteOffset = elem.Long(); + } else { + uasserted(ErrorCodes::FailedToParse, + str::stream() << "Unrecognized option '" << fieldName << "' in " << kStageName + << " stage"); + } + } + + uassert(ErrorCodes::InvalidOptions, + str::stream() << "'" << kByteOffset << "' parameter cannot be less than zero", + 
byteOffset >= 0); + + std::ifstream iFile(filePath, std::ios_base::in | std::ios_base::binary); + uassert(ErrorCodes::FileOpenFailed, + str::stream() << "Failed to open file " << filePath, + iFile.is_open()); + iFile.seekg(byteOffset); + uassert(ErrorCodes::FileOpenFailed, + str::stream() << "Failed to set read position " << byteOffset << " in file " + << filePath, + !iFile.fail()); + invariant(byteOffset == iFile.tellg()); + + return make_intrusive( + pExpCtx, backupId, std::move(filePath), byteOffset, std::move(iFile)); +} + /** + * Stores the stage parameters and moves the given file stream into the stage. + */ +DocumentSourceBackupFile::DocumentSourceBackupFile(const intrusive_ptr& expCtx, + UUID backupId, + std::string filePath, + long long byteOffset, + std::ifstream file) + : DocumentSource(kStageName, expCtx), + _dataBuf(), + _backupId(backupId), + _filePath(std::move(filePath)), + _byteOffset(byteOffset), + _file(std::move(file)) {} + +DocumentSourceBackupFile::~DocumentSourceBackupFile() { + _file.close(); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_backup_file.h b/src/mongo/db/pipeline/document_source_backup_file.h new file mode 100644 index 0000000000000..2fbd0e7b0a820 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_backup_file.h @@ -0,0 +1,139 @@ +/*====== +This file is part of Percona Server for MongoDB. + +Copyright (C) 2024-present Percona and/or its affiliates. All rights reserved. + + This program is free software: you can redistribute it and/or modify + it under the terms of the Server Side Public License, version 1, + as published by MongoDB, Inc. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Server Side Public License for more details. + + You should have received a copy of the Server Side Public License + along with this program. If not, see + <http://www.mongodb.com/licensing/server-side-public-license>. 
+ + As a special exception, the copyright holders give permission to link the + code of portions of this program with the OpenSSL library under certain + conditions as described in each individual source file and distribute + linked combinations including the program with the OpenSSL library. You + must comply with the Server Side Public License in all respects for + all of the code used other than as permitted herein. If you modify file(s) + with this exception, you may extend this exception to your version of the + file(s), but you are not obligated to do so. If you do not wish to do so, + delete this exception statement from your version. If you delete this + exception statement from all source files in the program, then also delete + it in the license file. +======= */ + +#pragma once + +#include +#include +#include +#include +#include + +#include + +#include "mongo/base/string_data.h" +#include "mongo/bson/bsonelement.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/stdx/unordered_set.h" +#include "mongo/util/uuid.h" + +namespace mongo { + /** + * Internal aggregation stage $_backupFile. It has to be the first stage of a pipeline, needs no + * input source and is independent of any collection. Each document it returns carries one block + * of at most kBlockSize (1 << 20) bytes read from a file on the server. Running it requires the + * fsync action on the cluster resource; it is rejected in multi-document transactions. + */ +class DocumentSourceBackupFile final : public DocumentSource { +public: + static constexpr StringData kStageName = "$_backupFile"_sd; + + class LiteParsed final : public LiteParsedDocumentSource { + public: + using LiteParsedDocumentSource::LiteParsedDocumentSource; + + static std::unique_ptr parse(const NamespaceString& nss, + const BSONElement& spec); + + stdx::unordered_set getInvolvedNamespaces() const final { + return {}; + } + + PrivilegeVector requiredPrivileges( + [[maybe_unused]] bool isMongos, + [[maybe_unused]] bool bypassDocumentValidation) const final { + return {Privilege(ResourcePattern::forClusterResource(), ActionType::fsync)}; + } + + bool isInitialSource() const final { + return true; + } + + void assertSupportsMultiDocumentTransaction() const final { + 
transactionNotSupported(kStageName); + } + }; + + /** + * Parses a $_backupFile stage from 'spec'. + */ + static boost::intrusive_ptr createFromBson( + BSONElement spec, const boost::intrusive_ptr& pCtx); + + DocumentSourceBackupFile(const boost::intrusive_ptr& expCtx, + UUID backupId, + std::string filePath, + long long byteOffset, + std::ifstream file); + + DocumentSourceBackupFile(const DocumentSourceBackupFile&) = delete; + DocumentSourceBackupFile& operator=(const DocumentSourceBackupFile&) = delete; + DocumentSourceBackupFile(DocumentSourceBackupFile&&) = delete; + DocumentSourceBackupFile& operator=(DocumentSourceBackupFile&&) = delete; + + ~DocumentSourceBackupFile() override; + + const char* getSourceName() const override; + + StageConstraints constraints([[maybe_unused]] Pipeline::SplitState pipeState) const override { + StageConstraints constraints{StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + LookupRequirement::kAllowed, + UnionRequirement::kNotAllowed, + ChangeStreamRequirement::kDenylist}; + constraints.isIndependentOfAnyCollection = true; + constraints.requiresInputDocSource = false; + return constraints; + } + + Value serialize(const SerializationOptions& opts = SerializationOptions()) const final; + + boost::optional distributedPlanLogic() final { + return boost::none; + } + + void addVariableRefs(std::set* refs) const final {} + +protected: + GetNextResult doGetNext() override; + +private: + static constexpr std::streamsize kBlockSize = 1 << 20; + + std::array _dataBuf; + const UUID _backupId; + const std::string _filePath; + const long long _byteOffset; + std::ifstream _file; +}; + +} // namespace mongo diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index dcb7643d0a192..1bbd4d0e07c42 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1283,6 
+1283,30 @@ env.Library( ], ) +env.Library( + target='initial_syncer_fcb', + source=[ + 'fcb_file_cloner.cpp', + 'initial_syncer_fcb.cpp', + ], + LIBDEPS=[ + 'repl_sync_shared_data', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/catalog/catalog_control', + '$BUILD_DIR/mongo/db/index_builds_coordinator_interface', + '$BUILD_DIR/mongo/db/serverless/serverless_lock', + '$BUILD_DIR/mongo/db/startup_recovery', + '$BUILD_DIR/mongo/db/storage/storage_engine_common', + '$BUILD_DIR/mongo/executor/scoped_task_executor', + '$BUILD_DIR/mongo/util/progress_meter', + 'drop_pending_collection_reaper', + 'initial_syncer', + 'replication_auth', + 'serveronly_repl', + ], +) + env.Library( target='rollback_checker', source=[ diff --git a/src/mongo/db/repl/fcb_file_cloner.cpp b/src/mongo/db/repl/fcb_file_cloner.cpp new file mode 100644 index 0000000000000..30b7baf4a827a --- /dev/null +++ b/src/mongo/db/repl/fcb_file_cloner.cpp @@ -0,0 +1,312 @@ +/*====== +This file is part of Percona Server for MongoDB. + +Copyright (C) 2024-present Percona and/or its affiliates. All rights reserved. + + This program is free software: you can redistribute it and/or modify + it under the terms of the Server Side Public License, version 1, + as published by MongoDB, Inc. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Server Side Public License for more details. + + You should have received a copy of the Server Side Public License + along with this program. If not, see + <http://www.mongodb.com/licensing/server-side-public-license>. + + As a special exception, the copyright holders give permission to link the + code of portions of this program with the OpenSSL library under certain + conditions as described in each individual source file and distribute + linked combinations including the program with the OpenSSL library. 
You + must comply with the Server Side Public License in all respects for + all of the code used other than as permitted herein. If you modify file(s) + with this exception, you may extend this exception to your version of the + file(s), but you are not obligated to do so. If you do not wish to do so, + delete this exception statement from your version. If you delete this + exception statement from all source files in the program, then also delete + it in the license file. +======= */ + +#include "fcb_file_cloner.h" + +#include +#include + +#include + +#include "mongo/base/string_data.h" +#include "mongo/bson/bsonelement.h" +#include "mongo/bson/bsonmisc.h" +#include "mongo/bson/bsontypes.h" +#include "mongo/client/dbclient_connection.h" +#include "mongo/db/database_name.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/pipeline/aggregate_command_gen.h" +#include "mongo/db/pipeline/aggregation_request_helper.h" +#include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/storage/storage_options.h" +#include "mongo/db/write_concern_options.h" +#include "mongo/platform/mutex.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/clock_source.h" +#include "mongo/util/concurrency/with_lock.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/str.h" + + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplicationInitialSync + + +namespace mongo::repl { + /** + * Sets up the single query stage, the task runner used for file system work and the progress + * meter. _scheduleFsWorkFn wraps each piece of work in a task for _fsWorkTaskRunner; a + * DBException thrown by the work is recorded through setSyncFailedStatus() instead of + * propagating. The function always returns a default-constructed CallbackHandle. + */ +FCBFileCloner::FCBFileCloner(const UUID& backupId, + const std::string& remoteFileName, + size_t remoteFileSize, + const std::string& relativePath, + InitialSyncSharedData* sharedData, + const HostAndPort& source, + DBClientConnection* client, + StorageInterface* storageInterface, + ThreadPool* dbPool) + : BaseCloner("FCBFileCloner"_sd, sharedData, source, client, storageInterface, dbPool), + _backupId(backupId), + _remoteFileName(remoteFileName), + _remoteFileSize(remoteFileSize), + 
_relativePathString(relativePath), + _queryStage("query", this, &FCBFileCloner::queryStage), + _fsWorkTaskRunner(dbPool), + _scheduleFsWorkFn([this](executor::TaskExecutor::CallbackFn work) { + auto task = [this, work = std::move(work)]( + OperationContext* opCtx, + const Status& status) mutable noexcept -> TaskRunner::NextAction { + try { + work(executor::TaskExecutor::CallbackArgs(nullptr, {}, status, opCtx)); + } catch (const DBException& e) { + setSyncFailedStatus(e.toStatus()); + } + return TaskRunner::NextAction::kDisposeOperationContext; + }; + _fsWorkTaskRunner.schedule(std::move(task)); + return executor::TaskExecutor::CallbackHandle(); + }), + _progressMeter(remoteFileSize, + kProgressMeterSecondsBetween, + kProgressMeterCheckInterval, + "bytes copied", + str::stream() << _remoteFileName << " FCB file clone progress") { + _stats.filePath = _relativePathString; + _stats.fileSize = _remoteFileSize; +} + +BaseCloner::ClonerStages FCBFileCloner::getStages() { + return {&_queryStage}; +} + /** + * Records the start time and opens the local destination file, truncating it if it exists. + * The destination is the relative path appended to the .initialsync directory below the + * dbpath, lexically normalized; missing parent directories are created. + * + * NOTE(review): the escape check compares generic path strings by prefix without a trailing + * separator, so a normalized path inside a sibling directory whose name merely starts with + * .initialsync would also pass - confirm whether a component-wise comparison is needed. + */ +void FCBFileCloner::preStage() { + stdx::lock_guard lk(_mutex); + _stats.start = getSharedData()->getClock()->now(); + + // Construct local path name from the relative path and the temp dbpath. + boost::filesystem::path relativePath(_relativePathString); + uassert(6113300, + str::stream() << "Path " << _relativePathString << " should be a relative path", + relativePath.is_relative()); + + boost::filesystem::path syncTargetTempDBPath{storageGlobalParams.dbpath}; + syncTargetTempDBPath /= ".initialsync"; + _localFilePath = syncTargetTempDBPath; + + _localFilePath /= relativePath; + _localFilePath = _localFilePath.lexically_normal(); + uassert(6113301, + str::stream() << "Path " << _relativePathString + << " must not escape its parent directory.", + StringData(_localFilePath.generic_string()) + .startsWith(syncTargetTempDBPath.generic_string())); + + // Create and open files and any parent directories. 
+ if (boost::filesystem::exists(_localFilePath)) { + LOGV2(6113302, + "Local file exists at start of FCBFileCloner; truncating.", + "localFilePath"_attr = _localFilePath.string()); + } else { + auto localFileDir = _localFilePath.parent_path(); + boost::system::error_code ec; + boost::filesystem::create_directories(localFileDir, ec); + uassert(6113303, + str::stream() << "Failed to create directory " << localFileDir.string() << " Error " + << ec.message(), + !ec); + } + _localFile.open(_localFilePath.string(), + std::ios_base::out | std::ios_base::binary | std::ios_base::trunc); + uassert(ErrorCodes::FileOpenFailed, + str::stream() << "Failed to open file " << _localFilePath.string(), + !_localFile.fail()); + _fileOffset = 0; +} + +void FCBFileCloner::postStage() { + _localFile.close(); + stdx::lock_guard lk(_mutex); + _stats.end = getSharedData()->getClock()->now(); +} + +BaseCloner::AfterStageBehavior FCBFileCloner::queryStage() { + // Since the query stage may be re-started, we need to make sure all the file system work + // from the previous run is done before running the query again. + waitForFilesystemWorkToComplete(); + _sawEof = false; + runQuery(); + waitForFilesystemWorkToComplete(); + uassert( + 6113304, + str::stream() + << "Received entire file, but did not get end of file marker. 
File may be incomplete " + << _localFilePath.string(), + _sawEof); + return kContinueNormally; +} + +size_t FCBFileCloner::getFileOffset() { + stdx::lock_guard lk(_mutex); + return _fileOffset; +} + /** + * Runs a $_backupFile aggregation on the admin database of the sync source, starting at the + * current file offset, and passes every batch to handleNextBatch(). On a DBException the client + * connection is shut down and the exception is rethrown. + */ +void FCBFileCloner::runQuery() { + auto backupFileStage = BSON( + "$_backupFile" << BSON("backupId" << _backupId << "file" << _remoteFileName << "byteOffset" + << static_cast(getFileOffset()))); + AggregateCommandRequest aggRequest( + NamespaceString::makeCollectionlessAggregateNSS(DatabaseName::kAdmin), {backupFileStage}); + aggRequest.setReadConcern(ReadConcernArgs::kImplicitDefault); + aggRequest.setWriteConcern(WriteConcernOptions()); + + LOGV2_DEBUG(6113305, + 2, + "FCBFileCloner running aggregation", + "source"_attr = getSource(), + "aggRequest"_attr = aggregation_request_helper::serializeToCommandObj(aggRequest)); + const bool useExhaust = + true; // TODO: !MONGO_unlikely(FCBFileClonerDisableExhaust.shouldFail()); + std::unique_ptr cursor = uassertStatusOK(DBClientCursor::fromAggregationRequest( + getClient(), std::move(aggRequest), true /* secondaryOk */, useExhaust)); + try { + while (cursor->more()) { + handleNextBatch(*cursor); + } + } catch (const DBException& e) { + // We cannot continue after an error when processing exhaust cursors. Instead we must + // reconnect, which is handled by the BaseCloner. 
+ LOGV2_DEBUG(6113306, + 1, + "FCBFileCloner received an exception while downloading data", + "error"_attr = e.toStatus(), + "source"_attr = getSource(), + "backupId"_attr = _backupId, + "remoteFile"_attr = _remoteFileName, + "fileOffset"_attr = getFileOffset()); + getClient()->shutdown(); + throw; + } +} + /** + * Aborts with CallbackCanceled if the shared initial sync status is no longer OK. Otherwise + * appends all documents of the current batch to _dataToWrite under _mutex and schedules + * writeDataToFilesystemCallback() through _scheduleFsWorkFn. + */ +void FCBFileCloner::handleNextBatch(DBClientCursor& cursor) { + LOGV2_DEBUG(6113307, + 4, + "FCBFileCloner handleNextBatch", + "source"_attr = getSource(), + "backupId"_attr = _backupId, + "remoteFile"_attr = _remoteFileName, + "fileOffset"_attr = getFileOffset()); + { + stdx::lock_guard lk(*getSharedData()); + if (!getSharedData()->getStatus(lk).isOK()) { + static constexpr char const* message = + "BackupFile cloning cancelled due to cloning failure"; + LOGV2(6113323, message, "error"_attr = getSharedData()->getStatus(lk)); + uasserted(ErrorCodes::CallbackCanceled, + str::stream() << message << ": " << getSharedData()->getStatus(lk)); + } + } + { + stdx::lock_guard lk(_mutex); + _stats.receivedBatches++; + while (cursor.moreInCurrentBatch()) { + _dataToWrite.emplace_back(cursor.nextSafe()); + } + } + + // Schedule the next set of writes. + auto&& scheduleResult = + _scheduleFsWorkFn([this](const executor::TaskExecutor::CallbackArgs& cbd) { + writeDataToFilesystemCallback(cbd); + }); + + if (!scheduleResult.isOK()) { + Status newStatus = scheduleResult.getStatus().withContext( + str::stream() << "Error copying file '" << _remoteFileName << "'"); + // We must throw an exception to terminate query. 
+ uassertStatusOK(newStatus); + } +} + /** + * Writes the buffered documents to the local file. For every document the byteOffset field has + * to match both the stream position and _fileOffset (invariant), and the data field must be + * BinDataGeneral. Advances _fileOffset and the statistics and remembers the endOfFile flag in + * _sawEof. Throws if cbd.status is not OK or a write fails. + */ +void FCBFileCloner::writeDataToFilesystemCallback(const executor::TaskExecutor::CallbackArgs& cbd) { + LOGV2_DEBUG(6113309, + 4, + "FCBFileCloner writeDataToFilesystemCallback", + "backupId"_attr = _backupId, + "remoteFile"_attr = _remoteFileName, + "localFile"_attr = _localFilePath.string(), + "fileOffset"_attr = getFileOffset()); + uassertStatusOK(cbd.status); + { + stdx::lock_guard lk(_mutex); + if (_dataToWrite.empty()) { + LOGV2_WARNING(6113310, + "writeDataToFilesystemCallback, but no data to write", + "remoteFile"_attr = _remoteFileName); + } + for (const auto& doc : _dataToWrite) { + uassert(6113311, + str::stream() << "Saw multiple end-of-file-markers in file " << _remoteFileName, + !_sawEof); + // Received file data should always be in sync with the stream and where we think + // our next input should be coming from. + const auto byteOffset = doc["byteOffset"].safeNumberLong(); + invariant(byteOffset == _localFile.tellp()); + invariant(byteOffset == _fileOffset); + const auto& dataElem = doc["data"]; + uassert(6113312, + str::stream() << "Expected file data to be type BinDataGeneral. 
" << doc, + dataElem.type() == BinData && dataElem.binDataType() == BinDataGeneral); + int dataLength = 0; + const char* data = dataElem.binData(dataLength); + _localFile.write(data, dataLength); + uassert(ErrorCodes::FileStreamFailed, + str::stream() << "Unable to write file data for file " << _remoteFileName + << " at offset " << _fileOffset, + !_localFile.fail()); + _progressMeter.hit(dataLength); + _fileOffset += dataLength; + _stats.bytesCopied += dataLength; + _sawEof = doc["endOfFile"].booleanSafe(); + } + _dataToWrite.clear(); + _stats.writtenBatches++; + } +} + +void FCBFileCloner::waitForFilesystemWorkToComplete() { + _fsWorkTaskRunner.join(); +} + +logv2::LogComponent FCBFileCloner::getLogComponent() { + return logv2::LogComponent::kReplicationInitialSync; +} + +} // namespace mongo::repl diff --git a/src/mongo/db/repl/fcb_file_cloner.h b/src/mongo/db/repl/fcb_file_cloner.h new file mode 100644 index 0000000000000..d12f458352575 --- /dev/null +++ b/src/mongo/db/repl/fcb_file_cloner.h @@ -0,0 +1,228 @@ +/*====== +This file is part of Percona Server for MongoDB. + +Copyright (C) 2024-present Percona and/or its affiliates. All rights reserved. + + This program is free software: you can redistribute it and/or modify + it under the terms of the Server Side Public License, version 1, + as published by MongoDB, Inc. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Server Side Public License for more details. + + You should have received a copy of the Server Side Public License + along with this program. If not, see + <http://www.mongodb.com/licensing/server-side-public-license>. + + As a special exception, the copyright holders give permission to link the + code of portions of this program with the OpenSSL library under certain + conditions as described in each individual source file and distribute + linked combinations including the program with the OpenSSL library. 
You + must comply with the Server Side Public License in all respects for + all of the code used other than as permitted herein. If you modify file(s) + with this exception, you may extend this exception to your version of the + file(s), but you are not obligated to do so. If you do not wish to do so, + delete this exception statement from your version. If you delete this + exception statement from all source files in the program, then also delete + it in the license file. +======= */ + + +#pragma once + +#include +#include +#include + +#include + +#include "mongo/base/checked_cast.h" +#include "mongo/base/error_codes.h" +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/client/dbclient_connection.h" +#include "mongo/client/dbclient_cursor.h" +#include "mongo/db/repl/base_cloner.h" +#include "mongo/db/repl/initial_sync_shared_data.h" +#include "mongo/db/repl/storage_interface.h" +#include "mongo/db/repl/task_runner.h" +#include "mongo/executor/task_executor.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/functional.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/progress_meter.h" +#include "mongo/util/time_support.h" +#include "mongo/util/uuid.h" + + +namespace mongo::repl { + +class FCBFileCloner final : public BaseCloner { +public: + struct Stats { + std::string filePath; + size_t fileSize; + Date_t start; + Date_t end; + size_t receivedBatches{0}; + size_t writtenBatches{0}; + size_t bytesCopied{0}; + + std::string toString() const; + BSONObj toBSON() const; + void append(BSONObjBuilder* builder) const; + }; + + /** + * Type of function to schedule file system tasks with the executor. + */ + using ScheduleFsWorkFn = unique_function( + executor::TaskExecutor::CallbackFn)>; + + /** + * Constructor for FCBFileCloner + * + * remoteFileName: Path of file to copy on remote system. 
+ * remoteFileSize: Size of remote file in bytes, used for progress messages and stats only. + * relativePath: Path of file relative to dbpath on the remote system, as a + * boost::filesystem::path generic path. + */ + FCBFileCloner(const UUID& backupId, + const std::string& remoteFileName, + size_t remoteFileSize, + const std::string& relativePath, + InitialSyncSharedData* sharedData, + const HostAndPort& source, + DBClientConnection* client, + StorageInterface* storageInterface, + ThreadPool* dbPool); + + ~FCBFileCloner() override = default; + + /** + * Waits for any file system work to finish or fail. + */ + void waitForFilesystemWorkToComplete(); + +protected: + InitialSyncSharedData* getSharedData() const override { + return checked_cast(BaseCloner::getSharedData()); + } + + ClonerStages getStages() final; + +private: + class FCBFileClonerQueryStage : public ClonerStage { + public: + FCBFileClonerQueryStage(std::string name, FCBFileCloner* cloner, ClonerRunFn stageFunc) + : ClonerStage(std::move(name), cloner, stageFunc) {} + + bool checkSyncSourceValidityOnRetry() override { + // Sync source validity is assured by the backup ID not existing if the sync source + // is restarted or otherwise becomes invalid. + return false; + } + + bool isTransientError(const Status& status) override { + if (isCursorError(status)) { + return true; + } + return ErrorCodes::isRetriableError(status); + } + + static bool isCursorError(const Status& status) { + // Our cursor was killed on the sync source. + return (status == ErrorCodes::CursorNotFound) || + (status == ErrorCodes::OperationFailed) || (status == ErrorCodes::QueryPlanKilled); + } + }; + + /** + * Overridden to allow the BaseCloner to use the initial syncer log component. + */ + logv2::LogComponent getLogComponent() override; + + // TODO: do we need Stats/getStats in this class? + /** + * The preStage sets the begin time in _stats and makes sure the destination file + * can be created. 
+ */ + void preStage() final; + + /** + * The postStage sets the end time in _stats. + */ + void postStage() final; + + + /** + * Stage function that executes a query to retrieve the file data. For each + * batch returned by the upstream node, handleNextBatch will be called with the data. This + * stage will finish when the entire query is finished or failed. + */ + AfterStageBehavior queryStage(); + + /** + * Put all results from a query batch into a buffer, and schedule it to be written to disk. + */ + void handleNextBatch(DBClientCursor& cursor); + + /** + * Called whenever there is a new batch of documents ready from the DBClientConnection. + * + * Presumably writes the file data of each returned document to the local file - TODO confirm + * against the implementation. + */ + void writeDataToFilesystemCallback(const executor::TaskExecutor::CallbackArgs& cbd); + + /** + * Sends an (aggregation) query command to the source. That query command will be parameterized + * based on copy progress. + */ + void runQuery(); + + /** + * Convenience call to get the file offset under a lock. + */ + size_t getFileOffset(); + + // All member variables are labeled with one of the following codes indicating the + // synchronization rules for accessing them. + // + // (R) Read-only in concurrent operation; no synchronization required. + // (S) Self-synchronizing; access according to class's own rules. + // (M) Reads and writes guarded by _mutex (defined in base class). + // (X) Access only allowed from the main flow of control called from run() or constructor. + const UUID _backupId; // (R) + const std::string _remoteFileName; // (R) + size_t _remoteFileSize; // (R) + const std::string _relativePathString; // (R) + boost::filesystem::path _localFilePath; // (X) + + FCBFileClonerQueryStage _queryStage; // (R) + + std::ofstream _localFile; // (M) + // File offset we will request from the remote side in the next query. 
+ off_t _fileOffset = 0; // (M) + bool _sawEof = false; // (X) + + // Data read from source to insert. + std::vector _dataToWrite; // (M) + // Declared after _dataToWrite so the task runner is destroyed first. NOTE(review): it is not the + // last member: _progressMeter and _stats are destroyed before it; confirm tasks never use them then. + TaskRunner _fsWorkTaskRunner; // (R) + // Function for scheduling filesystem work using the executor. + ScheduleFsWorkFn _scheduleFsWorkFn; // (R) + + ProgressMeter _progressMeter; // (X) progress meter for this instance. + Stats _stats; // (M) + + static constexpr int kProgressMeterSecondsBetween = 60; + static constexpr int kProgressMeterCheckInterval = 128; +}; + +} // namespace mongo::repl diff --git a/src/mongo/db/repl/initial_syncer_fcb.cpp b/src/mongo/db/repl/initial_syncer_fcb.cpp new file mode 100644 index 0000000000000..766a1463f3e2a --- /dev/null +++ b/src/mongo/db/repl/initial_syncer_fcb.cpp @@ -0,0 +1,2050 @@ +/*====== +This file is part of Percona Server for MongoDB. + +Copyright (C) 2024-present Percona and/or its affiliates. All rights reserved. + + This program is free software: you can redistribute it and/or modify + it under the terms of the Server Side Public License, version 1, + as published by MongoDB, Inc. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Server Side Public License for more details. + + You should have received a copy of the Server Side Public License + along with this program. If not, see + . + + As a special exception, the copyright holders give permission to link the + code of portions of this program with the OpenSSL library under certain + conditions as described in each individual source file and distribute + linked combinations including the program with the OpenSSL library. 
You + must comply with the Server Side Public License in all respects for + all of the code used other than as permitted herein. If you modify file(s) + with this exception, you may extend this exception to your version of the + file(s), but you are not obligated to do so. If you do not wish to do so, + delete this exception statement from your version. If you delete this + exception statement from all source files in the program, then also delete + it in the license file. +======= */ + +#include "initial_syncer_fcb.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "mongo/base/error_codes.h" +#include "mongo/base/status.h" +#include "mongo/bson/bsonmisc.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/timestamp.h" +#include "mongo/client/dbclient_cursor.h" +#include "mongo/client/fetcher.h" +#include "mongo/client/remote_command_retry_scheduler.h" +#include "mongo/db/catalog/catalog_control.h" +#include "mongo/db/client.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/database_name.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/dbhelpers.h" +#include "mongo/db/feature_compatibility_version_parser.h" +#include "mongo/db/index_builds_coordinator.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/pipeline/aggregate_command_gen.h" +#include "mongo/db/repl/all_database_cloner.h" +#include "mongo/db/repl/drop_pending_collection_reaper.h" +#include "mongo/db/repl/fcb_file_cloner.h" +#include "mongo/db/repl/initial_sync_state.h" +#include "mongo/db/repl/initial_syncer_common_stats.h" +#include "mongo/db/repl/initial_syncer_factory.h" +#include "mongo/db/repl/initial_syncer_interface.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/repl_server_parameters_gen.h" +#include "mongo/db/repl/replication_auth.h" +#include 
"mongo/db/repl/replication_consistency_markers.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/replication_coordinator_external_state_impl.h" +#include "mongo/db/repl/replication_process.h" +#include "mongo/db/repl/storage_interface.h" +#include "mongo/db/repl/sync_source_selector.h" +#include "mongo/db/repl/tenant_migration_access_blocker_util.h" +#include "mongo/db/repl/transaction_oplog_application.h" +#include "mongo/db/server_options.h" +#include "mongo/db/server_recovery.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" +#include "mongo/db/service_context.h" +#include "mongo/db/startup_recovery.h" +#include "mongo/db/storage/storage_engine.h" +#include "mongo/db/storage/storage_engine_init.h" +#include "mongo/db/storage/storage_options.h" +#include "mongo/executor/remote_command_request.h" +#include "mongo/executor/task_executor.h" +#include "mongo/logv2/log.h" +#include "mongo/platform/compiler.h" // IWYU pragma: keep +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/destructor_guard.h" +#include "mongo/util/fail_point.h" +#include "mongo/util/scopeguard.h" +#include "mongo/util/str.h" +#include "mongo/util/time_support.h" +#include "mongo/util/timer.h" +#include "mongo/util/version/releases.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplicationInitialSync + + +namespace mongo { +namespace repl { + +// Failpoint for initial sync +extern FailPoint failInitialSyncWithBadHost; + +// Failpoint which causes the initial sync function to hang before creating shared data and +// splitting control flow between the oplog fetcher and the cloners. +extern FailPoint initialSyncHangBeforeSplittingControlFlow; + +// Failpoint which causes the initial sync function to hang before copying databases. 
+extern FailPoint initialSyncHangBeforeCopyingDatabases; + +// Failpoint which causes the initial sync function to hang before finishing. +extern FailPoint initialSyncHangBeforeFinish; + +// Failpoint which causes the initial sync function to hang before creating the oplog. +extern FailPoint initialSyncHangBeforeCreatingOplog; + +// Failpoint which skips clearing _initialSyncState after a successful initial sync attempt. +extern FailPoint skipClearInitialSyncState; + +// Failpoint which causes the initial sync function to fail and hang before starting a new attempt. +extern FailPoint failAndHangInitialSync; + +// Failpoint which causes the initial sync function to hang before choosing a sync source. +extern FailPoint initialSyncHangBeforeChoosingSyncSource; + +// Failpoint which causes the initial sync function to hang after finishing. +extern FailPoint initialSyncHangAfterFinish; + +// Failpoint which causes the initial sync function to hang after cloning files. +MONGO_FAIL_POINT_DEFINE(initialSyncHangAfterCloningFiles); + +namespace { +using namespace executor; +using CallbackArgs = executor::TaskExecutor::CallbackArgs; +using Event = executor::TaskExecutor::EventHandle; +using Handle = executor::TaskExecutor::CallbackHandle; +using QueryResponseStatus = StatusWith; +using UniqueLock = stdx::unique_lock; +using LockGuard = stdx::lock_guard; + +constexpr StringData kMetadataFieldName = "metadata"_sd; +constexpr StringData kBackupIdFieldName = "backupId"_sd; +constexpr StringData kDBPathFieldName = "dbpath"_sd; +constexpr StringData kFileNameFieldName = "filename"_sd; +constexpr StringData kFileSizeFieldName = "fileSize"_sd; + +// Used to reset the oldest timestamp during initial sync to a non-null timestamp. 
+const Timestamp kTimestampOne(0, 1); + +ServiceContext::UniqueOperationContext makeOpCtx() { + return cc().makeOperationContext(); +} + +/** + * Returns 'path' relative to 'basePath' as a boost::filesystem::path generic-style path (always uses slashes). + * uasserts unless 'path' is 'basePath' or lies below it on a component boundary ("/a/b2" is not under "/a/b"). + */ +std::string getPathRelativeTo(const std::string& path, const std::string& basePath) { + const auto isSep = [](char c) { return c == '/' || c == '\\'; }; + if (basePath.empty() || path.compare(0, basePath.size(), basePath) != 0 || + !(path.size() == basePath.size() || isSep(basePath.back()) || isSep(path[basePath.size()]))) { + uasserted(6113319, + str::stream() << "The file " << path << " is not a subdirectory of " << basePath); + } + auto result = path.substr(basePath.size()); + // Skip the separator at the beginning of the relative part. + if (!result.empty() && isSep(result.front())) { + result.erase(result.begin()); + } + std::replace(result.begin(), result.end(), '\\', '/'); + return result; +} +} // namespace + +const ServiceContext::ConstructorActionRegisterer initialSyncerRegistererFCB( + "InitialSyncerRegistererFCB", + {"InitialSyncerFactoryRegisterer"} /* dependency list */, + [](ServiceContext* service) { + InitialSyncerFactory::get(service)->registerInitialSyncer( + "fileCopyBased", + [](InitialSyncerInterface::Options opts, + std::unique_ptr dataReplicatorExternalState, + ThreadPool* writerPool, + StorageInterface* storage, + ReplicationProcess* replicationProcess, + const InitialSyncerInterface::OnCompletionFn& onCompletion) { + return std::make_shared(opts, + std::move(dataReplicatorExternalState), + writerPool, + storage, + replicationProcess, + onCompletion); + }); + }); + +InitialSyncerFCB::InitialSyncerFCB( + InitialSyncerInterface::Options opts, + std::unique_ptr dataReplicatorExternalState, + ThreadPool* writerPool, + StorageInterface* storage, + ReplicationProcess* replicationProcess, + const OnCompletionFn& onCompletion) + : _fetchCount(0), + _opts(opts), + _dataReplicatorExternalState(std::move(dataReplicatorExternalState)), + _exec(_dataReplicatorExternalState->getSharedTaskExecutor()), + _clonerExec(_exec), 
_writerPool(writerPool), + _storage(storage), + _replicationProcess(replicationProcess), + _backupId(UUID::fromCDR(std::array{})), + _cfgDBPath(storageGlobalParams.dbpath), + _onCompletion(onCompletion), + _createClientFn( + [] { return std::make_unique(true /* autoReconnect */); }) { + uassert(ErrorCodes::BadValue, "task executor cannot be null", _exec); + uassert(ErrorCodes::BadValue, "invalid storage interface", _storage); + uassert(ErrorCodes::BadValue, "invalid replication process", _replicationProcess); + uassert(ErrorCodes::BadValue, "invalid getMyLastOptime function", _opts.getMyLastOptime); + uassert(ErrorCodes::BadValue, "invalid setMyLastOptime function", _opts.setMyLastOptime); + uassert(ErrorCodes::BadValue, "invalid resetOptimes function", _opts.resetOptimes); + uassert(ErrorCodes::BadValue, "invalid sync source selector", _opts.syncSourceSelector); + uassert(ErrorCodes::BadValue, "callback function cannot be null", _onCompletion); +} + +InitialSyncerFCB::~InitialSyncerFCB() { + DESTRUCTOR_GUARD({ + shutdown().transitional_ignore(); + join(); + }); +} + +bool InitialSyncerFCB::isActive() const { + stdx::lock_guard lock(_mutex); + return _isActive_inlock(); +} + +bool InitialSyncerFCB::_isActive_inlock() const { + return State::kRunning == _state || State::kShuttingDown == _state; +} + +std::string InitialSyncerFCB::getInitialSyncMethod() const { + return "fileCopyBased"; +} + +Status InitialSyncerFCB::startup(OperationContext* opCtx, + std::uint32_t initialSyncMaxAttempts) noexcept { + invariant(opCtx); + invariant(initialSyncMaxAttempts >= 1U); + + stdx::lock_guard lock(_mutex); + switch (_state) { + case State::kPreStart: + _state = State::kRunning; + break; + case State::kRunning: + return {ErrorCodes::IllegalOperation, "initial syncer already started"}; + case State::kShuttingDown: + return {ErrorCodes::ShutdownInProgress, "initial syncer shutting down"}; + case State::kComplete: + return {ErrorCodes::ShutdownInProgress, "initial syncer 
completed"}; + } + + _setUp_inlock(opCtx, initialSyncMaxAttempts); + + // Start first initial sync attempt. + std::uint32_t initialSyncAttempt = 0; + _attemptExec = std::make_unique( + _exec, Status(ErrorCodes::CallbackCanceled, "Initial Sync Attempt Canceled")); + _clonerAttemptExec = std::make_unique( + _clonerExec, Status(ErrorCodes::CallbackCanceled, "Initial Sync Attempt Canceled")); + auto status = _scheduleWorkAndSaveHandle_inlock( + [=](const executor::TaskExecutor::CallbackArgs& args) { + _startInitialSyncAttemptCallback(args, initialSyncAttempt, initialSyncMaxAttempts); + }, + &_startInitialSyncAttemptHandle, + str::stream() << "_startInitialSyncAttemptCallback-" << initialSyncAttempt); + + if (!status.isOK()) { + _state = State::kComplete; + return status; + } + + return Status::OK(); +} + +Status InitialSyncerFCB::shutdown() { + stdx::unique_lock lock(_mutex); + switch (_state) { + case State::kPreStart: + // Transition directly from PreStart to Complete if not started yet. + _state = State::kComplete; + return Status::OK(); + case State::kRunning: + _state = State::kShuttingDown; + break; + case State::kShuttingDown: + case State::kComplete: + // Nothing to do if we are already in ShuttingDown or Complete state. + return Status::OK(); + } + + _cancelRemainingWork_inlock(); + + // Ensure that storage change will not be blocked by shutdown's opCtx (first call to + // InitialSyncerFCB::shutdown comes from ReplicationCoordinatorImpl::enterTerminalShutdown + // at the moment when there is no opCtx in the shutdown thread yet). + // Wait for any running task that changes the storage location to finish. 
+ _inStorageChangeCondition.wait(lock, [this] { return !_inStorageChange; }); + + return Status::OK(); +} + +void InitialSyncerFCB::cancelCurrentAttempt() { + stdx::lock_guard lk(_mutex); + if (_isActive_inlock()) { + LOGV2_DEBUG(128419, + 1, + "Cancelling the current initial sync attempt.", + "currentAttempt"_attr = _stats.failedInitialSyncAttempts + 1); + _cancelRemainingWork_inlock(); + } else { + LOGV2_DEBUG(128420, + 1, + "There is no initial sync attempt to cancel because the initial syncer is not " + "currently active."); + } +} + +void InitialSyncerFCB::_cancelRemainingWork_inlock() { + _cancelHandle_inlock(_startInitialSyncAttemptHandle); + _cancelHandle_inlock(_chooseSyncSourceHandle); + _cancelHandle_inlock(_getBaseRollbackIdHandle); + _cancelHandle_inlock(_fetchBackupCursorHandle); + _cancelHandle_inlock(_transferFileHandle); + _cancelHandle_inlock(_currentHandle); + _cancelHandle_inlock(_getLastRollbackIdHandle); + + if (_sharedData) { + // We actually hold the required lock, but the lock object itself is not passed through. 
+ _clearRetriableError(WithLock::withoutLock()); + stdx::lock_guard lock(*_sharedData); + _sharedData->setStatusIfOK( + lock, Status{ErrorCodes::CallbackCanceled, "Initial sync attempt canceled"}); + } + if (_client) { + _client->shutdownAndDisallowReconnect(); + } + _shutdownComponent_inlock(_applier); + _shutdownComponent_inlock(_backupCursorFetcher); + _shutdownComponent_inlock(_fCVFetcher); + _shutdownComponent_inlock(_beginFetchingOpTimeFetcher); + (*_attemptExec)->shutdown(); + (*_clonerAttemptExec)->shutdown(); + _attemptCanceled = true; +} + +void InitialSyncerFCB::join() { + stdx::unique_lock lk(_mutex); + _stateCondition.wait(lk, [this]() { return !_isActive_inlock(); }); +} + +InitialSyncerFCB::State InitialSyncerFCB::getState_forTest() const { + stdx::lock_guard lk(_mutex); + return _state; +} + +Date_t InitialSyncerFCB::getWallClockTime_forTest() const { + stdx::lock_guard lk(_mutex); + return _lastApplied.wallTime; +} + +void InitialSyncerFCB::setAllowedOutageDuration_forTest(Milliseconds allowedOutageDuration) { + stdx::lock_guard lk(_mutex); + _allowedOutageDuration = allowedOutageDuration; + if (_sharedData) { + stdx::lock_guard lk(*_sharedData); + _sharedData->setAllowedOutageDuration_forTest(lk, allowedOutageDuration); + } +} + +bool InitialSyncerFCB::_isShuttingDown() const { + stdx::lock_guard lock(_mutex); + return _isShuttingDown_inlock(); +} + +bool InitialSyncerFCB::_isShuttingDown_inlock() const { + return State::kShuttingDown == _state; +} + +std::string InitialSyncerFCB::getDiagnosticString() const { + LockGuard lk(_mutex); + str::stream out; + out << "InitialSyncerFCB -" << " active: " << _isActive_inlock() + << " shutting down: " << _isShuttingDown_inlock(); + if (_initialSyncState) { + out << " opsAppied: " << _initialSyncState->appliedOps; + } + + return out; +} + +BSONObj InitialSyncerFCB::getInitialSyncProgress() const { + LockGuard lk(_mutex); + + // We return an empty BSON object after an initial sync attempt has been 
successfully + // completed. When an initial sync attempt completes successfully, initialSyncCompletes is + // incremented and then _initialSyncState is cleared. We check that _initialSyncState has been + // cleared because an initial sync attempt can fail even after initialSyncCompletes is + // incremented, and we also check that initialSyncCompletes is positive because an initial sync + // attempt can also fail before _initialSyncState is initialized. + if (!_initialSyncState && initial_sync_common_stats::initialSyncCompletes.get() > 0) { + return {}; + } + return _getInitialSyncProgress_inlock(); +} + +void InitialSyncerFCB::_appendInitialSyncProgressMinimal_inlock(BSONObjBuilder* bob) const { + bob->append("method", getInitialSyncMethod()); + _stats.append(bob); + if (!_initialSyncState) { + return; + } + if (_initialSyncState->allDatabaseCloner) { + const auto allDbClonerStats = _initialSyncState->allDatabaseCloner->getStats(); + const auto approxTotalDataSize = allDbClonerStats.dataSize; + bob->appendNumber("approxTotalDataSize", approxTotalDataSize); + long long approxTotalBytesCopied = 0; + for (auto const& dbClonerStats : allDbClonerStats.databaseStats) { + for (auto const& collClonerStats : dbClonerStats.collectionStats) { + approxTotalBytesCopied += collClonerStats.approxBytesCopied; + } + } + bob->appendNumber("approxTotalBytesCopied", approxTotalBytesCopied); + if (approxTotalBytesCopied > 0) { + const auto statsObj = bob->asTempObj(); + auto totalInitialSyncElapsedMillis = + statsObj.getField("totalInitialSyncElapsedMillis").safeNumberLong(); + const auto downloadRate = + (double)totalInitialSyncElapsedMillis / (double)approxTotalBytesCopied; + const auto remainingInitialSyncEstimatedMillis = + downloadRate * (double)(approxTotalDataSize - approxTotalBytesCopied); + bob->appendNumber("remainingInitialSyncEstimatedMillis", + (long long)remainingInitialSyncEstimatedMillis); + } + } + bob->appendNumber("appliedOps", 
static_cast(_initialSyncState->appliedOps)); + if (!_initialSyncState->beginApplyingTimestamp.isNull()) { + bob->append("initialSyncOplogStart", _initialSyncState->beginApplyingTimestamp); + } + // Only include the beginFetchingTimestamp if it's different from the beginApplyingTimestamp. + if (!_initialSyncState->beginFetchingTimestamp.isNull() && + _initialSyncState->beginFetchingTimestamp != _initialSyncState->beginApplyingTimestamp) { + bob->append("initialSyncOplogFetchingStart", _initialSyncState->beginFetchingTimestamp); + } + if (!_initialSyncState->stopTimestamp.isNull()) { + bob->append("initialSyncOplogEnd", _initialSyncState->stopTimestamp); + } + if (_sharedData) { + stdx::lock_guard sdLock(*_sharedData); + auto unreachableSince = _sharedData->getSyncSourceUnreachableSince(sdLock); + if (unreachableSince != Date_t()) { + bob->append("syncSourceUnreachableSince", unreachableSince); + bob->append("currentOutageDurationMillis", + durationCount(_sharedData->getCurrentOutageDuration(sdLock))); + } + bob->append("totalTimeUnreachableMillis", + durationCount(_sharedData->getTotalTimeUnreachable(sdLock))); + } +} + +BSONObj InitialSyncerFCB::_getInitialSyncProgress_inlock() const { + try { + BSONObjBuilder bob; + _appendInitialSyncProgressMinimal_inlock(&bob); + if (_initialSyncState) { + if (_initialSyncState->allDatabaseCloner) { + BSONObjBuilder dbsBuilder(bob.subobjStart("databases")); + _initialSyncState->allDatabaseCloner->getStats().append(&dbsBuilder); + dbsBuilder.doneFast(); + } + } + return bob.obj(); + } catch (const DBException& e) { + LOGV2(128421, + "Error creating initial sync progress object: {error}", + "Error creating initial sync progress object", + "error"_attr = e.toString()); + } + BSONObjBuilder bob; + _appendInitialSyncProgressMinimal_inlock(&bob); + return bob.obj(); +} + +void InitialSyncerFCB::_setUp_inlock(OperationContext* opCtx, + std::uint32_t initialSyncMaxAttempts) { + // 'opCtx' is passed through from startup(). 
+ _replicationProcess->getConsistencyMarkers()->clearInitialSyncId(opCtx); + + auto* serviceCtx = opCtx->getServiceContext(); + _storage->setInitialDataTimestamp(serviceCtx, Timestamp::kAllowUnstableCheckpointsSentinel); + _storage->setStableTimestamp(serviceCtx, Timestamp::min()); + + _stats.initialSyncStart = _exec->now(); + _stats.maxFailedInitialSyncAttempts = initialSyncMaxAttempts; + _stats.failedInitialSyncAttempts = 0; + _stats.exec = std::weak_ptr(_exec); + + _allowedOutageDuration = Seconds(initialSyncTransientErrorRetryPeriodSeconds.load()); +} + +void InitialSyncerFCB::_tearDown_inlock(OperationContext* opCtx, + const StatusWith& lastApplied) { + _stats.initialSyncEnd = _exec->now(); + + if (!lastApplied.isOK()) { + return; + } + const auto lastAppliedOpTime = lastApplied.getValue().opTime; + auto initialDataTimestamp = lastAppliedOpTime.getTimestamp(); + + // A node coming out of initial sync must guarantee at least one oplog document is visible + // such that others can sync from this node. Oplog visibility is only advanced when applying + // oplog entries during initial sync. Correct the visibility to match the initial sync time + // before transitioning to steady state replication. 
+ const bool orderedCommit = true; + _storage->oplogDiskLocRegister(opCtx, initialDataTimestamp, orderedCommit); + + tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx); + ServerlessOperationLockRegistry::recoverLocks(opCtx); + reconstructPreparedTransactions(opCtx, repl::OplogApplication::Mode::kInitialSync); + + _replicationProcess->getConsistencyMarkers()->setInitialSyncIdIfNotSet(opCtx); + + _storage->setInitialDataTimestamp(opCtx->getServiceContext(), initialDataTimestamp); + + auto currentLastAppliedOpTime = _opts.getMyLastOptime(); + if (currentLastAppliedOpTime.isNull()) { + _opts.setMyLastOptime(lastApplied.getValue()); + } else { + invariant(currentLastAppliedOpTime == lastAppliedOpTime); + } + + LOGV2(128422, + "initial sync done; took " + "{duration}.", + "Initial sync done", + "duration"_attr = + duration_cast(_stats.initialSyncEnd - _stats.initialSyncStart)); + initial_sync_common_stats::initialSyncCompletes.increment(); +} + +void InitialSyncerFCB::_startInitialSyncAttemptCallback( + const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::uint32_t initialSyncAttempt, + std::uint32_t initialSyncMaxAttempts) noexcept { + auto status = [&] { + stdx::lock_guard lock(_mutex); + return _checkForShutdownAndConvertStatus_inlock( + callbackArgs, + str::stream() << "error while starting initial sync attempt " + << (initialSyncAttempt + 1) << " of " << initialSyncMaxAttempts); + }(); + + if (!status.isOK()) { + _finishInitialSyncAttempt(status); + return; + } + + LOGV2(128423, + "Starting initial sync (attempt {initialSyncAttempt} of {initialSyncMaxAttempts})", + "Starting initial sync attempt", + "initialSyncAttempt"_attr = (initialSyncAttempt + 1), + "initialSyncMaxAttempts"_attr = initialSyncMaxAttempts); + + // This completion guard invokes _finishInitialSyncAttempt on destruction. 
+ auto cancelRemainingWorkInLock = [this]() { + _cancelRemainingWork_inlock(); + }; + auto finishInitialSyncAttemptFn = [this](const StatusWith& lastApplied) { + _finishInitialSyncAttempt(lastApplied); + }; + auto onCompletionGuard = + std::make_shared(cancelRemainingWorkInLock, finishInitialSyncAttemptFn); + + // Lock guard must be declared after completion guard because completion guard destructor + // has to run outside lock. + stdx::lock_guard lock(_mutex); + + LOGV2_DEBUG(128424, + 2, + "Resetting sync source so a new one can be chosen for this initial sync attempt"); + _syncSource = HostAndPort(); + + LOGV2_DEBUG(128425, 2, "Resetting all optimes before starting this initial sync attempt"); + _opts.resetOptimes(); + _lastApplied = {OpTime(), Date_t()}; + _lastFetched = {}; + _backupCursorInfo.reset(); + + LOGV2_DEBUG( + 128426, 2, "Resetting the oldest timestamp before starting this initial sync attempt"); + auto* storageEngine = getGlobalServiceContext()->getStorageEngine(); + if (storageEngine) { + // Set the oldestTimestamp to one because WiredTiger does not allow us to set it to zero + // since that would also set the all_durable point to zero. We specifically don't set + // the stable timestamp here because that will trigger taking a first stable checkpoint even + // though the initialDataTimestamp is still set to kAllowUnstableCheckpointsSentinel. + storageEngine->setOldestTimestamp(kTimestampOne); + } + + LOGV2_DEBUG(128427, + 2, + "Resetting feature compatibility version to last-lts. If the sync source is in " + "latest feature compatibility version, we will find out when we clone the " + "server configuration collection (admin.system.version)"); + serverGlobalParams.mutableFCV.reset(); + + // Get sync source. + std::uint32_t chooseSyncSourceAttempt = 0; + std::uint32_t chooseSyncSourceMaxAttempts = + static_cast(numInitialSyncConnectAttempts.load()); + + // _scheduleWorkAndSaveHandle_inlock() is shutdown-aware. 
+ status = _scheduleWorkAndSaveHandle_inlock( + [=](const executor::TaskExecutor::CallbackArgs& args) { + _chooseSyncSourceCallback( + args, chooseSyncSourceAttempt, chooseSyncSourceMaxAttempts, onCompletionGuard); + }, + &_chooseSyncSourceHandle, + str::stream() << "_chooseSyncSourceCallback-" << chooseSyncSourceAttempt); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } +} + +void InitialSyncerFCB::_chooseSyncSourceCallback( + const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::uint32_t chooseSyncSourceAttempt, + std::uint32_t chooseSyncSourceMaxAttempts, + std::shared_ptr onCompletionGuard) noexcept try { + if (MONGO_unlikely(initialSyncHangBeforeChoosingSyncSource.shouldFail())) { + LOGV2(128428, "initialSyncHangBeforeChoosingSyncSource fail point enabled"); + initialSyncHangBeforeChoosingSyncSource.pauseWhileSet(); + } + + stdx::unique_lock lock(_mutex); + // Cancellation should be treated the same as other errors. In this case, the most likely cause + // of a failed _chooseSyncSourceCallback() task is a cancellation triggered by + // InitialSyncerFCB::shutdown() or the task executor shutting down. 
+ auto status = + _checkForShutdownAndConvertStatus_inlock(callbackArgs, "error while choosing sync source"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + if (MONGO_unlikely(failInitialSyncWithBadHost.shouldFail())) { + status = Status(ErrorCodes::InvalidSyncSource, + "initial sync failed - failInitialSyncWithBadHost failpoint is set."); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + auto syncSource = _chooseSyncSource_inlock(); + if (!syncSource.isOK()) { + if (chooseSyncSourceAttempt + 1 >= chooseSyncSourceMaxAttempts) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock( + lock, + Status(ErrorCodes::InitialSyncOplogSourceMissing, + "No valid sync source found in current replica set to do an initial sync.")); + return; + } + + auto when = (*_attemptExec)->now() + _opts.syncSourceRetryWait; + LOGV2_DEBUG(128429, + 1, + "Error getting sync source: '{error}', trying again in " + "{syncSourceRetryWait} at {retryTime}. Attempt {chooseSyncSourceAttempt} of " + "{numInitialSyncConnectAttempts}", + "Error getting sync source. 
Waiting to retry", + "error"_attr = syncSource.getStatus(), + "syncSourceRetryWait"_attr = _opts.syncSourceRetryWait, + "retryTime"_attr = when.toString(), + "chooseSyncSourceAttempt"_attr = (chooseSyncSourceAttempt + 1), + "numInitialSyncConnectAttempts"_attr = numInitialSyncConnectAttempts.load()); + auto status = _scheduleWorkAtAndSaveHandle_inlock( + when, + [=](const executor::TaskExecutor::CallbackArgs& args) { + _chooseSyncSourceCallback(args, + chooseSyncSourceAttempt + 1, + chooseSyncSourceMaxAttempts, + onCompletionGuard); + }, + &_chooseSyncSourceHandle, + str::stream() << "_chooseSyncSourceCallback-" << (chooseSyncSourceAttempt + 1)); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + return; + } + + if (MONGO_unlikely(initialSyncHangBeforeCreatingOplog.shouldFail())) { + // This log output is used in js tests so please leave it. + LOGV2(128430, + "initial sync - initialSyncHangBeforeCreatingOplog fail point " + "enabled. Blocking until fail point is disabled."); + lock.unlock(); + while (MONGO_unlikely(initialSyncHangBeforeCreatingOplog.shouldFail()) && + !_isShuttingDown()) { + mongo::sleepsecs(1); + } + lock.lock(); + } + + // There is no need to schedule separate task to create oplog collection since we are already in + // a callback and we are certain there's no existing operation context (required for creating + // collections and dropping user databases) attached to the current thread. + status = _truncateOplogAndDropReplicatedDatabases(); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + _syncSource = syncSource.getValue(); + + // Schedule rollback ID checker. 
+    _rollbackChecker = std::make_unique(*_attemptExec, _syncSource);
+    auto scheduleResult = _rollbackChecker->reset([=](const RollbackChecker::Result& result) {
+        return _rollbackCheckerResetCallback(result, onCompletionGuard);
+    });
+    status = scheduleResult.getStatus();
+    if (!status.isOK()) {
+        onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+        return;
+    }
+    _getBaseRollbackIdHandle = scheduleResult.getValue();
+} catch (const DBException&) {
+    // Report exception as an initial syncer failure.
+    stdx::unique_lock lock(_mutex);
+    onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, exceptionToStatus());
+}
+
+// TODO: we probably don't need this in FCBIS
+//
+// Prepares local storage for a sync attempt: truncates the oplog (creating it when the
+// truncation fails), aborts index builds for initial sync and drops the replicated databases.
+// The writes are issued unreplicated on a freshly created operation context.
+// Returns the createOplog() error when the oplog had to be created and that failed; otherwise
+// the status of dropReplicatedDatabases().
+Status InitialSyncerFCB::_truncateOplogAndDropReplicatedDatabases() {
+    // truncate oplog; drop user databases.
+    LOGV2_DEBUG(128431,
+                1,
+                "About to truncate the oplog, if it exists, ns:{namespace}, and drop all "
+                "user databases (so that we can clone them).",
+                "About to truncate the oplog, if it exists, and drop all user databases (so that "
+                "we can clone them)",
+                logAttrs(NamespaceString::kRsOplogNamespace));
+
+    auto opCtx = makeOpCtx();
+    // This code can make untimestamped writes (deletes) to the _mdb_catalog on top of existing
+    // timestamped updates.
+    opCtx->recoveryUnit()->allowAllUntimestampedWrites();
+
+    // We are not replicating nor validating these writes.
+    UnreplicatedWritesBlock unreplicatedWritesBlock(opCtx.get());
+
+    // 1.) Truncate the oplog.
+    LOGV2_DEBUG(128432,
+                2,
+                "Truncating the existing oplog: {namespace}",
+                "Truncating the existing oplog",
+                logAttrs(NamespaceString::kRsOplogNamespace));
+    Timer timer;
+    auto status = _storage->truncateCollection(opCtx.get(), NamespaceString::kRsOplogNamespace);
+    LOGV2(128433,
+          "Initial syncer oplog truncation finished in: {durationMillis}ms",
+          "Initial syncer oplog truncation finished",
+          "durationMillis"_attr = timer.millis());
+    // A failed truncation is treated as "the oplog does not exist yet".
+    if (!status.isOK()) {
+        // 1a.) Create the oplog.
+ LOGV2_DEBUG(128434, + 2, + "Creating the oplog: {namespace}", + "Creating the oplog", + logAttrs(NamespaceString::kRsOplogNamespace)); + status = _storage->createOplog(opCtx.get(), NamespaceString::kRsOplogNamespace); + if (!status.isOK()) { + return status; + } + } + + // 2a.) Abort any index builds started during initial sync. + IndexBuildsCoordinator::get(opCtx.get()) + ->abortAllIndexBuildsForInitialSync(opCtx.get(), "Aborting index builds for initial sync"); + + // 2b.) Drop user databases. + LOGV2_DEBUG(128435, 2, "Dropping user databases"); + return _storage->dropReplicatedDatabases(opCtx.get()); +} + +void InitialSyncerFCB::_rollbackCheckerResetCallback( + const RollbackChecker::Result& result, std::shared_ptr onCompletionGuard) { + stdx::lock_guard lock(_mutex); + auto status = _checkForShutdownAndConvertStatus_inlock(result.getStatus(), + "error while getting base rollback ID"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + // we will need shared data to clone files from sync source + _sharedData = + std::make_unique(_rollbackChecker->getBaseRBID(), + _allowedOutageDuration, + getGlobalServiceContext()->getFastClockSource()); + _client = _createClientFn(); + + // schedule $backupCursor on the sync source + status = _scheduleWorkAndSaveHandle_inlock( + [this, onCompletionGuard](const executor::TaskExecutor::CallbackArgs& args) { + _fetchBackupCursorCallback(args, onCompletionGuard); + }, + &_fetchBackupCursorHandle, + "_fetchBackupCursorCallback"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } +} + +void InitialSyncerFCB::_fcvFetcherCallback(const StatusWith& result, + std::shared_ptr onCompletionGuard, + const OpTime& lastOpTime, + OpTime& beginFetchingOpTime) { + stdx::unique_lock lock(_mutex); + auto status = _checkForShutdownAndConvertStatus_inlock( + result.getStatus(), "error while getting the remote feature 
compatibility version"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + const auto docs = result.getValue().documents; + if (docs.size() > 1) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock( + lock, + Status(ErrorCodes::TooManyMatchingDocuments, + str::stream() << "Expected to receive one feature compatibility version " + "document, but received: " + << docs.size() << ". First: " << redact(docs.front()) + << ". Last: " << redact(docs.back()))); + return; + } + const auto hasDoc = docs.begin() != docs.end(); + if (!hasDoc) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock( + lock, + Status(ErrorCodes::IncompatibleServerVersion, + "Sync source had no feature compatibility version document")); + return; + } + + auto fCVParseSW = FeatureCompatibilityVersionParser::parse(docs.front()); + if (!fCVParseSW.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, fCVParseSW.getStatus()); + return; + } + + auto version = fCVParseSW.getValue(); + + // Changing the featureCompatibilityVersion during initial sync is unsafe. + // (Generic FCV reference): This FCV check should exist across LTS binary versions. + if (serverGlobalParams.featureCompatibility.acquireFCVSnapshot().isUpgradingOrDowngrading( + version)) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock( + lock, + Status(ErrorCodes::IncompatibleServerVersion, + str::stream() << "Sync source had unsafe feature compatibility version: " + << multiversion::toString(version))); + return; + } else { + // Since we don't guarantee that we always clone the "admin.system.version" collection first + // and collection/index creation can depend on FCV, we set the in-memory FCV value to match + // the version on the sync source. We won't persist the FCV on disk nor will we update our + // minWireVersion until we clone the actual document. 
+ serverGlobalParams.mutableFCV.setVersion(version); + } + + if (MONGO_unlikely(initialSyncHangBeforeSplittingControlFlow.shouldFail())) { + lock.unlock(); + LOGV2(128436, + "initial sync - initialSyncHangBeforeSplittingControlFlow fail point " + "enabled. Blocking until fail point is disabled."); + while (MONGO_unlikely(initialSyncHangBeforeSplittingControlFlow.shouldFail()) && + !_isShuttingDown()) { + mongo::sleepsecs(1); + } + lock.lock(); + } + + // This is where the flow of control starts to split into two parallel tracks: + // - oplog fetcher + // - data cloning and applier + _sharedData = + std::make_unique(_rollbackChecker->getBaseRBID(), + _allowedOutageDuration, + getGlobalServiceContext()->getFastClockSource()); + _client = _createClientFn(); + _initialSyncState = std::make_unique(std::make_unique( + _sharedData.get(), _syncSource, _client.get(), _storage, _writerPool)); + + _initialSyncState->beginApplyingTimestamp = lastOpTime.getTimestamp(); + _initialSyncState->beginFetchingTimestamp = beginFetchingOpTime.getTimestamp(); + + invariant(_initialSyncState->beginApplyingTimestamp >= + _initialSyncState->beginFetchingTimestamp, + str::stream() << "beginApplyingTimestamp was less than beginFetchingTimestamp. 
" + "beginApplyingTimestamp: " + << _initialSyncState->beginApplyingTimestamp.toBSON() + << " beginFetchingTimestamp: " + << _initialSyncState->beginFetchingTimestamp.toBSON()); + + invariant(!result.getValue().documents.empty()); + LOGV2_DEBUG(128437, + 2, + "Setting begin applying timestamp to {beginApplyingTimestamp}, ns: " + "{namespace} and the begin fetching timestamp to {beginFetchingTimestamp}", + "Setting begin applying timestamp and begin fetching timestamp", + "beginApplyingTimestamp"_attr = _initialSyncState->beginApplyingTimestamp, + logAttrs(NamespaceString::kRsOplogNamespace), + "beginFetchingTimestamp"_attr = _initialSyncState->beginFetchingTimestamp); + + const auto configResult = _dataReplicatorExternalState->getCurrentConfig(); + status = configResult.getStatus(); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + _initialSyncState.reset(); + return; + } + + if (MONGO_unlikely(initialSyncHangBeforeCopyingDatabases.shouldFail())) { + lock.unlock(); + // This could have been done with a scheduleWorkAt but this is used only by JS tests where + // we run with multiple threads so it's fine to spin on this thread. + // This log output is used in js tests so please leave it. + LOGV2(128438, + "initial sync - initialSyncHangBeforeCopyingDatabases fail point " + "enabled. Blocking until fail point is disabled."); + while (MONGO_unlikely(initialSyncHangBeforeCopyingDatabases.shouldFail()) && + !_isShuttingDown()) { + mongo::sleepsecs(1); + } + lock.lock(); + } + + lock.unlock(); +} + +void InitialSyncerFCB::_finishInitialSyncAttempt(const StatusWith& lastApplied) { + // Since _finishInitialSyncAttempt can be called from any component's callback function or + // scheduled task, it is possible that we may not be in a TaskExecutor-managed thread when this + // function is invoked. 
+    // For example, if CollectionCloner fails while inserting documents into the
+    // CollectionBulkLoader, we will get here via one of CollectionCloner's TaskRunner callbacks
+    // which has an active OperationContext bound to the current Client. This would lead to an
+    // invariant when we attempt to create a new OperationContext for _tearDown(opCtx).
+    // To avoid this, we schedule _finishCallback against the TaskExecutor rather than calling it
+    // here synchronously.
+
+    // Unless dismissed, a scope guard will schedule _finishCallback() upon exiting this function.
+    // Since it is a requirement that _finishCallback be called outside the lock (which is possible
+    // if the task scheduling fails and we have to invoke _finishCallback() synchronously), we
+    // declare the scope guard before the lock guard.
+    // 'result' is a mutable copy: the code below may overwrite it (fail point, scheduling
+    // failure) and the guard reads it by reference when it fires.
+    auto result = lastApplied;
+    ScopeGuard finishCallbackGuard([this, &result] {
+        auto scheduleResult = _exec->scheduleWork(
+            [=](const mongo::executor::TaskExecutor::CallbackArgs&) { _finishCallback(result); });
+        if (!scheduleResult.isOK()) {
+            LOGV2_WARNING(128439,
+                          "Unable to schedule initial syncer completion task due to "
+                          "{error}. Running callback on current thread.",
+                          "Unable to schedule initial syncer completion task. Running callback on "
+                          "current thread",
+                          "error"_attr = redact(scheduleResult.getStatus()));
+            _finishCallback(result);
+        }
+    });
+
+    LOGV2(128440, "Initial sync attempt finishing up");
+
+    stdx::lock_guard lock(_mutex);
+
+    // Collect per-attempt statistics; the defaults are used when no shared data exists.
+    auto runTime = _initialSyncState ? _initialSyncState->timer.millis() : 0;
+    int rollBackId = -1;
+    int operationsRetried = 0;
+    int totalTimeUnreachableMillis = 0;
+    if (_sharedData) {
+        stdx::lock_guard sdLock(*_sharedData);
+        rollBackId = _sharedData->getRollBackId();
+        operationsRetried = _sharedData->getTotalRetries(sdLock);
+        totalTimeUnreachableMillis =
+            durationCount(_sharedData->getTotalTimeUnreachable(sdLock));
+    }
+
+    if (MONGO_unlikely(failAndHangInitialSync.shouldFail())) {
+        LOGV2(128441, "failAndHangInitialSync fail point enabled");
+        // NOTE(review): this pauses while '_mutex' is still held by 'lock' above.
+        failAndHangInitialSync.pauseWhileSet();
+        result = Status(ErrorCodes::InternalError, "failAndHangInitialSync fail point enabled");
+    }
+
+    _stats.initialSyncAttemptInfos.emplace_back(
+        InitialSyncerFCB::InitialSyncAttemptInfo{runTime,
+                                                 result.getStatus(),
+                                                 _syncSource,
+                                                 rollBackId,
+                                                 operationsRetried,
+                                                 totalTimeUnreachableMillis});
+
+    if (!result.isOK()) {
+        // This increments the number of failed attempts for the current initial sync request.
+        ++_stats.failedInitialSyncAttempts;
+        // This increments the number of failed attempts across all initial sync attempts since
+        // process startup.
+        initial_sync_common_stats::initialSyncFailedAttempts.increment();
+    }
+
+    bool hasRetries = _stats.failedInitialSyncAttempts < _stats.maxFailedInitialSyncAttempts;
+
+    initial_sync_common_stats::LogInitialSyncAttemptStats(
+        result, hasRetries, _getInitialSyncProgress_inlock());
+
+    if (result.isOK()) {
+        // Scope guard will invoke _finishCallback().
+        return;
+    }
+
+    LOGV2_ERROR(128442,
+                "Initial sync attempt failed -- attempts left: "
+                "{attemptsLeft} cause: "
+                "{error}",
+                "Initial sync attempt failed",
+                "attemptsLeft"_attr =
+                    (_stats.maxFailedInitialSyncAttempts - _stats.failedInitialSyncAttempts),
+                "error"_attr = redact(result.getStatus()));
+
+    // Check if need to do more retries.
+    if (!hasRetries) {
+        LOGV2_FATAL_CONTINUE(128443,
+                             "The maximum number of retries have been exhausted for initial sync");
+
+        initial_sync_common_stats::initialSyncFailures.increment();
+
+        // Scope guard will invoke _finishCallback().
+        return;
+    }
+
+    // denylist only makes sense if we still have retries left.
+    if (result.getStatus() == ErrorCodes::InvalidSyncSource) {
+        // If the sync source is invalid, we should denylist it for a while.
+        const auto until = (*_attemptExec)->now() + _opts.syncSourceRetryWait * 2;
+        _opts.syncSourceSelector->denylistSyncSource(_syncSource, until);
+    }
+
+    // Replace the per-attempt executors so the next attempt starts with fresh ones, then
+    // schedule that attempt after the configured retry wait.
+    _attemptExec = std::make_unique(
+        _exec, Status(ErrorCodes::CallbackCanceled, "Initial Sync Attempt Canceled"));
+    _clonerAttemptExec = std::make_unique(
+        _clonerExec, Status(ErrorCodes::CallbackCanceled, "Initial Sync Attempt Canceled"));
+    _attemptCanceled = false;
+    auto when = (*_attemptExec)->now() + _opts.initialSyncRetryWait;
+    auto status = _scheduleWorkAtAndSaveHandle_inlock(
+        when,
+        [=](const executor::TaskExecutor::CallbackArgs& args) {
+            _startInitialSyncAttemptCallback(
+                args, _stats.failedInitialSyncAttempts, _stats.maxFailedInitialSyncAttempts);
+        },
+        &_startInitialSyncAttemptHandle,
+        str::stream() << "_startInitialSyncAttemptCallback-" << _stats.failedInitialSyncAttempts);
+
+    if (!status.isOK()) {
+        result = status;
+        // Scope guard will invoke _finishCallback().
+        return;
+    }
+
+    // Next initial sync attempt scheduled successfully and we do not need to call _finishCallback()
+    // until the next initial sync attempt finishes.
+    finishCallbackGuard.dismiss();
+}
+
+void InitialSyncerFCB::_finishCallback(StatusWith lastApplied) {
+    // After running callback function, clear '_onCompletion' to release any resources that might be
+    // held by this function object.
+    // '_onCompletion' must be moved to a temporary copy and destroyed outside the lock in case
+    // there is any logic that's invoked at the function object's destruction that might call into
+    // this InitialSyncerFCB. 'onCompletion' must be destroyed outside the lock and this should
+    // happen before we transition the state to Complete.
+    decltype(_onCompletion) onCompletion;
+    {
+        stdx::lock_guard lock(_mutex);
+        auto opCtx = makeOpCtx();
+        _tearDown_inlock(opCtx.get(), lastApplied);
+        invariant(_onCompletion);
+        std::swap(_onCompletion, onCompletion);
+    }
+
+    if (MONGO_unlikely(initialSyncHangBeforeFinish.shouldFail())) {
+        // This log output is used in js tests so please leave it.
+        LOGV2(128444,
+              "initial sync - initialSyncHangBeforeFinish fail point "
+              "enabled. Blocking until fail point is disabled.");
+        while (MONGO_unlikely(initialSyncHangBeforeFinish.shouldFail()) && !_isShuttingDown()) {
+            mongo::sleepsecs(1);
+        }
+    }
+
+    // Any _retryingOperation is no longer active. This must be done before signalling state
+    // Complete.
+    _retryingOperation = boost::none;
+
+    // Completion callback must be invoked outside mutex.
+    try {
+        onCompletion(lastApplied);
+    } catch (...) {
+        LOGV2_WARNING(128445,
+                      "initial syncer finish callback threw exception: {error}",
+                      "Initial syncer finish callback threw exception",
+                      "error"_attr = redact(exceptionToStatus()));
+    }
+
+    // Destroy the remaining reference to the completion callback before we transition the state to
+    // Complete so that callers can expect any resources bound to '_onCompletion' to be released
+    // before InitialSyncerFCB::join() returns.
+    onCompletion = {};
+
+    {
+        stdx::lock_guard lock(_mutex);
+        invariant(_state != State::kComplete);
+        _state = State::kComplete;
+        _stateCondition.notify_all();
+
+        // Clear the initial sync progress after an initial sync attempt has been successfully
+        // completed.
+        if (lastApplied.isOK() && !MONGO_unlikely(skipClearInitialSyncState.shouldFail())) {
+            _initialSyncState.reset();
+        }
+
+        // Destroy shared references to executors.
+        _attemptExec = nullptr;
+        _clonerAttemptExec = nullptr;
+        _clonerExec = nullptr;
+        _exec = nullptr;
+    }
+
+    if (MONGO_unlikely(initialSyncHangAfterFinish.shouldFail())) {
+        LOGV2(128446,
+              "initial sync finished - initialSyncHangAfterFinish fail point "
+              "enabled. Blocking until fail point is disabled.");
+        while (MONGO_unlikely(initialSyncHangAfterFinish.shouldFail()) && !_isShuttingDown()) {
+            mongo::sleepsecs(1);
+        }
+    }
+}
+
+// Decides whether an operation that failed with 'status' should be retried. For retriable
+// errors the decision is delegated to the shared data; for anything else the retriable-error
+// state is cleared and false is returned.
+// NOTE(review): dereferences '_sharedData' without a null check -- callers presumably guarantee
+// it is set; confirm.
+bool InitialSyncerFCB::_shouldRetryError(WithLock lk, Status status) {
+    if (ErrorCodes::isRetriableError(status)) {
+        stdx::lock_guard sharedDataLock(*_sharedData);
+        return _sharedData->shouldRetryOperation(sharedDataLock, &_retryingOperation);
+    }
+    // The status was OK or some error other than a retriable error, so clear the retriable error
+    // state and indicate that we should not retry.
+    _clearRetriableError(lk);
+    return false;
+}
+
+// Resets '_retryingOperation' to boost::none.
+void InitialSyncerFCB::_clearRetriableError(WithLock lk) {
+    _retryingOperation = boost::none;
+}
+
+// Convenience overload: forwards the status carried by 'callbackArgs'.
+Status InitialSyncerFCB::_checkForShutdownAndConvertStatus_inlock(
+    const executor::TaskExecutor::CallbackArgs& callbackArgs, const std::string& message) {
+    return _checkForShutdownAndConvertStatus_inlock(callbackArgs.status, message);
+}
+
+// Returns CallbackCanceled when the initial syncer is shutting down; otherwise returns 'status'
+// with 'message' attached as context.
+Status InitialSyncerFCB::_checkForShutdownAndConvertStatus_inlock(const Status& status,
+                                                                  const std::string& message) {
+
+    if (_isShuttingDown_inlock()) {
+        return {ErrorCodes::CallbackCanceled, message + ": initial syncer is shutting down"};
+    }
+
+    return status.withContext(message);
+}
+
+// Schedules 'work' on the attempt executor and stores the resulting callback handle in
+// '*handle'. 'name' is only used to build error messages. Refuses to schedule during shutdown.
+Status InitialSyncerFCB::_scheduleWorkAndSaveHandle_inlock(
+    executor::TaskExecutor::CallbackFn work,
+    executor::TaskExecutor::CallbackHandle* handle,
+    const std::string& name) {
+    invariant(handle);
+    if (_isShuttingDown_inlock()) {
+        return {ErrorCodes::CallbackCanceled,
+                str::stream() << "failed to schedule work " << name
+                              << ": initial syncer is shutting down"};
+    }
+    auto result = (*_attemptExec)->scheduleWork(std::move(work));
+    if (!result.isOK()) {
+        return result.getStatus().withContext(str::stream() << "failed to schedule work " << name);
+    }
+    *handle = result.getValue();
+    return Status::OK();
+}
+
+// Same as _scheduleWorkAndSaveHandle_inlock, but the work is scheduled to run at 'when'.
+Status InitialSyncerFCB::_scheduleWorkAtAndSaveHandle_inlock(
+    Date_t when,
+    executor::TaskExecutor::CallbackFn work,
+    executor::TaskExecutor::CallbackHandle* handle,
+    const std::string& name) {
+    invariant(handle);
+    if (_isShuttingDown_inlock()) {
+        return {ErrorCodes::CallbackCanceled,
+                str::stream() << "failed to schedule work " << name << " at " << when.toString()
+                              << ": initial syncer is shutting down"};
+    }
+    auto result = (*_attemptExec)->scheduleWorkAt(when, std::move(work));
+    if (!result.isOK()) {
+        return result.getStatus().withContext(str::stream() << "failed to schedule work " << name
+                                                            << " at " << when.toString());
+    }
+    *handle = result.getValue();
+    return Status::OK();
+}
+
+// Cancels 'handle' on the attempt executor; an unset handle is ignored.
+void InitialSyncerFCB::_cancelHandle_inlock(executor::TaskExecutor::CallbackHandle handle) {
+    if (!handle) {
+        return;
+    }
+    (*_attemptExec)->cancel(handle);
+}
+
+template
+Status InitialSyncerFCB::_startupComponent_inlock(Component& component) {
+    // It is necessary to check if shutdown or attempt cancelling happens before starting a
+    // component; otherwise the component may call a callback function in line which will
+    // cause a deadlock when the callback attempts to obtain the initial syncer mutex.
+    if (_isShuttingDown_inlock() || _attemptCanceled) {
+        component.reset();
+        if (_isShuttingDown_inlock()) {
+            return {ErrorCodes::CallbackCanceled,
+                    "initial syncer shutdown while trying to call startup() on component"};
+        } else {
+            return {ErrorCodes::CallbackCanceled,
+                    "initial sync attempt canceled while trying to call startup() on component"};
+        }
+    }
+    auto status = component->startup();
+    if (!status.isOK()) {
+        // A component that failed to start is released.
+        component.reset();
+    }
+    return status;
+}
+
+// Calls shutdown() on 'component' when it is set; does nothing otherwise.
+template
+void InitialSyncerFCB::_shutdownComponent_inlock(Component& component) {
+    if (!component) {
+        return;
+    }
+    component->shutdown();
+}
+
+// Asks the sync source selector for a new sync source based on '_lastFetched'. Returns
+// InvalidSyncSource when the selector yields an empty result.
+StatusWith InitialSyncerFCB::_chooseSyncSource_inlock() {
+    auto syncSource = _opts.syncSourceSelector->chooseNewSyncSource(_lastFetched);
+    if (syncSource.empty()) {
+        return Status{ErrorCodes::InvalidSyncSource,
+                      str::stream() << "No valid sync source available. Our last fetched optime: "
+                                    << _lastFetched.toString()};
+    }
+    return syncSource;
+}
+
+namespace {
+
+// Retry attempts passed to the retry policy of the backup cursor fetcher.
+constexpr int kBackupCursorFileFetcherRetryAttempts = 10;
+
+// Builds the raw aggregate command object for a $backupCursor pipeline.
+// NOTE(review): no use of this helper is visible in this part of the file -- confirm it is
+// still needed.
+BSONObj makeBackupCursorCmd() {
+    BSONArrayBuilder pipelineBuilder;
+    pipelineBuilder << BSON("$backupCursor" << BSONObj());
+    return BSON("aggregate" << 1 << "pipeline" << pipelineBuilder.arr() << "cursor" << BSONObj());
+}
+
+// Builds a collectionless aggregate request on the admin database with a single
+// $backupCursor stage.
+AggregateCommandRequest makeBackupCursorRequest() {
+    return {NamespaceString::makeCollectionlessAggregateNSS(DatabaseName::kAdmin),
+            {BSON("$backupCursor" << BSONObj())}};
+}
+
+}  // namespace
+
+// clean local files in the dbpath
+// Stops at the first file that cannot be removed and returns InternalError for it.
+Status InitialSyncerFCB::_deleteLocalFiles() {
+    // list of files is in the _localFiles vector of std::string
+    for (const auto& path : _localFiles) {
+        boost::system::error_code ec;
+        boost::filesystem::remove(path, ec);
+        if (ec) {
+            return {ErrorCodes::InternalError,
+                    "Error deleting file '{}': {}"_format(path, ec.message())};
+        }
+    }
+    return Status::OK();
+}
+
+// function to move files from one directory to another
+// excluding .dummy subdirectory
+// Works in two passes: first the directory tree is recreated under 'destDir' while the regular
+// files are collected, then each collected file is renamed into place. Any filesystem_error is
+// converted to an UnknownError status.
+Status InitialSyncerFCB::_moveFiles(const boost::filesystem::path& sourceDir,
+                                    const boost::filesystem::path& destDir) {
+    namespace fs = boost::filesystem;
+
+    const fs::path excluded{".dummy"};
+    try {
+        std::vector files;
+        // populate files list and create directory structure under destDir
+        for (auto it = fs::recursive_directory_iterator(sourceDir);
+             it != fs::recursive_directory_iterator();
+             ++it) {
+            if (fs::is_regular_file(it->status())) {
+                // TODO: filter some files
+                // push into the list
+                files.push_back(it->path());
+            } else if (fs::is_directory(it->status())) {
+                auto relPath = fs::relative(it->path(), sourceDir);
+                // Only a directory whose path relative to sourceDir is exactly ".dummy" is
+                // skipped (together with everything below it).
+                if (excluded == relPath) {
+                    it.disable_recursion_pending();
+                } else {
+                    fs::create_directories(destDir / relPath);
+                }
+            }
+        }
+        // move files from the list
+        for (const auto& sourcePath : files) {
+            auto destPath = destDir / fs::relative(sourcePath, sourceDir);
+            fs::rename(sourcePath, destPath);
+        }
+
+        return Status::OK();
+    } catch (const fs::filesystem_error& e) {
+        return Status(ErrorCodes::UnknownError, e.what());
+    }
+}
+
+// Open a local backup cursor and obtain a list of files from that.
+StatusWith> InitialSyncerFCB::_getBackupFiles() {
+    std::vector files;
+    try {
+        // Open a local backup cursor and obtain a list of files from that.
+
+        // Try to use DBDirectClient
+        auto opCtx = makeOpCtx();
+        DBDirectClient client(opCtx.get());
+        auto cursor = uassertStatusOK(DBClientCursor::fromAggregationRequest(
+            &client, makeBackupCursorRequest(), true /* secondaryOk */, false /* useExhaust */));
+        // The first document is consumed separately and not added to 'files'.
+        // NOTE(review): 'metadata' is never read afterwards.
+        if (cursor->more()) {
+            auto metadata = cursor->next();
+            files.reserve(cursor->objsLeftInBatch());
+        }
+        // Every remaining document contributes its file name.
+        while (cursor->more()) {
+            auto rec = cursor->next();
+            files.emplace_back(rec[kFileNameFieldName].String());
+        }
+
+        // Close cursor
+        cursor->kill();
+    } catch (const DBException& e) {
+        return e.toStatus();
+    }
+    return files;
+}
+
+// Switch storage location
+// Creates 'newLocation' if needed, closes the catalog, reinitializes the storage engine with
+// 'newLocation' as dbpath, optionally runs startup recovery in 'recoveryMode', and reopens the
+// catalog.
+// NOTE(review): the early error returns after closeCatalog() leave without calling
+// openCatalogAfterStorageChange() -- confirm callers handle that state.
+Status InitialSyncerFCB::_switchStorageLocation(
+    OperationContext* opCtx,
+    const std::string& newLocation,
+    const boost::optional recoveryMode) {
+    boost::system::error_code ec;
+    boost::filesystem::create_directories(newLocation, ec);
+    if (ec) {
+        return {ErrorCodes::InternalError,
+                str::stream() << "Failed to create directory " << newLocation
+                              << " Error: " << ec.message()};
+    }
+
+    // NOTE(review): 'previousCatalogState' is not used in the visible code.
+    auto previousCatalogState = catalog::closeCatalog(opCtx);
+
+    // The callback passed to reinitializeStorageEngine switches dbpath and clears the cached
+    // local oplog pointer.
+    auto lastShutdownState =
+        reinitializeStorageEngine(opCtx, StorageEngineInitFlags{}, [&newLocation, opCtx] {
+            storageGlobalParams.dbpath = newLocation;
+            repl::clearLocalOplogPtr(opCtx->getServiceContext());
+        });
+    opCtx->getServiceContext()->getStorageEngine()->notifyStartupComplete();
+    if (StorageEngine::LastShutdownState::kClean != lastShutdownState) {
+        return {ErrorCodes::InternalError,
+                str::stream() << "Failed to switch storage location to " << newLocation};
+    }
+
+
+    if (recoveryMode) {
+        // We need to run startup recovery in the specified mode.
+        // This is necessary to ensure that the storage engine is in a consistent state.
+ try { + startup_recovery::runStartupRecoveryInMode(opCtx, lastShutdownState, *recoveryMode); + } catch (const ExceptionFor& error) { + // versions incompatibility (we actually should check this when we select sync source) + return error.toStatus(); + } + } + + catalog::openCatalogAfterStorageChange(opCtx); + + LOGV2_DEBUG(128415, 1, "Switched storage location", "newLocation"_attr = newLocation); + return Status::OK(); +} + +Status InitialSyncerFCB::_killBackupCursor_inlock() { + const auto* info = _backupCursorInfo.get(); + invariant(info); + executor::RemoteCommandRequest killCursorsRequest( + _syncSource, + info->nss.db().toString(), + BSON("killCursors" << info->nss.coll().toString() << "cursors" + << BSON_ARRAY(info->cursorId)), + nullptr); + + auto scheduleResult = _exec->scheduleRemoteCommand( + killCursorsRequest, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { + if (!args.response.isOK()) { + LOGV2_WARNING(128416, + "killCursors command task failed", + "error"_attr = redact(args.response.status)); + return; + } + auto status = getStatusFromCommandResult(args.response.data); + if (status.isOK()) { + LOGV2_INFO(128417, "Killed backup cursor"); + } else { + LOGV2_WARNING(128418, "killCursors command failed", "error"_attr = redact(status)); + } + }); + return scheduleResult.getStatus(); +} + +// TenantMigrationRecipientService::Instance::_openBackupCursor +// ShardMergeRecipientService::Instance::_openBackupCursor +void InitialSyncerFCB::_fetchBackupCursorCallback( + const executor::TaskExecutor::CallbackArgs& callbackArgs, + // NOLINTNEXTLINE(*-unnecessary-value-param) + std::shared_ptr onCompletionGuard) noexcept try { + stdx::lock_guard lock(_mutex); + auto status = _checkForShutdownAndConvertStatus_inlock( + callbackArgs, "error executing backup cusrsor on the sync source"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + const auto aggregateCommandRequestObj = [] { + 
AggregateCommandRequest aggRequest( + NamespaceString::makeCollectionlessAggregateNSS(DatabaseName::kAdmin), + {BSON("$backupCursor" << BSONObj())}); + // We must set a writeConcern on internal commands. + aggRequest.setWriteConcern(WriteConcernOptions()); + return aggRequest.toBSON(BSONObj()); + }(); + + LOGV2_DEBUG(128407, 1, "Opening backup cursor on sync source", "syncSource"_attr = _syncSource); + + auto fetchStatus = std::make_shared>(); + const auto fetcherCallback = [this, fetchStatus](const Fetcher::QueryResponseStatus& dataStatus, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) noexcept { + try { + uassertStatusOK(dataStatus); + + const auto& data = dataStatus.getValue(); + for (const BSONObj& doc : data.documents) { + if (doc[kMetadataFieldName]) { + // First batch must contain the metadata. + const auto& metadata = doc[kMetadataFieldName].Obj(); + auto checkpointTimestamp = metadata["checkpointTimestamp"].timestamp(); + _backupId = UUID(uassertStatusOK(UUID::parse(metadata[kBackupIdFieldName]))); + _remoteDBPath = metadata[kDBPathFieldName].String(); + auto status = OpTime::parseFromOplogEntry(metadata["oplogEnd"].Obj()); + invariant(status.isOK()); + _oplogEnd = status.getValue(); + _backupCursorInfo = std::make_unique( + data.cursorId, data.nss, checkpointTimestamp); + + LOGV2_INFO(128409, + "Opened backup cursor on sync source", + "backupCursorId"_attr = data.cursorId, + "remoteDBPath"_attr = _remoteDBPath, + "backupCursorCheckpointTimestamp"_attr = checkpointTimestamp); + // empty _remoteFiles on new sync attempt start + _remoteFiles.clear(); + } else { + auto fileName = doc[kFileNameFieldName].String(); + auto fileSize = doc[kFileSizeFieldName].numberLong(); + LOGV2_DEBUG(128410, + 1, + "Backup cursor entry", + "filename"_attr = fileName, + "fileSize"_attr = fileSize, + "backupCursorId"_attr = data.cursorId); + _remoteFiles.emplace_back(fileName, fileSize); + } + } + + *fetchStatus = Status::OK(); + if (!getMoreBob || 
data.documents.empty()) { + // Exit fetcher but keep the backupCursor alive to prevent WT on sync source + // from modifying file bytes. backupCursor can be closed after all files are + // copied + *nextAction = Fetcher::NextAction::kExitAndKeepCursorAlive; + return; + } + + getMoreBob->append("getMore", data.cursorId); + getMoreBob->append("collection", data.nss.coll()); + } catch (DBException& ex) { + LOGV2_ERROR( + 128408, "Error fetching backup cursor entries", "error"_attr = ex.toString()); + *fetchStatus = ex.toStatus(); + // In case of following error: + // "Location50886: The existing backup cursor must be closed before $backupCursor can + // succeed." replace error code with InvalidSyncSource to ensure fallback to logical + if (fetchStatus->get().code() == 50886) { + *fetchStatus = Status{ErrorCodes::InvalidSyncSource, ex.reason()}; + } + } + }; + + _backupCursorFetcher = std::make_unique( + *_attemptExec, + _syncSource, + DatabaseName::kAdmin.toString(), + aggregateCommandRequestObj, + fetcherCallback, + // ReadPreferenceSetting::secondaryPreferredMetadata(), + ReadPreferenceSetting(ReadPreference::PrimaryPreferred).toContainingBSON(), + executor::RemoteCommandRequest::kNoTimeout, + executor::RemoteCommandRequest::kNoTimeout, + RemoteCommandRetryScheduler::makeRetryPolicy( + kBackupCursorFileFetcherRetryAttempts, executor::RemoteCommandRequest::kNoTimeout)); + + Status scheduleStatus = _backupCursorFetcher->schedule(); + if (!scheduleStatus.isOK()) { + _backupCursorFetcher.reset(); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus); + return; + } + + _backupCursorFetcher->onCompletion() + .thenRunOn(**_attemptExec) + .then([this, fetchStatus, onCompletionGuard, &lock] { + if (!*fetchStatus) { + // the callback was never invoked + uasserted(128411, "Internal error running cursor callback in command"); + } + auto status = fetchStatus->get(); + if (!status.isOK()) { + 
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + uassert(128414, + "Internal error: no file names collected from sync source", + !_remoteFiles.empty()); + + // schedule file transfer callback + status = _scheduleWorkAndSaveHandle_inlock( + [this, onCompletionGuard](const executor::TaskExecutor::CallbackArgs& args) { + _transferFileCallback(args, 0lu, onCompletionGuard); + }, + &_transferFileHandle, + str::stream() << "_transferFileCallback-" << 0); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + }) + .wait(); + +} catch (const DBException&) { + // Report exception as an initial syncer failure. + stdx::unique_lock lock(_mutex); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, exceptionToStatus()); +} + +// tenant_migration_shard_merge_util.cpp : cloneFile +void InitialSyncerFCB::_transferFileCallback( + const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::size_t fileIdx, + // NOLINTNEXTLINE(*-unnecessary-value-param) + std::shared_ptr onCompletionGuard) noexcept try { + // stdx::lock_guard lock(_mutex); + stdx::unique_lock lock(_mutex); + auto status = _checkForShutdownAndConvertStatus_inlock( + callbackArgs, "error transferring file from sync source"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + // create connection to the sync source + DBClientConnection syncSourceConn{true /* autoReconnect */}; + status = syncSourceConn.connect(_syncSource, "File copy-based initial sync", boost::none); + if (status.isOK()) { + status = replAuthenticate(&syncSourceConn) + .withContext(str::stream() << "Failed to authenticate to " << _syncSource); + } + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + // execute remote request + std::string remoteFileName = _remoteFiles[fileIdx].name; + size_t remoteFileSize = 
_remoteFiles[fileIdx].size; + auto currentBackupFileCloner = + std::make_unique(_backupId, + remoteFileName, + remoteFileSize, + getPathRelativeTo(remoteFileName, _remoteDBPath), + _sharedData.get(), + _syncSource, + &syncSourceConn, + _storage, + _writerPool); + auto cloneStatus = currentBackupFileCloner->run(); + if (!cloneStatus.isOK()) { + LOGV2_WARNING(128412, + "Failed to clone file", + "fileName"_attr = remoteFileName, + "error"_attr = cloneStatus); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, cloneStatus); + } else { + LOGV2_DEBUG(128413, 1, "Cloned file", "fileName"_attr = remoteFileName); + auto nextFileIdx = fileIdx + 1; + if (nextFileIdx < _remoteFiles.size()) { + // schedule next file cloning + auto status = _scheduleWorkAndSaveHandle_inlock( + [this, nextFileIdx, onCompletionGuard]( + const executor::TaskExecutor::CallbackArgs& args) { + _transferFileCallback(args, nextFileIdx, onCompletionGuard); + }, + &_transferFileHandle, + str::stream() << "_transferFileCallback-" << nextFileIdx); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + } else { + // all files are cloned - close backup cursor + auto status = _killBackupCursor_inlock(); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + if (MONGO_unlikely(initialSyncHangAfterCloningFiles.shouldFail())) { + // This could have been done with a scheduleWorkAt but this is used only by JS tests + // where we run with multiple threads so it's fine to spin on this thread. This log + // output is used in js tests so please leave it. + LOGV2(128447, + "initial sync - initialSyncHangAfterCloningFiles fail point " + "enabled. 
Blocking until fail point is disabled."); + lock.unlock(); + while (MONGO_unlikely(initialSyncHangAfterCloningFiles.shouldFail()) && + !_isShuttingDown()) { + mongo::sleepsecs(1); + } + lock.lock(); + } + // schedule next task + status = _scheduleWorkAndSaveHandle_inlock( + [this, onCompletionGuard](const executor::TaskExecutor::CallbackArgs& args) { + _switchToDownloadedCallback(args, onCompletionGuard); + }, + &_currentHandle, + "_switchToDownloadedCallback"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + } + } +} catch (const DBException&) { + // Report exception as an initial syncer failure. + stdx::unique_lock lock(_mutex); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, exceptionToStatus()); +} + +void InitialSyncerFCB::_switchToDownloadedCallback( + const executor::TaskExecutor::CallbackArgs& callbackArgs, + // NOLINTNEXTLINE(*-unnecessary-value-param) + std::shared_ptr onCompletionGuard) noexcept try { + ChangeStorageGuard changeStorageGuard(this); + stdx::unique_lock lock(_mutex); + auto status = _checkForShutdownAndConvertStatus_inlock(callbackArgs, + "_switchToDownloadedCallback cancelled"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + // Save list of files existing in dbpath. 
We will delete them later + LOGV2_DEBUG(128404, 2, "Reading the list of local files via $backupCursor"); + auto bfiles = _getBackupFiles(); + if (!bfiles.isOK()) { + LOGV2_DEBUG( + 128405, 2, "Failed to get the list of local files", "status"_attr = bfiles.getStatus()); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, bfiles.getStatus()); + return; + } + LOGV2_DEBUG( + 128406, 2, "Retrieved names of local files", "number"_attr = bfiles.getValue().size()); + _localFiles = bfiles.getValue(); + + auto opCtx = makeOpCtx(); + Lock::GlobalLock lk(opCtx.get(), MODE_X); + // retrieve the current on-disk replica set configuration + auto* rs = repl::ReplicationCoordinator::get(opCtx->getServiceContext()); + invariant(rs); + BSONObj savedRSConfig = rs->getConfigBSON(); + + // Switch storage to be pointing to the set of downloaded files + lock.unlock(); + status = _switchStorageLocation(opCtx.get(), + _cfgDBPath + "/.initialsync", + startup_recovery::StartupRecoveryMode::kReplicaSetMember); + lock.lock(); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + // do some cleanup + auto* consistencyMarkers = _replicationProcess->getConsistencyMarkers(); + consistencyMarkers->setMinValid(opCtx.get(), + OpTime{kTimestampOne, repl::OpTime::kUninitializedTerm}); + // TODO: when extend backup cursor is implemented use the last opTime retrieved from the sync + // source + consistencyMarkers->setOplogTruncateAfterPoint(opCtx.get(), _oplogEnd.getTimestamp()); + // clear and reset the initalSyncId + consistencyMarkers->clearInitialSyncId(opCtx.get()); + consistencyMarkers->setInitialSyncIdIfNotSet(opCtx.get()); + + ReplicationCoordinatorExternalStateImpl externalState( + opCtx->getServiceContext(), + DropPendingCollectionReaper::get(opCtx.get()), + StorageInterface::get(opCtx.get()), + ReplicationProcess::get(opCtx.get())); + // replace the lastVote document with a default one + status = 
StorageInterface::get(opCtx.get()) + ->dropCollection(opCtx.get(), NamespaceString::kLastVoteNamespace); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + status = externalState.createLocalLastVoteCollection(opCtx.get()); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + // replace the config with savedRSConfig + status = externalState.replaceLocalConfigDocument(opCtx.get(), savedRSConfig); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + // schedule next task + status = _scheduleWorkAndSaveHandle_inlock( + [this, onCompletionGuard](const executor::TaskExecutor::CallbackArgs& args) { + _executeRecovery(args, onCompletionGuard); + }, + &_currentHandle, + "_executeRecovery"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } +} catch (const DBException&) { + // Report exception as an initial syncer failure. 
+ stdx::unique_lock lock(_mutex); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, exceptionToStatus()); +} + +void InitialSyncerFCB::_executeRecovery( + const executor::TaskExecutor::CallbackArgs& callbackArgs, + // NOLINTNEXTLINE(*-unnecessary-value-param) + std::shared_ptr onCompletionGuard) noexcept try { + stdx::lock_guard lock(_mutex); + auto status = + _checkForShutdownAndConvertStatus_inlock(callbackArgs, "_executeRecovery cancelled"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + auto opCtx = makeOpCtx(); + auto* serviceCtx = opCtx->getServiceContext(); + inReplicationRecovery(serviceCtx) = true; + ON_BLOCK_EXIT([serviceCtx] { + inReplicationRecovery(serviceCtx) = false; + }); + + _replicationProcess->getReplicationRecovery()->recoverFromOplogAsStandalone(opCtx.get(), true); + + // Aborts all active, two-phase index builds. + [[maybe_unused]] auto stoppedIndexBuilds = + IndexBuildsCoordinator::get(serviceCtx)->stopIndexBuildsForRollback(opCtx.get()); + + if (!stoppedIndexBuilds.empty()) { + LOGV2_WARNING(128498, + "Aborted active index builds during initial sync recovery", + "numIndexBuilds"_attr = stoppedIndexBuilds.size()); + } + + // Set stable timestamp + if (BSONObj lastEntry; + Helpers::getLast(opCtx.get(), NamespaceString::kRsOplogNamespace, lastEntry)) { + auto lastTime = repl::OpTimeAndWallTime::parse(lastEntry); + _storage->setStableTimestamp(serviceCtx, lastTime.opTime.getTimestamp()); + } + + // schedule next task + status = _scheduleWorkAndSaveHandle_inlock( + [this, onCompletionGuard](const executor::TaskExecutor::CallbackArgs& args) { + _switchToDummyToDBPathCallback(args, onCompletionGuard); + }, + &_currentHandle, + "_switchToDummyToDBPathCallback"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } +} catch (const DBException&) { + // Report exception as an initial syncer failure. 
+ stdx::unique_lock lock(_mutex); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, exceptionToStatus()); +} + +void InitialSyncerFCB::_switchToDummyToDBPathCallback( + const executor::TaskExecutor::CallbackArgs& callbackArgs, + // NOLINTNEXTLINE(*-unnecessary-value-param) + std::shared_ptr onCompletionGuard) noexcept try { + ChangeStorageGuard changeStorageGuard(this); + stdx::unique_lock lock(_mutex); + auto status = _checkForShutdownAndConvertStatus_inlock( + callbackArgs, "_switchToDummyToDBPathCallback cancelled"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + auto opCtx = makeOpCtx(); + Lock::GlobalLock lk(opCtx.get(), MODE_X); + // Switch storage to a dummy location + lock.unlock(); + status = _switchStorageLocation(opCtx.get(), _cfgDBPath + "/.initialsync/.dummy"); + lock.lock(); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + // Delete the list of files obtained from the local backup cursor + status = _deleteLocalFiles(); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + // Move the files from the download location to the normal dbpath + boost::filesystem::path cfgDBPath(_cfgDBPath); + status = _moveFiles(cfgDBPath / ".initialsync", cfgDBPath); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + // Switch storage back to the normal dbpath + lock.unlock(); + status = _switchStorageLocation( + opCtx.get(), _cfgDBPath, startup_recovery::StartupRecoveryMode::kReplicaSetMember); + lock.lock(); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + // schedule next task + status = _scheduleWorkAndSaveHandle_inlock( + [this, onCompletionGuard](const executor::TaskExecutor::CallbackArgs& args) { + 
_finalizeAndCompleteCallback(args, onCompletionGuard); + }, + &_currentHandle, + "_finalizeAndCompleteCallback"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } +} catch (const DBException&) { + // Report exception as an initial syncer failure. + stdx::unique_lock lock(_mutex); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, exceptionToStatus()); +} + +void InitialSyncerFCB::_finalizeAndCompleteCallback( + const executor::TaskExecutor::CallbackArgs& callbackArgs, + // NOLINTNEXTLINE(*-unnecessary-value-param) + std::shared_ptr onCompletionGuard) noexcept try { + stdx::lock_guard lock(_mutex); + auto status = _checkForShutdownAndConvertStatus_inlock( + callbackArgs, "_finalizeAndCompleteCallback cancelled"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } + + { + auto opCtx = makeOpCtx(); + // Attach JournalListener to the new instance of storage engine + auto* journalListener = _dataReplicatorExternalState->getReplicationJournalListener(); + opCtx->getServiceContext()->getStorageEngine()->setJournalListener(journalListener); + } + + // TODO: set value of _lastApplied or provide another instance of OpTimeAndWallTime + // TODO: fix this temporary solution + _lastApplied.opTime = _oplogEnd; + _lastApplied.wallTime = Date_t::fromMillisSinceEpoch(_oplogEnd.getSecs() * 1000); + // Successfully complete initial sync + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, _lastApplied); +} catch (const DBException&) { + // Report exception as an initial syncer failure. 
+ stdx::unique_lock lock(_mutex); + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, exceptionToStatus()); +} + +// template +// void InitialSyncerFCB::_(const executor::TaskExecutor::CallbackArgs& callbackArgs, +// // NOLINTNEXTLINE(*-unnecessary-value-param) +// std::shared_ptr onCompletionGuard) noexcept try { +// stdx::lock_guard lock(_mutex); +// auto status = _checkForShutdownAndConvertStatus_inlock(callbackArgs, "error message"); +// if (!status.isOK()) { +// onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); +// return; +// } +// +// // schedule next task +// status = _scheduleWorkAndSaveHandle_inlock( +// [this, onCompletionGuard](const executor::TaskExecutor::CallbackArgs& args) { +// _nextTaskCallback(args, onCompletionGuard); +// }, +// &_currentHandle, +// "task name"); +// if (!status.isOK()) { +// onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); +// return; +// } +//} catch (const DBException&) { +// // Report exception as an initial syncer failure. +// stdx::unique_lock lock(_mutex); +// onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, exceptionToStatus()); +//} + +// debugging template +// onCompletionGuard->setResultAndCancelRemainingWork_inlock( +// lock, +// {ErrorCodes::NotImplemented, +// "All files cloned; cancel FCBIS for debugging reason"}); + + +std::string InitialSyncerFCB::Stats::toString() const { + return toBSON().toString(); +} + +BSONObj InitialSyncerFCB::Stats::toBSON() const { + BSONObjBuilder bob; + append(&bob); + return bob.obj(); +} + +void InitialSyncerFCB::Stats::append(BSONObjBuilder* builder) const { + builder->appendNumber("failedInitialSyncAttempts", + static_cast(failedInitialSyncAttempts)); + builder->appendNumber("maxFailedInitialSyncAttempts", + static_cast(maxFailedInitialSyncAttempts)); + + auto e = exec.lock(); + if (initialSyncStart != Date_t()) { + builder->appendDate("initialSyncStart", initialSyncStart); + auto elapsedDurationEnd = e ? 
e->now() : Date_t::now(); + if (initialSyncEnd != Date_t()) { + builder->appendDate("initialSyncEnd", initialSyncEnd); + elapsedDurationEnd = initialSyncEnd; + } + long long elapsedMillis = + duration_cast(elapsedDurationEnd - initialSyncStart).count(); + builder->appendNumber("totalInitialSyncElapsedMillis", elapsedMillis); + } + + BSONArrayBuilder arrBuilder(builder->subarrayStart("initialSyncAttempts")); + for (auto const& attemptInfo : initialSyncAttemptInfos) { + arrBuilder.append(attemptInfo.toBSON()); + } + arrBuilder.doneFast(); +} + +std::string InitialSyncerFCB::InitialSyncAttemptInfo::toString() const { + return toBSON().toString(); +} + +BSONObj InitialSyncerFCB::InitialSyncAttemptInfo::toBSON() const { + BSONObjBuilder bob; + append(&bob); + return bob.obj(); +} + +void InitialSyncerFCB::InitialSyncAttemptInfo::append(BSONObjBuilder* builder) const { + builder->appendNumber("durationMillis", durationMillis); + builder->append("status", status.toString()); + builder->append("syncSource", syncSource.toString()); + if (rollBackId >= 0) { + builder->append("rollBackId", rollBackId); + } + builder->append("operationsRetried", operationsRetried); + builder->append("totalTimeUnreachableMillis", totalTimeUnreachableMillis); +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/initial_syncer_fcb.h b/src/mongo/db/repl/initial_syncer_fcb.h new file mode 100644 index 0000000000000..44f82dbe3d683 --- /dev/null +++ b/src/mongo/db/repl/initial_syncer_fcb.h @@ -0,0 +1,627 @@ +/*====== +This file is part of Percona Server for MongoDB. + +Copyright (C) 2024-present Percona and/or its affiliates. All rights reserved. + + This program is free software: you can redistribute it and/or modify + it under the terms of the Server Side Public License, version 1, + as published by MongoDB, Inc. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Server Side Public License for more details. + + You should have received a copy of the Server Side Public License + along with this program. If not, see + . + + As a special exception, the copyright holders give permission to link the + code of portions of this program with the OpenSSL library under certain + conditions as described in each individual source file and distribute + linked combinations including the program with the OpenSSL library. You + must comply with the Server Side Public License in all respects for + all of the code used other than as permitted herein. If you modify file(s) + with this exception, you may extend this exception to your version of the + file(s), but you are not obligated to do so. If you do not wish to do so, + delete this exception statement from your version. If you delete this + exception statement from all source files in the program, then also delete + it in the license file. 
+======= */ + + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include + +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/base/string_data.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/client/dbclient_connection.h" +#include "mongo/client/fetcher.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/callback_completion_guard.h" +#include "mongo/db/repl/data_replicator_external_state.h" +#include "mongo/db/repl/initial_sync_shared_data.h" +#include "mongo/db/repl/initial_syncer_interface.h" +#include "mongo/db/repl/multiapplier.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/rollback_checker.h" +#include "mongo/db/repl/tenant_migration_shared_data.h" +#include "mongo/db/startup_recovery.h" +#include "mongo/executor/scoped_task_executor.h" +#include "mongo/executor/task_executor.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/platform/mutex.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/concurrency/with_lock.h" +#include "mongo/util/duration.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/time_support.h" +#include "mongo/util/uuid.h" + +namespace mongo { +namespace repl { + +struct InitialSyncState; +class ReplicationProcess; +class StorageInterface; + +/** + * The initial syncer provides services to keep collection in sync by replicating + * changes via an oplog source to the local system storage. + * + * This class will use existing machinery like the Executor to schedule work and + * network tasks, as well as provide serial access and synchronization of state. + * + * + * Entry Points: + * -- startup: Start initial sync. 
+ */ +class InitialSyncerFCB : public InitialSyncerInterface { +public: + InitialSyncerFCB(const InitialSyncerFCB&) = delete; + InitialSyncerFCB& operator=(const InitialSyncerFCB&) = delete; + InitialSyncerFCB(InitialSyncerFCB&&) = delete; + InitialSyncerFCB& operator=(InitialSyncerFCB&&) = delete; + + /** + * Callback completion guard for initial syncer. + */ + using OnCompletionGuard = CallbackCompletionGuard>; + + struct InitialSyncAttemptInfo { + int durationMillis; + Status status; + HostAndPort syncSource; + int rollBackId; + int operationsRetried; + int totalTimeUnreachableMillis; + + std::string toString() const; + BSONObj toBSON() const; + void append(BSONObjBuilder* builder) const; + }; + + struct Stats { + std::uint32_t failedInitialSyncAttempts{0}; + std::uint32_t maxFailedInitialSyncAttempts{0}; + Date_t initialSyncStart; + Date_t initialSyncEnd; + std::vector initialSyncAttemptInfos; + std::weak_ptr exec; + + std::string toString() const; + BSONObj toBSON() const; + void append(BSONObjBuilder* builder) const; + }; + + InitialSyncerFCB(InitialSyncerInterface::Options opts, + std::unique_ptr dataReplicatorExternalState, + ThreadPool* writerPool, + StorageInterface* storage, + ReplicationProcess* replicationProcess, + const OnCompletionFn& onCompletion); + + ~InitialSyncerFCB() override; + + /** + * Returns true if an initial sync is currently running or in the process of shutting down. + */ + bool isActive() const; + + std::string getInitialSyncMethod() const final; + + bool allowLocalDbAccess() const final { + return true; + } + + Status startup(OperationContext* opCtx, std::uint32_t maxAttempts) noexcept final; + + Status shutdown() final; + + void join() final; + + /** + * Returns internal state in a loggable format. 
+ */ + std::string getDiagnosticString() const; + + BSONObj getInitialSyncProgress() const final; + + void cancelCurrentAttempt() final; + + // State transitions: + // PreStart --> Running --> ShuttingDown --> Complete + // It is possible to skip intermediate states. For example, calling shutdown() when the data + // replicator has not started will transition from PreStart directly to Complete. + enum class State { kPreStart, kRunning, kShuttingDown, kComplete }; + + /** + * Returns current initial syncer state. + * For testing only. + */ + State getState_forTest() const; + + /** + * Returns the wall clock time component of _lastApplied. + * For testing only. + */ + Date_t getWallClockTime_forTest() const; + + /** + * Sets the allowed outage duration in _sharedData. + * For testing only. + */ + void setAllowedOutageDuration_forTest(Milliseconds allowedOutageDuration); + +private: + /** + * Attributes of remote file received from $backupCursor + */ + struct BackupFile { + std::string name; + size_t size; + }; + + /** + * Guard storage changing functions from being deadlocked by shutdown. + */ + class ChangeStorageGuard { + public: + ChangeStorageGuard(InitialSyncerFCB* initialSyncer) : _initialSyncer(initialSyncer) { + stdx::lock_guard lk(_initialSyncer->_mutex); + _initialSyncer->_inStorageChange = true; + } + + ~ChangeStorageGuard() { + { + stdx::lock_guard lk(_initialSyncer->_mutex); + _initialSyncer->_inStorageChange = false; + } + _initialSyncer->_inStorageChangeCondition.notify_all(); + } + + ChangeStorageGuard(const ChangeStorageGuard&) = delete; + ChangeStorageGuard& operator=(const ChangeStorageGuard&) = delete; + ChangeStorageGuard(ChangeStorageGuard&&) = delete; + ChangeStorageGuard& operator=(ChangeStorageGuard&&) = delete; + + private: + InitialSyncerFCB* _initialSyncer; + }; + + /** + * Returns true if we are still processing initial sync tasks (_state is either Running or + * Shutdown). 
+ */ + bool _isActive_inlock() const; + + /** + * Cancels all outstanding work. + * Used by shutdown() and CompletionGuard::setResultAndCancelRemainingWork(). + */ + void _cancelRemainingWork_inlock(); + + /** + * Returns true if the initial syncer has received a shutdown request (_state is ShuttingDown). + */ + bool _isShuttingDown() const; + bool _isShuttingDown_inlock() const; + + /** + * Initial sync flowchart: + * + * startup() + * | + * | + * V + * _setUp_inlock() + * | + * | + * V + * _startInitialSyncAttemptCallback() + * | + * | + * |<-------+ + * | | + * | | (bad sync source) + * | | + * V | + * _chooseSyncSourceCallback() + * | + * | + * | (good sync source found) + * | + * | + * V + * _truncateOplogAndDropReplicatedDatabases() + * | + * | + * V + * _rollbackCheckerResetCallback() + * | + * | + * V + * _lastOplogEntryFetcherCallbackForDefaultBeginFetchingOpTime() [removed] + * | + * | + * V + * _getBeginFetchingOpTimeCallback() [removed] + * | + * | + * V + * _lastOplogEntryFetcherCallbackForBeginApplyingTimestamp() [removed] + * | + * | + * V + * _fcvFetcherCallback() + * | + * | + * +------------------------------+ + * | | + * | | + * V V + * _oplogFetcherCallback[removed] _allDatabaseClonerCallback [removed] + * | | + * | | + * | V + * | _lastOplogEntryFetcherCallbackForStopTimestamp() [removed] + * | | | + * | | | + * | (no ops to apply) | | (have ops to apply) + * | | | + * | | V + * | | _getNextApplierBatchCallback() [removed] + * | | | ^ + * | | | | + * | | | (end ts not reached) + * | | | | + * | | V | + * | | _multiApplierCallback()-----+ [removed] + * | | | + * | | | + * | (reached end timestamp) + * | | | + * | V V + * | _rollbackCheckerCheckForRollbackCallback() [removed] + * | | + * | | + * +------------------------------+ + * | + * | + * V + * _finishInitialSyncAttempt() + * | + * | + * V + * _finishCallback() + */ + + /** + * Sets up internal state to begin initial sync. 
+ */ + void _setUp_inlock(OperationContext* opCtx, std::uint32_t initialSyncMaxAttempts); + + /** + * Tears down internal state before reporting final status to caller. + */ + void _tearDown_inlock(OperationContext* opCtx, + const StatusWith& lastApplied); + + /** + * Callback to start a single initial sync attempt. + */ + void _startInitialSyncAttemptCallback(const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::uint32_t initialSyncAttempt, + std::uint32_t initialSyncMaxAttempts) noexcept; + + /** + * Callback to obtain sync source from sync source selector. + * For every initial sync attempt, we will try up to 'numInitialSyncConnectAttempts' times (at + * an interval of '_opts.syncSourceRetryWait' ms) to obtain a valid sync source before giving up + * and returning ErrorCodes::InitialSyncOplogSourceMissing. + */ + void _chooseSyncSourceCallback(const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::uint32_t chooseSyncSourceAttempt, + std::uint32_t chooseSyncSourceMaxAttempts, + std::shared_ptr onCompletionGuard) noexcept; + + /** + * Callback to execute backup cursor on the sync source + */ + void _fetchBackupCursorCallback(const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::shared_ptr onCompletionGuard) noexcept; + + /** + * Callback to transfer file from the sync source + */ + void _transferFileCallback(const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::size_t fileIdx, + std::shared_ptr onCompletionGuard) noexcept; + + /** + * Switch to downloaded files and do some cleanup of the 'local' db + */ + void _switchToDownloadedCallback(const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::shared_ptr onCompletionGuard) noexcept; + + /** + * Replay the oplog on the instance recoverd from backup + * Scheduled from _switchToDownloadedCallback + * Schedules _switchToDummyToDBPathCallback + */ + void _executeRecovery(const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::shared_ptr onCompletionGuard) 
noexcept; + + /** + * Switch to dummy location, remove local files from dbpath, move downloaded files to the dbpath + * Switch back to dbpath + */ + void _switchToDummyToDBPathCallback( + const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::shared_ptr onCompletionGuard) noexcept; + + /** + * Finalize and complete inital sync + */ + void _finalizeAndCompleteCallback( + const executor::TaskExecutor::CallbackArgs& callbackArgs, + std::shared_ptr onCompletionGuard) noexcept; + + /** + * This function does the following: + * 1.) Truncate oplog. + * 2.) Drop user databases (replicated dbs). + */ + Status _truncateOplogAndDropReplicatedDatabases(); + + /** + * Callback for rollback checker's first replSetGetRBID command before starting data cloning. + */ + void _rollbackCheckerResetCallback(const RollbackChecker::Result& result, + std::shared_ptr onCompletionGuard); + + /** + * Callback for the '_fCVFetcher'. A successful response lets us check if the remote node + * is in a currently acceptable fCV and if it has a 'targetVersion' set. + */ + void _fcvFetcherCallback(const StatusWith& result, + std::shared_ptr onCompletionGuard, + const OpTime& lastOpTime, + OpTime& beginFetchingOpTime); + + /** + * Reports result of current initial sync attempt. May schedule another initial sync attempt + * depending on shutdown state and whether we've exhausted all initial sync retries. + */ + void _finishInitialSyncAttempt(const StatusWith& lastApplied); + + /** + * Invokes completion callback and transitions state to State::kComplete. + */ + void _finishCallback(StatusWith lastApplied); + + // Obtains a valid sync source from the sync source selector. + // Returns error if a sync source cannot be found. 
+ StatusWith _chooseSyncSource_inlock(); + + void _appendInitialSyncProgressMinimal_inlock(BSONObjBuilder* bob) const; + BSONObj _getInitialSyncProgress_inlock() const; + + /** + * Check if a status is one which means there's a retriable error and we should retry the + * current operation, and records whether an operation is currently being retried. Note this + * can only handle one operation at a time (i.e. it should not be used in both parts of the + * "split" section of Initial Sync) + */ + bool _shouldRetryError(WithLock lk, Status status); + + /** + * Indicates we are no longer handling a retriable error. + */ + void _clearRetriableError(WithLock lk); + + /** + * Checks the given status (or embedded status inside the callback args) and current data + * replicator shutdown state. If the given status is not OK or if we are shutting down, returns + * a new error status that should be passed to _finishCallback. The reason in the new error + * status will include 'message'. + * Otherwise, returns Status::OK(). + */ + Status _checkForShutdownAndConvertStatus_inlock( + const executor::TaskExecutor::CallbackArgs& callbackArgs, const std::string& message); + Status _checkForShutdownAndConvertStatus_inlock(const Status& status, + const std::string& message); + + /** + * Schedules work to be run by the task executor. + * Saves handle if work was successfully scheduled. + * Returns scheduleWork status (without the handle). + */ + Status _scheduleWorkAndSaveHandle_inlock(executor::TaskExecutor::CallbackFn work, + executor::TaskExecutor::CallbackHandle* handle, + const std::string& name); + Status _scheduleWorkAtAndSaveHandle_inlock(Date_t when, + executor::TaskExecutor::CallbackFn work, + executor::TaskExecutor::CallbackHandle* handle, + const std::string& name); + + /** + * Cancels task executor callback handle if not null. 
+ */ + void _cancelHandle_inlock(executor::TaskExecutor::CallbackHandle handle); + + /** + * Starts up component and checks initial syncer's shutdown state at the same time. + * If component's startup() fails, resets 'component' (which is assumed to be a unique_ptr + * to the component type). + */ + template + Status _startupComponent_inlock(Component& component); + + /** + * Shuts down component if not null. + */ + template + void _shutdownComponent_inlock(Component& component); + + /** + * Temporary location to declare all FCB-related private methods + * TODO: reorganize + */ + Status _deleteLocalFiles(); + + Status _moveFiles(const boost::filesystem::path& sourceDir, + const boost::filesystem::path& destDir); + + StatusWith> _getBackupFiles(); + + Status _switchStorageLocation( + OperationContext* opCtx, + const std::string& newLocation, + boost::optional = boost::none); + + Status _killBackupCursor_inlock(); + + // Counts how many documents have been refetched from the source in the current batch. + AtomicWord _fetchCount; + + // + // All member variables are labeled with one of the following codes indicating the + // synchronization rules for accessing them. + // + // (R) Read-only in concurrent operation; no synchronization required. + // (S) Self-synchronizing; access in any way from any context. + // (M) Reads and writes guarded by _mutex + // (X) Reads and writes must be performed in a callback in _exec + // (MX) Must hold _mutex and be in a callback in _exec to write; must either hold + // _mutex or be in a callback in _exec to read. + + mutable Mutex _mutex = MONGO_MAKE_LATCH("InitialSyncerFCB::_mutex"); // (S) + const InitialSyncerInterface::Options _opts; // (R) + std::unique_ptr _dataReplicatorExternalState; // (R) + std::shared_ptr _exec; // (R) + std::unique_ptr _attemptExec; // (X) + // The executor that the Cloner thread runs on. 
In production code this is the same as _exec, + // but for unit testing, _exec is single-threaded and our NetworkInterfaceMock runs it in + // lockstep with the unit test code. If we pause the cloners using failpoints + // NetworkInterfaceMock is unaware of this and this causes our unit tests to deadlock. + std::shared_ptr _clonerExec; // (R) + std::unique_ptr _clonerAttemptExec; // (X) + ThreadPool* _writerPool; // (R) + StorageInterface* _storage; // (R) + ReplicationProcess* _replicationProcess; // (S) + std::vector _localFiles; // TODO: + std::vector _remoteFiles; // TODO: + UUID _backupId; // TODO: + std::string _remoteDBPath; // TODO: + OpTime _oplogEnd; // TODO: + const std::string _cfgDBPath; // TODO: + std::unique_ptr _backupCursorInfo; // TODO: + + // This is invoked with the final status of the initial sync. If startup() fails, this callback + // is never invoked. The caller gets the last applied optime when the initial sync completes + // successfully or an error status. + // '_onCompletion' is cleared on completion (in _finishCallback()) in order to release any + // resources that might be held by the callback function object. + OnCompletionFn _onCompletion; // (M) + + // Handle to currently scheduled _startInitialSyncAttemptCallback() task. + executor::TaskExecutor::CallbackHandle _startInitialSyncAttemptHandle; // (M) + + // Handle to currently scheduled _chooseSyncSourceCallback() task. + executor::TaskExecutor::CallbackHandle _chooseSyncSourceHandle; // (M) + + // Handle to currently scheduled _fetchBackupCursorCallback() task. + executor::TaskExecutor::CallbackHandle _fetchBackupCursorHandle; // (M) + + // Handle to currently scheduled _transferFileCallback() task. + executor::TaskExecutor::CallbackHandle _transferFileHandle; // (M) + + // Handle to currently scheduled task (one of several tasks in the file move/dbpath change + // sequence). 
+ executor::TaskExecutor::CallbackHandle _currentHandle; // (M) + + // RollbackChecker to get rollback ID before and after each initial sync attempt. + std::unique_ptr _rollbackChecker; // (M) + + // Handle returned from RollbackChecker::reset(). + RollbackChecker::CallbackHandle _getBaseRollbackIdHandle; // (M) + + // Handle returned from RollbackChecker::checkForRollback(). + RollbackChecker::CallbackHandle _getLastRollbackIdHandle; // (M) + + // The operation, if any, currently being retried because of a network error. + InitialSyncSharedData::RetryableOperation _retryingOperation; // (M) + + std::unique_ptr _initialSyncState; // (M) + std::unique_ptr _beginFetchingOpTimeFetcher; // (S) + std::unique_ptr _fCVFetcher; // (S) + std::unique_ptr _backupCursorFetcher; // (S) + std::unique_ptr _applier; // (M) + HostAndPort _syncSource; // (M) + std::unique_ptr _client; // (M) + OpTime _lastFetched; // (MX) + OpTimeAndWallTime _lastApplied; // (MX) + + // Used to signal changes in _state. + mutable stdx::condition_variable _stateCondition; + + // Current initial syncer state. See comments for State enum class for details. + State _state = State::kPreStart; // (M) + + // Used to create the DBClientConnection for the cloners + CreateClientFn _createClientFn; + + // Contains stats on the current initial sync request (includes all attempts). + // To access these stats in a user-readable format, use getInitialSyncProgress(). + Stats _stats; // (M) + + // Data shared by cloners and fetcher. Follow InitialSyncSharedData synchronization rules. + std::unique_ptr _sharedData; // (S) + + // Amount of time an outage is allowed to continue before the initial sync attempt is marked + // as failed. 
+ Milliseconds _allowedOutageDuration; // (M) + + // The initial sync attempt has been canceled + bool _attemptCanceled = false; // (X) + + // Condition variable to wait for the end of the storage change + stdx::condition_variable _inStorageChangeCondition; // (M) + bool _inStorageChange = false; // (M) +}; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 8794751136712..20dce19c432ad 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -789,6 +789,18 @@ void ReplicationCoordinatorImpl::_startInitialSync( "logical initial sync.", "initialSyncMethod"_attr = initialSyncMethod, "error"_attr = swInitialSyncer.getStatus().reason()); + if (initialSyncMethod == "fileCopyBased") { + LOGV2_WARNING( + 128490, + "Support for the file copy-based intial sync (FCBIS) is available in " + "the Percona Supported Builds of MongoDB. You can compile Percona " + "Server for MongoDB with FCBIS yourself by following the build from " + "source guide " + "(https://docs.percona.com/percona-server-for-mongodb/7.0/install/" + "source.html). You can also subscribe to support to receive Percona " + "Supported Builds, see (https://www.percona.com/services/support) for " + "more information."); + } swInitialSyncer = createInitialSyncer(std::string("logical")); } initialSyncerCopy = uassertStatusOK(swInitialSyncer);