Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions fdbserver/Resolver.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,22 +388,27 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
auto& stateTransactions = stateTransactionsPair.second;
int64_t stateMutations = 0;
int64_t stateBytes = 0;
std::unique_ptr<LogPushData> toCommit(nullptr); // For accumulating private mutations
std::unique_ptr<ResolverData> resolverData(nullptr);
const bool shouldApplyResolverPrivateMutations =
SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS && !req.txnStateTransactions.empty();

std::unique_ptr<LogPushData> toCommit; // For accumulating private mutations
std::unique_ptr<ResolverData> resolverData;
bool isLocked = false;
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS) {
auto lockedKey = self->txnStateStore->readValue(databaseLockedKey).get();
isLocked = lockedKey.present() && lockedKey.get().size();
toCommit.reset(new LogPushData(self->logSystem, self->localTLogCount));
resolverData.reset(new ResolverData(self->dbgid,
self->logSystem,
self->txnStateStore,
&self->keyInfo,
toCommit.get(),
self->forceRecovery,
req.version + 1,
&self->storageCache,
&self->tssMapping));
if (shouldApplyResolverPrivateMutations) {
auto lockedKey = self->txnStateStore->readValue(databaseLockedKey).get();
isLocked = lockedKey.present() && lockedKey.get().size();
resolverData.reset(new ResolverData(self->dbgid,
self->logSystem,
self->txnStateStore,
&self->keyInfo,
toCommit.get(),
self->forceRecovery,
req.version + 1,
&self->storageCache,
&self->tssMapping));
}
}
for (int t : req.txnStateTransactions) {
stateMutations += req.transactions[t].mutations.size();
Expand All @@ -420,7 +425,7 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
// Generate private mutations for metadata mutations
// The condition here must match CommitBatch::applyMetadataToCommittedTransactions()
if (reply.committed[t] == ConflictBatch::TransactionCommitted && !self->forceRecovery &&
SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS && (!isLocked || req.transactions[t].lock_aware)) {
shouldApplyResolverPrivateMutations && (!isLocked || req.transactions[t].lock_aware)) {
SpanContext spanContext =
req.transactions[t].spanContext.present() ? req.transactions[t].spanContext.get() : SpanContext();

Expand All @@ -446,9 +451,10 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,

// If shardChanged at or before this commit version, the proxy may have computed
// the wrong set of groups. Then we need to broadcast to all groups below.
stateTransactionsPair.first = toCommit && toCommit->haveLogsChanged();
bool logsChanged = shouldApplyResolverPrivateMutations && toCommit->haveLogsChanged();
stateTransactionsPair.first = logsChanged;
bool shardChanged = self->recentStateTransactionsInfo.applyStateTxnsToBatchReply(
&reply, firstUnseenVersion, req.version, toCommit && toCommit->haveLogsChanged());
&reply, firstUnseenVersion, req.version, logsChanged);

// Adds private mutation messages to the reply message.
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS) {
Expand All @@ -459,7 +465,7 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
}
// merge mutation tags with sent client tags
toCommit->saveTags(reply.writtenTags);
reply.privateMutationCount = toCommit->getMutationCount();
reply.privateMutationCount = shouldApplyResolverPrivateMutations ? toCommit->getMutationCount() : 0;
}

//TraceEvent("ResolveBatch", self->dbgid).detail("PrevVersion", req.prevVersion).detail("Version", req.version).detail("StateTransactionVersions", self->recentStateTransactionsInfo.size()).detail("StateBytes", stateBytes).detail("FirstVersion", self->recentStateTransactionsInfo.empty() ? -1 : self->recentStateTransactionsInfo.firstVersion()).detail("StateMutationsIn", req.txnStateTransactions.size()).detail("StateMutationsOut", reply.stateMutations.size()).detail("From", proxyAddress);
Expand Down Expand Up @@ -498,6 +504,9 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
if (!self->numLogs) {
reply.tpcvMap.clear();
} else {
// The resolver allocates toCommit whenever PROXY_USE_RESOLVER_PRIVATE_MUTATIONS is enabled, so toCommit
// is non-null here.
ASSERT(SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS);
std::set<uint16_t> writtenTLogs;
// Does the given request correspond to an empty commit message? If so, broadcast it to all tLogs.
// NOTE: Ignore log router tags (in "req.writtenTags") while doing this check, because log router
Expand Down