Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/libcommon/utility/utility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ static std::default_random_engine gen(rd());

std::string CommonUtility::generateRandomString(const char *charArray, std::uniform_int_distribution<int> &distrib,
const int length /*= 10*/) {
const std::lock_guard<std::mutex> lock(_generateRandomStringMutex);
const std::scoped_lock lock(_generateRandomStringMutex);

std::string tmp;
tmp.reserve(static_cast<size_t>(length));
Expand Down
2 changes: 1 addition & 1 deletion src/libsyncengine/jobs/abstractjob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ std::mutex AbstractJob::_nextJobIdMutex;

AbstractJob::AbstractJob() :
_logger(Log::instance()->getLogger()) {
const std::lock_guard lock(_nextJobIdMutex);
const std::scoped_lock lock(_nextJobIdMutex);
_jobId = _nextJobId++;

if (ParametersCache::isExtendedLogEnabled()) {
Expand Down
14 changes: 7 additions & 7 deletions src/libsyncengine/jobs/network/abstractnetworkjob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ void AbstractNetworkJob::unzip(std::istream &is, std::stringstream &ss) {
}

void AbstractNetworkJob::createSession(const Poco::URI &uri) {
const std::scoped_lock<std::recursive_mutex> lock(_mutexSession);
const std::scoped_lock lock(_mutexSession);

if (_session) {
// Redirection case
Expand All @@ -282,7 +282,7 @@ void AbstractNetworkJob::createSession(const Poco::URI &uri) {
}

void AbstractNetworkJob::clearSession() {
const std::scoped_lock<std::recursive_mutex> lock(_mutexSession);
const std::scoped_lock lock(_mutexSession);

if (_session) {
try {
Expand Down Expand Up @@ -334,7 +334,7 @@ ExitInfo AbstractNetworkJob::sendRequest(const Poco::URI &uri) {
// Send request, retrieve an open stream
std::vector<std::reference_wrapper<std::ostream>> stream;
try {
const std::scoped_lock<std::recursive_mutex> lock(_mutexSession);
const std::scoped_lock lock(_mutexSession);
if (_session) {
stream.push_back(_session->sendRequest(req));
if (ioOrLogicalErrorOccurred(stream[0].get())) {
Expand All @@ -350,7 +350,7 @@ ExitInfo AbstractNetworkJob::sendRequest(const Poco::URI &uri) {
// Send data
std::string::const_iterator itBegin = _data.begin();
while (itBegin != _data.end()) {
const std::scoped_lock<std::recursive_mutex> lock(_mutexSession);
const std::scoped_lock lock(_mutexSession);
if (isAborted()) {
LOG_DEBUG(_logger, "Request " << jobId() << ": aborting HTTPS session");
return {};
Expand Down Expand Up @@ -380,7 +380,7 @@ ExitInfo AbstractNetworkJob::sendRequest(const Poco::URI &uri) {

ExitInfo AbstractNetworkJob::receiveResponseFromSession(StreamVector &stream) {
try {
const std::scoped_lock<std::recursive_mutex> lock(_mutexSession);
const std::scoped_lock lock(_mutexSession);
if (_session) {
(void) stream.emplace_back(_session->receiveResponse(_httpResponse));
if (ioOrLogicalErrorOccurred(stream[0].get())) {
Expand Down Expand Up @@ -419,7 +419,7 @@ ExitInfo AbstractNetworkJob::receiveResponse(const Poco::URI &uri) {
switch (httpResponse().getStatus()) {
case Poco::Net::HTTPResponse::HTTP_OK: {
try {
const std::scoped_lock<std::recursive_mutex> lock(_mutexSession);
const std::scoped_lock lock(_mutexSession);
return handleResponse(stream[0].get());
} catch (const std::exception &e) {
LOG_WARN(_logger, "handleResponse exception: " << errorText(e));
Expand Down Expand Up @@ -522,7 +522,7 @@ ExitInfo AbstractNetworkJob::followRedirect() {
}

ExitInfo AbstractNetworkJob::processSocketError(const std::string &msg, const UniqueId jobId) {
const std::scoped_lock<std::recursive_mutex> lock(_mutexSession);
const std::scoped_lock lock(_mutexSession);
if (_session) {
int err = _session->socket().getError();
std::string errMsg = Poco::Error::getMessage(err);
Expand Down
2 changes: 1 addition & 1 deletion src/libsyncengine/login/login.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ ExitCode Login::refreshToken(const std::string &keychainKey, ApiToken &apiToken,

const std::chrono::time_point<std::chrono::steady_clock> tokenLastUpdate = _info[apiToken.userId()]._lastTokenUpdateTime;

const std::lock_guard lock(_info[apiToken.userId()]._mutex);
const std::scoped_lock lock(_info[apiToken.userId()]._mutex);

if (_info[apiToken.userId()]._lastTokenUpdateTime > tokenLastUpdate) {
LOG_INFO(Log::instance()->getLogger(), "Token already refreshed in another thread");
Expand Down
13 changes: 11 additions & 2 deletions src/libsyncengine/propagation/executor/executorworker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,11 @@ ExitInfo ExecutorWorker::generateCreateJob(SyncOpPtr syncOp, std::shared_ptr<Syn
<< exitInfo);
_syncPal->setRestart(true);

const std::scoped_lock lock(SyncPal::updateTreesMutex);
if (!_syncPal->updateTree(ReplicaSide::Local)) {
return ExitCode::LogicError;
}

if (!_syncPal->updateTree(ReplicaSide::Local)->deleteNode(syncOp->affectedNode())) {
LOGW_SYNCPAL_WARN(_logger, L"Error in UpdateTree::deleteNode: node name="
<< Utility::formatSyncName(syncOp->affectedNode()->name()) << L" "
Expand Down Expand Up @@ -1453,8 +1458,7 @@ ExitInfo ExecutorWorker::handleForbiddenAction(SyncOpPtr syncOp, const SyncPath
absoluteLocalFilePath, PlatformInconsistencyCheckerUtility::SuffixType::Blacklisted)) {
LOGW_SYNCPAL_WARN(_logger, L"PlatformInconsistencyCheckerUtility::renameLocalFile failed for "
<< Utility::formatSyncPath(absoluteLocalFilePath));
_syncPal->handleAccessDeniedItem(relativeLocalPath);
return ExitCode::Ok;
return _syncPal->handleAccessDeniedItem(relativeLocalPath);
}
removeFromDb = false;
break;
Expand Down Expand Up @@ -1556,6 +1560,11 @@ ExitInfo ExecutorWorker::propagateConflictToDbAndTree(SyncOpPtr syncOp, bool &pr
}
}
// Remove node from update tree
const std::scoped_lock lock(SyncPal::updateTreesMutex);
if (!_syncPal->updateTree(ReplicaSide::Local) || !_syncPal->updateTree(ReplicaSide::Remote)) {
return ExitCode::LogicError;
}

if (!_syncPal->updateTree(ReplicaSide::Local)->deleteNode(syncOp->conflict().localNode())) {
LOGW_SYNCPAL_WARN(_logger, L"Error in UpdateTree::deleteNode: node "
<< Utility::formatSyncName(syncOp->conflict().localNode()->name()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ void ConflictFinderWorker::execute() {
void ConflictFinderWorker::findConflicts() {
std::vector<std::shared_ptr<Node>> remoteMoveDirNodes;
std::vector<std::shared_ptr<Node>> localMoveDirNodes;
findConflictsInTree(_syncPal->updateTree(ReplicaSide::Local), _syncPal->updateTree(ReplicaSide::Remote), localMoveDirNodes,
remoteMoveDirNodes);
{
const std::scoped_lock lock(SyncPal::updateTreesMutex);
if (!_syncPal->updateTree(ReplicaSide::Local)) return;
findConflictsInTree(_syncPal->updateTree(ReplicaSide::Local), _syncPal->updateTree(ReplicaSide::Remote),
localMoveDirNodes, remoteMoveDirNodes);
}

// Move-Move Cycle
const std::optional<std::vector<Conflict>> moveMoveCycleList =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ ExitCode ConflictResolverWorker::generateLocalRenameOperation(const Conflict &co
if (!generateConflictedName(conflict.localNode(), newName)) {
// New path is too long, move file to the root directory
// TODO : we need to discuss this behavior again!
const std::scoped_lock lock(SyncPal::updateTreesMutex);
if (!_syncPal->updateTree(ReplicaSide::Local)) return ExitCode::LogicError;
op->setNewParentNode(_syncPal->updateTree(ReplicaSide::Local)->rootNode());
}
op->setNewName(newName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,23 @@ void OperationGeneratorWorker::execute() {
_syncPal->_syncOps->clear();
_bytesToDownload = 0;

// Mark all nodes "Unprocessed"
_syncPal->updateTree(ReplicaSide::Local)->markAllNodesUnprocessed();
_syncPal->updateTree(ReplicaSide::Remote)->markAllNodesUnprocessed();
{
const std::scoped_lock lock(SyncPal::updateTreesMutex);
if (!_syncPal->updateTree(ReplicaSide::Local) || !_syncPal->updateTree(ReplicaSide::Remote)) {
setDone(ExitCode::LogicError);
return;
}

// Mark all nodes "Unprocessed"
_syncPal->updateTree(ReplicaSide::Local)->markAllNodesUnprocessed();
_syncPal->updateTree(ReplicaSide::Remote)->markAllNodesUnprocessed();

_deletedNodes.clear();
_deletedNodes.clear();

// Initiate breadth-first search with root nodes from both update trees
_queuedToExplore.push(_syncPal->updateTree(ReplicaSide::Local)->rootNode());
_queuedToExplore.push(_syncPal->updateTree(ReplicaSide::Remote)->rootNode());
// Initiate breadth-first search with root nodes from both update trees
_queuedToExplore.push(_syncPal->updateTree(ReplicaSide::Local)->rootNode());
_queuedToExplore.push(_syncPal->updateTree(ReplicaSide::Remote)->rootNode());
}

// Explore both update trees
sentry::pTraces::counterScoped::GenerateItemOperations perfMonitor(syncDbId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ void PlatformInconsistencyCheckerWorker::execute() {
checkTree(ReplicaSide::Remote);
checkTree(ReplicaSide::Local);

const std::scoped_lock lock(SyncPal::updateTreesMutex);
if (!_syncPal->updateTree(ReplicaSide::Local) || !_syncPal->updateTree(ReplicaSide::Remote)) {
setDone(ExitCode::LogicError);
return;
}

for (const auto &[remoteId, localId]: _idsToBeRemoved) {
if (!remoteId.empty() && !_syncPal->updateTree(ReplicaSide::Remote)->deleteNode(remoteId)) {
LOGW_SYNCPAL_WARN(_logger, L"Error in UpdateTree::deleteNode: node id=" << CommonUtility::s2ws(remoteId));
Expand Down
2 changes: 1 addition & 1 deletion src/libsyncengine/requests/exclusiontemplatecache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void ExclusionTemplateCache::updateRegexPatterns() {
static const std::string anyCharacter =
"(.|\n)*?"; // The question mark `?` is used for lazy matching, i.e. it matches as few characters as possible.

const std::lock_guard<std::mutex> lock(_mutex);
const std::scoped_lock<std::mutex> lock(_mutex);
_regexPatterns.clear();

for (const auto &exclPattern: exclusionTemplates()) {
Expand Down
26 changes: 18 additions & 8 deletions src/libsyncengine/syncpal/syncpal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ static const auto estimatedRemainingTime = "estimatedRemainingTime";

namespace KDC {

std::recursive_mutex SyncPal::updateTreesMutex;

void SyncProgress::toDynamicStruct(Poco::DynamicStruct &dstruct) const {
CommonUtility::writeValueToStruct(dstruct, currentFile, _currentFile);
CommonUtility::writeValueToStruct(dstruct, totalFiles, _totalFiles);
Expand Down Expand Up @@ -405,6 +407,7 @@ void SyncPal::loadProgress(SyncProgress &syncProgress) const {

void SyncPal::createSharedObjects() {
LOG_SYNCPAL_DEBUG(_logger, "Create shared objects");
const std::scoped_lock lock(updateTreesMutex);
_localOperationSet = std::make_shared<FSOperationSet>(ReplicaSide::Local);
_remoteOperationSet = std::make_shared<FSOperationSet>(ReplicaSide::Remote);
_localUpdateTree = std::make_shared<UpdateTree>(ReplicaSide::Local, _syncDb->rootNode());
Expand All @@ -418,6 +421,7 @@ void SyncPal::createSharedObjects() {

void SyncPal::freeSharedObjects() {
LOG_SYNCPAL_DEBUG(_logger, "Free shared objects");
const std::scoped_lock lock(updateTreesMutex);
_localSnapshot.reset();
_remoteSnapshot.reset();
_localOperationSet.reset();
Expand All @@ -442,6 +446,7 @@ void SyncPal::freeSharedObjects() {

void SyncPal::initSharedObjects() {
LOG_SYNCPAL_DEBUG(_logger, "Init shared objects");
const std::scoped_lock lock(updateTreesMutex);
if (_localUpdateTree) _localUpdateTree->init();
if (_remoteUpdateTree) _remoteUpdateTree->init();

Expand All @@ -450,7 +455,7 @@ void SyncPal::initSharedObjects() {

void SyncPal::resetSharedObjects() {
LOG_SYNCPAL_DEBUG(_logger, "Reset shared objects");

const std::scoped_lock lock(updateTreesMutex);
if (_localOperationSet) _localOperationSet->clear();
if (_remoteOperationSet) _remoteOperationSet->clear();
if (_localUpdateTree) _localUpdateTree->clear();
Expand Down Expand Up @@ -623,7 +628,7 @@ bool SyncPal::setProgressComplete(const SyncPath &relativeLocalPath, SyncFileSta
}

void SyncPal::directDownloadCallback(UniqueId jobId) {
const std::lock_guard lock(_directDownloadJobsMapMutex);
const std::scoped_lock lock(_directDownloadJobsMapMutex);
auto directDownloadJobsMapIt = _directDownloadJobsMap.find(jobId);
if (directDownloadJobsMapIt == _directDownloadJobsMap.end()) {
// No need to send a warning, the job might have been canceled, and therefor not in the map anymore
Expand Down Expand Up @@ -730,7 +735,7 @@ ExitCode SyncPal::addDlDirectJob(const SyncPath &relativePath, const SyncPath &a
job->setAdditionalCallback(callback);
SyncJobManagerSingleton::instance()->queueAsyncJob(job, Poco::Thread::PRIO_HIGH);

const std::lock_guard lock(_directDownloadJobsMapMutex);
const std::scoped_lock lock(_directDownloadJobsMapMutex);
(void) _directDownloadJobsMap.try_emplace(job->jobId(), job);
(void) _syncPathToDownloadJobMap.try_emplace(absoluteLocalPath, job->jobId());
if (!parentFolderPath.empty() && _folderHydrationInProgress.contains(parentFolderPath)) {
Expand All @@ -741,14 +746,14 @@ ExitCode SyncPal::addDlDirectJob(const SyncPath &relativePath, const SyncPath &a
}

void SyncPal::monitorFolderHydration(const SyncPath &absoluteLocalPath) {
const std::lock_guard lock(_directDownloadJobsMapMutex);
const std::scoped_lock lock(_directDownloadJobsMapMutex);
(void) _folderHydrationInProgress.try_emplace(absoluteLocalPath);
LOGW_INFO(_logger, L"Monitoring folder hydration: " << Utility::formatSyncPath(absoluteLocalPath));
}

ExitCode SyncPal::cancelDlDirectJobs(const std::vector<SyncPath> &fileList) {
for (const auto &filePath: fileList) {
const std::lock_guard lock(_directDownloadJobsMapMutex);
const std::scoped_lock lock(_directDownloadJobsMapMutex);

if (const auto itId = _syncPathToDownloadJobMap.find(filePath); itId != _syncPathToDownloadJobMap.end()) {
if (const auto itJob = _directDownloadJobsMap.find(itId->second); itJob != _directDownloadJobsMap.end()) {
Expand All @@ -769,7 +774,7 @@ ExitCode SyncPal::cancelDlDirectJobs(const std::vector<SyncPath> &fileList) {
ExitCode SyncPal::cancelAllDlDirectJobs(bool quit) {
LOG_SYNCPAL_INFO(_logger, "Cancelling all direct download jobs");

const std::lock_guard<std::mutex> lock(_directDownloadJobsMapMutex);
const std::scoped_lock<std::mutex> lock(_directDownloadJobsMapMutex);
for (auto &directDownloadJobsMapElt: _directDownloadJobsMap) {
LOG_SYNCPAL_DEBUG(_logger, "Cancelling download job " << directDownloadJobsMapElt.first);
if (quit) {
Expand Down Expand Up @@ -1421,16 +1426,21 @@ ExitInfo SyncPal::handleAccessDeniedItem(const SyncPath &relativeLocalPath, std:
}

// Blacklist the item
const std::scoped_lock lock(updateTreesMutex);
if (!updateTree(ReplicaSide::Local) || !updateTree(ReplicaSide::Remote)) {
return ExitCode::LogicError;
}

if (!localNodeId.empty()) {
_tmpBlacklistManager->blacklistItem(localNodeId, relativeLocalPath, ReplicaSide::Local);
if (!updateTree(ReplicaSide::Local)->deleteNode(localNodeId)) {
if (updateTree(ReplicaSide::Local) && !updateTree(ReplicaSide::Local)->deleteNode(localNodeId)) {
// Do nothing: Can happen if the UpdateTreeWorker step has never been launched
}
}

if (!remoteNodeId.empty()) {
_tmpBlacklistManager->blacklistItem(remoteNodeId, relativeLocalPath, ReplicaSide::Remote);
if (!updateTree(ReplicaSide::Remote)->deleteNode(remoteNodeId)) {
if (updateTree(ReplicaSide::Remote) && !updateTree(ReplicaSide::Remote)->deleteNode(remoteNodeId)) {
// Do nothing: Can happen if the UpdateTreeWorker step has never been launched
}
}
Expand Down
10 changes: 7 additions & 3 deletions src/libsyncengine/syncpal/syncpal.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ struct SyncProgress {

class SYNCENGINE_EXPORT SyncPal : public std::enable_shared_from_this<SyncPal> {
public:
static std::recursive_mutex updateTreesMutex;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not comfortable with the idea of a mutex being accessible anywhere in the code. This seems to have high risk of deadlock and/or impacting performance.
Moreover, it seems to me that only the running module should be accessing and modifying the update trees. Therefor, no concurrent access should occur. Maybe the algorithm is to be refactored somewhere instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The LocalFileSystemObserverWorker updates the update trees and runs in parallel with the other modules:
LocalFileSystemObserverWorker::sendAccessDeniedError => SyncPal::handleAccessDeniedItem => UpdateTree::deleteNode


SyncPal(std::shared_ptr<Vfs> vfs, const SyncPath &syncDbPath, const std::string &version, const bool hasFullyCompleted);
SyncPal(std::shared_ptr<Vfs> vfs, const int syncDbId, const std::string &version);
virtual ~SyncPal();
Expand Down Expand Up @@ -255,9 +257,11 @@ class SYNCENGINE_EXPORT SyncPal : public std::enable_shared_from_this<SyncPal> {
tailored to the context.
\return The exit info of the function.
*/
ExitInfo handleAccessDeniedItem(const SyncPath &relativeLocalPath, ExitCause cause = ExitCause::FileAccessError);
ExitInfo handleAccessDeniedItem(const SyncPath &relativeLocalPath, std::shared_ptr<Node> &localBlacklistedNode,
std::shared_ptr<Node> &remoteBlacklistedNode, ExitCause cause);
[[nodiscard]] ExitInfo handleAccessDeniedItem(const SyncPath &relativeLocalPath,
ExitCause cause = ExitCause::FileAccessError);
[[nodiscard]] ExitInfo handleAccessDeniedItem(const SyncPath &relativeLocalPath,
std::shared_ptr<Node> &localBlacklistedNode,
std::shared_ptr<Node> &remoteBlacklistedNode, ExitCause cause);

//! Makes copies of real-time snapshots to be used by synchronization workers.
void copySnapshots();
Expand Down
1 change: 1 addition & 0 deletions src/libsyncengine/syncpal/syncpalworker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ void SyncPalWorker::execute() {
// Start workers
LOG_SYNCPAL_INFO(_logger, "***** Step " << stepName(_step) << " start");
isStepInProgress = true;
const std::scoped_lock lock(SyncPal::updateTreesMutex);
for (int index = 0; index < 2; index++) {
if (inputSharedObject[index]) {
inputSharedObject[index]->startRead();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void ContentChecksumWorker::computeChecksum(const NodeId &id, const SyncPath &fi
}

void ContentChecksumWorker::callback(UniqueId jobId) {
const std::lock_guard<std::mutex> lock(_checksumMutex);
const std::scoped_lock<std::mutex> lock(_checksumMutex);
_runningJobs.extract(jobId);
}

Expand All @@ -69,7 +69,7 @@ void ContentChecksumWorker::execute() {
// Do nothing
}

const std::lock_guard<std::mutex> lock(_checksumMutex);
const std::scoped_lock<std::mutex> lock(_checksumMutex);
while (_runningJobs.begin() != _runningJobs.end()) {
_runningJobs.erase(_runningJobs.begin());
}
Expand All @@ -87,7 +87,7 @@ void ContentChecksumWorker::execute() {
}

if (_threadPool.available()) {
const std::lock_guard<std::mutex> lock(_checksumMutex);
const std::scoped_lock<std::mutex> lock(_checksumMutex);
std::shared_ptr<ComputeChecksumJob> job =
std::make_shared<ComputeChecksumJob>(_toCompute.front().first, _toCompute.front().second, _localSnapshot);
_runningJobs.insert({job->jobId(), job});
Expand Down
Loading