Skip to content

Add commit proxy long-running commit request tracing #12152

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions fdbclient/ServerKnobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BLOB_GRANULE_LOCATION_MAX_QUEUE_SIZE, 1e5 ); if ( randomize && BUGGIFY ) BLOB_GRANULE_LOCATION_MAX_QUEUE_SIZE = 100;
init( COMMIT_PROXY_LIVENESS_TIMEOUT, 20.0 );
init( COMMIT_PROXY_MAX_LIVENESS_TIMEOUT, 600.0 ); if ( randomize && BUGGIFY ) COMMIT_PROXY_MAX_LIVENESS_TIMEOUT = 20.0;
init( COMMIT_BATCH_TIME_LOG_CUTOFF_SEC, 0.5 ); if ( isSimulated ) COMMIT_BATCH_TIME_LOG_CUTOFF_SEC = deterministicRandom()->random01();
init( COMMIT_PROXY_SERVICE_LATENCY_LOG_INTERVAL, 10.0 );

init( COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE, 0.0005 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE = 0.005;
init( COMMIT_TRANSACTION_BATCH_INTERVAL_MIN, 0.001 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_INTERVAL_MIN = 0.1;
Expand Down
2 changes: 2 additions & 0 deletions fdbclient/include/fdbclient/ServerKnobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,8 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl<ServerKno
int BLOB_GRANULE_LOCATION_MAX_QUEUE_SIZE;
double COMMIT_PROXY_LIVENESS_TIMEOUT;
double COMMIT_PROXY_MAX_LIVENESS_TIMEOUT;
double COMMIT_BATCH_TIME_LOG_CUTOFF_SEC;
double COMMIT_PROXY_SERVICE_LATENCY_LOG_INTERVAL;

double COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE;
double COMMIT_TRANSACTION_BATCH_INTERVAL_MIN;
Expand Down
48 changes: 46 additions & 2 deletions fdbserver/CommitProxyServer.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,8 @@ struct CommitBatchContext {

double startTime;

std::shared_ptr<CommitBatchLatency> latencyMetrics = nullptr;

// The current stage of batch commit
std::string_view stage = UNSET;

Expand Down Expand Up @@ -893,6 +895,8 @@ CommitBatchContext::CommitBatchContext(ProxyCommitData* const pProxyCommitData_,

// since we are using just the former to limit the number of versions actually in flight!
ASSERT(SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS <= SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT);

latencyMetrics = std::make_shared<CommitBatchLatency>();
}

void CommitBatchContext::setupTraceBatch() {
Expand Down Expand Up @@ -941,7 +945,7 @@ double computeReleaseDelay(CommitBatchContext* self, double latencyBucket) {
}

ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {

state std::chrono::high_resolution_clock::time_point startTimeHighRes = std::chrono::high_resolution_clock::now();
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
state std::vector<CommitTransactionRequest>& trs = self->trs;
state const int64_t localBatchNumber = self->localBatchNumber;
Expand Down Expand Up @@ -989,6 +993,8 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
pProxyCommitData->stats.txnCommitOut += trs.size();
pProxyCommitData->stats.txnRejectedForQueuedTooLong += trs.size();
self->rejected = true;
self->latencyMetrics->preResolutionTime =
std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - startTimeHighRes).count();
return Void();
}

Expand Down Expand Up @@ -1039,6 +1045,8 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.GotCommitVersion");
}

self->latencyMetrics->preResolutionTime =
std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - startTimeHighRes).count();
return Void();
}

Expand Down Expand Up @@ -1094,6 +1102,7 @@ EncryptCipherDomainId getEncryptDetailsFromMutationRef(ProxyCommitData* commitDa
} // namespace

ACTOR Future<Void> getResolution(CommitBatchContext* self) {
state std::chrono::high_resolution_clock::time_point startTime = std::chrono::high_resolution_clock::now();
state double resolutionStart = g_network->timer_monotonic();
// Sending these requests is the fuzzy border between phase 1 and phase 2; it could conceivably overlap with
// resolution processing but is still using CPU
Expand Down Expand Up @@ -1188,7 +1197,8 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cipherKeys = wait(getCipherKeys);
self->cipherKeys = cipherKeys;
}

self->latencyMetrics->resolutionTime =
std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - startTime).count();
return Void();
}

Expand Down Expand Up @@ -2302,6 +2312,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
}

ACTOR Future<Void> postResolution(CommitBatchContext* self) {
state std::chrono::high_resolution_clock::time_point startTime = std::chrono::high_resolution_clock::now();
state double postResolutionStart = g_network->timer_monotonic();
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
state std::vector<CommitTransactionRequest>& trs = self->trs;
Expand Down Expand Up @@ -2589,10 +2600,13 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
}

pProxyCommitData->stats.processingMutationDist->sampleSeconds(g_network->timer_monotonic() - postResolutionQueuing);
self->latencyMetrics->postResolutionTime =
std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - startTime).count();
return Void();
}

ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
state std::chrono::high_resolution_clock::time_point startTime = std::chrono::high_resolution_clock::now();
state double tLoggingStart = g_network->timer_monotonic();
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
state Span span("MP:transactionLogging"_loc, self->span.context);
Expand Down Expand Up @@ -2631,10 +2645,13 @@ ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
}
pProxyCommitData->logSystem->popTxs(self->msg.popTo);
pProxyCommitData->stats.tlogLoggingDist->sampleSeconds(g_network->timer_monotonic() - tLoggingStart);
self->latencyMetrics->transactionLoggingTime =
std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - startTime).count();
return Void();
}

ACTOR Future<Void> reply(CommitBatchContext* self) {
state std::chrono::high_resolution_clock::time_point startTime = std::chrono::high_resolution_clock::now();
state double replyStart = g_network->timer_monotonic();
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
state Span span("MP:reply"_loc, self->span.context);
Expand Down Expand Up @@ -2834,11 +2851,16 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
ASSERT_ABORT(pProxyCommitData->commitBatchesMemBytesCount >= 0);
wait(self->releaseFuture);
pProxyCommitData->stats.replyCommitDist->sampleSeconds(g_network->timer_monotonic() - replyStart);
self->latencyMetrics->replyTime =
std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - startTime).count();
return Void();
}

// Commit one batch of transactions trs
ACTOR Future<Void> commitBatchImpl(CommitBatchContext* pContext) {
pContext->latencyMetrics->reset();

state std::chrono::high_resolution_clock::time_point startTime = std::chrono::high_resolution_clock::now();
// WARNING: this code is run at a high priority (until the first delay(0)), so it needs to do as little work as
// possible

Expand All @@ -2853,12 +2875,20 @@ ACTOR Future<Void> commitBatchImpl(CommitBatchContext* pContext) {
++pContext->pProxyCommitData->stats.commitBatchIn;
pContext->setupTraceBatch();

pContext->latencyMetrics->initializationTime =
std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - startTime).count();

/////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined
/// and *should* be available by now (unless empty commit); ordered; currently atomic but could yield)
pContext->stage = PRE_RESOLUTION;
wait(CommitBatch::preresolutionProcessing(pContext));
if (pContext->rejected) {
pContext->pProxyCommitData->commitBatchesMemBytesCount -= pContext->currentBatchMemBytesCount;
pContext->latencyMetrics->totalTime =
std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - startTime).count();
if (pContext->latencyMetrics->totalTime > SERVER_KNOBS->COMMIT_BATCH_TIME_LOG_CUTOFF_SEC) {
pContext->pProxyCommitData->serviceTracing->update(pContext->latencyMetrics);
}
return Void();
}

Expand All @@ -2881,6 +2911,12 @@ ACTOR Future<Void> commitBatchImpl(CommitBatchContext* pContext) {
wait(CommitBatch::reply(pContext));

pContext->stage = COMPLETE;

pContext->latencyMetrics->totalTime =
std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - startTime).count();
if ((pContext->latencyMetrics->totalTime) > SERVER_KNOBS->COMMIT_BATCH_TIME_LOG_CUTOFF_SEC) {
pContext->pProxyCommitData->serviceTracing->update(pContext->latencyMetrics);
}
return Void();
}

Expand Down Expand Up @@ -3964,6 +4000,13 @@ ACTOR Future<Void> logDetailedMetrics(ProxyCommitData* commitData) {
}
}

// Background actor that periodically emits the commit proxy's accumulated service-latency trace.
//
// Each iteration waits for SERVER_KNOBS->COMMIT_PROXY_SERVICE_LATENCY_LOG_INTERVAL (presumably seconds -- confirm
// against delay()) and then calls commitData->serviceTracing->log().
//
// The loop body has no break or return, so this actor only stops when it is cancelled or when a wait throws.
// NOTE(review): commitData is held as a raw pointer; it presumably must outlive this actor -- confirm at the
// call site.
ACTOR Future<Void> commitProxyServiceTracingLogger(ProxyCommitData* commitData) {
loop {
// Sleep for one logging interval before emitting the next trace event.
wait(delay(SERVER_KNOBS->COMMIT_PROXY_SERVICE_LATENCY_LOG_INTERVAL));
commitData->serviceTracing->log();
}
}

ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
MasterInterface master,
LifetimeToken masterLifetime,
Expand Down Expand Up @@ -4063,6 +4106,7 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
addActor.send(ddMetricsRequestServer(proxy, db));
addActor.send(reportTxnTagCommitCost(proxy.id(), db, &commitData.ssTrTagCommitCost));
addActor.send(logDetailedMetrics(&commitData));
addActor.send(commitProxyServiceTracingLogger(&commitData));

auto openDb = openDBOnServer(db);

Expand Down
72 changes: 72 additions & 0 deletions fdbserver/include/fdbserver/ProxyCommitData.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,75 @@ struct ExpectedIdempotencyIdCountForKey {
: commitVersion(commitVersion), idempotencyIdCount(idempotencyIdCount), batchIndexHighByte(batchIndexHighByte) {}
};

// Per-stage timing record for one commit batch.
//
// Each field holds the elapsed time of one stage of the commit pipeline; totalTime covers the whole batch.
// Elsewhere in this file the values are written as std::chrono::duration<double>::count(), i.e. in seconds.
struct CommitBatchLatency {
	double initializationTime = 0;
	double preResolutionTime = 0;
	double resolutionTime = 0;
	double postResolutionTime = 0;
	double transactionLoggingTime = 0;
	double replyTime = 0;
	double totalTime = 0;

	CommitBatchLatency() {}

	// Return every stage timer to zero, the same state a freshly constructed record has.
	inline void reset() { *this = CommitBatchLatency(); }
};

struct CommitProxyServiceTracing {
public:
CommitProxyServiceTracing(const UID& debugID) : debugID(debugID) {}

inline void update(std::shared_ptr<CommitBatchLatency> latencyMetrics) {
initializationTime = initializationTime + latencyMetrics->initializationTime;
preResolutionTime = preResolutionTime + latencyMetrics->preResolutionTime;
resolutionTime = resolutionTime + latencyMetrics->resolutionTime;
postResolutionTime = postResolutionTime + latencyMetrics->postResolutionTime;
transactionLoggingTime = transactionLoggingTime + latencyMetrics->transactionLoggingTime;
replyTime = replyTime + latencyMetrics->replyTime;
totalTime = totalTime + latencyMetrics->totalTime;
}

inline void log() {
TraceEvent(SevInfo, "CommitProxyServiceTracing", debugID)
.detail("InitializationTime", initializationTime)
.detail("PreResolutionTime", preResolutionTime)
.detail("ResolutionTime", resolutionTime)
.detail("PostResolutionTime", postResolutionTime)
.detail("TransactionLoggingTime", transactionLoggingTime)
.detail("ReplyTime", replyTime)
.detail("TotalTime", totalTime);
reset();
}

private:
inline void reset() {
initializationTime = 0;
preResolutionTime = 0;
resolutionTime = 0;
postResolutionTime = 0;
transactionLoggingTime = 0;
replyTime = 0;
totalTime = 0;
}

UID debugID;
double initializationTime = 0;
double preResolutionTime = 0;
double resolutionTime = 0;
double postResolutionTime = 0;
double transactionLoggingTime = 0;
double replyTime = 0;
double totalTime = 0;
};

struct RangeLock;
struct ProxyCommitData {
UID dbgid;
Expand Down Expand Up @@ -281,6 +350,8 @@ struct ProxyCommitData {

std::shared_ptr<RangeLock> rangeLock = nullptr;

std::shared_ptr<CommitProxyServiceTracing> serviceTracing = nullptr;

// The tag related to a storage server rarely changes, so we keep a vector of tags for each key range to be slightly
// more CPU efficient. When a tag related to a storage server does change, we empty out all of these vectors to
// signify they must be repopulated. We do not repopulate them immediately to avoid a slow task.
Expand Down Expand Up @@ -380,6 +451,7 @@ struct ProxyCommitData {
: nullptr),
lastShardMove(invalidVersion), epoch(epoch) {
commitComputePerOperation.resize(SERVER_KNOBS->PROXY_COMPUTE_BUCKETS, 0.0);
serviceTracing = std::make_shared<CommitProxyServiceTracing>(dbgid);
}
};
struct RangeLock {
Expand Down