Skip to content

Commit 367f6e2

Browse files
Rajeev975 authored and meta-codesync[bot] committed
feat(expr-eval): Adaptive per-function CPU sampling for Velox expression evaluation (facebookincubator#17002)
Summary: Pull Request resolved: facebookincubator#17002 This is similar to facebookincubator#16646 Previous landing was reverted because the test asserted that slow_add must be in kAlwaysTrack state and plus must be in kSampling state. Replaced absolute state assertions with a relative comparison: slow_add sampling rate must be ≤ plus sampling rate. This is robust because both functions share the same timerOverheadNanos_ per ExprSet, so measurement noise affects both equally and cannot flip the relative ordering. Differential Revision: D99126870
1 parent 77087c5 commit 367f6e2

File tree

7 files changed

+968
-9
lines changed

7 files changed

+968
-9
lines changed

velox/core/QueryConfig.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,25 @@ class QueryConfig {
8787
static constexpr const char* kExprTrackCpuUsageForFunctions =
8888
"expression.track_cpu_usage_for_functions";
8989

90+
/// Enables adaptive per-function CPU usage sampling. When enabled, each
91+
/// function is calibrated over the first 6 batches (1 warmup + 5
92+
/// calibration) to measure the overhead of CPU tracking (clock_gettime).
93+
/// Functions where tracking overhead exceeds
94+
/// kExprAdaptiveCpuSamplingMaxOverheadPct are automatically sampled at a
95+
/// rate proportional to their overhead. Functions with low overhead are
96+
/// always tracked. Disabled by default.
97+
static constexpr const char* kExprAdaptiveCpuSampling =
98+
"expression.adaptive_cpu_sampling";
99+
100+
/// Maximum acceptable overhead percentage for CPU tracking per function.
101+
/// Used with kExprAdaptiveCpuSampling. Functions whose CPU tracking overhead
102+
/// exceeds this threshold are sampled at a rate of
103+
/// ceil(overhead_pct / max_overhead_pct). For example, with max_overhead=1.0,
104+
/// a function with 70% tracking overhead is sampled every 70th batch.
105+
/// Default: 1.0 (1% overhead target).
106+
static constexpr const char* kExprAdaptiveCpuSamplingMaxOverheadPct =
107+
"expression.adaptive_cpu_sampling_max_overhead_pct";
108+
90109
/// Controls whether non-deterministic expressions are deduplicated during
91110
/// compilation. This is intended for testing and debugging purposes. By
92111
/// default, this is set to true to preserve standard behavior. If set to
@@ -1387,6 +1406,14 @@ class QueryConfig {
13871406
return get<std::string>(kExprTrackCpuUsageForFunctions, "");
13881407
}
13891408

1409+
bool exprAdaptiveCpuSampling() const {
1410+
return get<bool>(kExprAdaptiveCpuSampling, false);
1411+
}
1412+
1413+
double exprAdaptiveCpuSamplingMaxOverheadPct() const {
1414+
return get<double>(kExprAdaptiveCpuSamplingMaxOverheadPct, 1.0);
1415+
}
1416+
13901417
bool exprDedupNonDeterministic() const {
13911418
return get<bool>(kExprDedupNonDeterministic, true);
13921419
}

velox/docs/configs.rst

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,22 @@ Expression Evaluation Configuration
286286
``expression.track_cpu_usage`` is set to false. Function names are case-insensitive and will be normalized
287287
to lowercase. This allows fine-grained control over CPU tracking overhead when only specific functions need to
288288
be monitored.
289+
* - expression.adaptive_cpu_sampling
290+
- boolean
291+
- false
292+
- Enables adaptive per-function CPU usage sampling. Each function is calibrated over 6 batches (1 warmup + 5
293+
calibration) to measure the overhead of CPU tracking (clock_gettime) relative to the function's execution time.
294+
The timer overhead is measured once per ExprSet and shared across all functions. Functions where tracking overhead
295+
is acceptable are always tracked; functions where overhead exceeds ``expression.adaptive_cpu_sampling_max_overhead_pct``
296+
are sampled at a rate proportional to their overhead. Sampled timing stats are extrapolated to approximate
297+
full-population values.
298+
* - expression.adaptive_cpu_sampling_max_overhead_pct
299+
- float
300+
- 1.0
301+
- Maximum acceptable CPU tracking overhead percentage per function, used with ``expression.adaptive_cpu_sampling``.
302+
Functions whose tracking overhead exceeds this threshold are sampled at a rate of
303+
ceil(overhead_pct / max_overhead_pct). For example, with max_overhead=1.0, a function with 70% tracking overhead
304+
is sampled every 70th batch, bounding its effective overhead to ~1%. Must be greater than 0.
289305
* - legacy_cast
290306
- bool
291307
- false

velox/expression/EvalCtx.h

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,34 @@ class EvalCtx {
539539
return execCtx_->optimizationParams().dictionaryMemoizationEnabled;
540540
}
541541

542+
/// Returns true if adaptive per-function CPU sampling is enabled.
543+
bool adaptiveCpuSamplingEnabled() const {
544+
return adaptiveCpuSamplingEnabled_;
545+
}
546+
547+
void setAdaptiveCpuSamplingEnabled(bool enabled) {
548+
adaptiveCpuSamplingEnabled_ = enabled;
549+
}
550+
551+
/// Returns the maximum acceptable overhead pct for adaptive sampling.
552+
double adaptiveCpuSamplingMaxOverheadPct() const {
553+
return adaptiveCpuSamplingMaxOverheadPct_;
554+
}
555+
556+
void setAdaptiveCpuSamplingMaxOverheadPct(double pct) {
557+
adaptiveCpuSamplingMaxOverheadPct_ = pct;
558+
}
559+
560+
/// Returns the measured CpuWallTimer overhead in nanoseconds (per
561+
/// invocation). Measured once per ExprSet and shared across all Expr nodes.
562+
uint64_t timerOverheadNanos() const {
563+
return timerOverheadNanos_;
564+
}
565+
566+
void setTimerOverheadNanos(uint64_t nanos) {
567+
timerOverheadNanos_ = nanos;
568+
}
569+
542570
/// Returns the maximum number of distinct inputs to cache results for in a
543571
/// given shared subexpression.
544572
uint32_t maxSharedSubexprResultsCached() const {
@@ -610,6 +638,15 @@ class EvalCtx {
610638
// If 'captureErrorDetails()' is false, stores flags indicating which rows had
611639
// errors without storing actual exceptions.
612640
EvalErrorsPtr errors_;
641+
642+
// Whether adaptive per-function CPU sampling is enabled.
643+
bool adaptiveCpuSamplingEnabled_{false};
644+
645+
// Maximum acceptable overhead percentage for adaptive CPU sampling.
646+
double adaptiveCpuSamplingMaxOverheadPct_{1.0};
647+
648+
// Measured CpuWallTimer overhead (nanos per invocation), shared across Exprs.
649+
uint64_t timerOverheadNanos_{0};
613650
};
614651

615652
/// Utility wrapper struct that is used to temporarily reset the value of the

velox/expression/Expr.cpp

Lines changed: 150 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <boost/lexical_cast.hpp>
1717
#include <boost/uuid/uuid_generators.hpp>
1818
#include <boost/uuid/uuid_io.hpp>
19+
#include <cmath>
1920

2021
#include "velox/common/base/Exceptions.h"
2122
#include "velox/common/base/Fs.h"
@@ -1587,13 +1588,94 @@ bool Expr::applyFunctionWithPeeling(
15871588
return true;
15881589
}
15891590

1591+
std::unique_ptr<CpuWallTimer> Expr::cpuWallTimer(const EvalCtx& context) {
1592+
// 1. Compile-time tracking (set via trackCpuUsage_) always wins.
1593+
if (trackCpuUsage_) {
1594+
return std::make_unique<CpuWallTimer>(stats_.timing);
1595+
}
1596+
1597+
// 2. Adaptive per-function sampling.
1598+
if (context.adaptiveCpuSamplingEnabled()) {
1599+
switch (adaptiveState_) {
1600+
case AdaptiveCpuSamplingState::kWarmup:
1601+
// Warmup batch: just run the function, no timing.
1602+
return nullptr;
1603+
case AdaptiveCpuSamplingState::kCalibrating: {
1604+
// Measure function execution time (without CpuWallTimer).
1605+
// Timer overhead is measured once per ExprSet and shared via EvalCtx.
1606+
calibrationStopWatch_.emplace();
1607+
return nullptr;
1608+
}
1609+
case AdaptiveCpuSamplingState::kAlwaysTrack:
1610+
return std::make_unique<CpuWallTimer>(stats_.timing);
1611+
case AdaptiveCpuSamplingState::kSampling:
1612+
if (++adaptiveSamplingCounter_ % adaptiveSamplingRate_ == 0) {
1613+
return std::make_unique<CpuWallTimer>(stats_.timing);
1614+
}
1615+
return nullptr;
1616+
}
1617+
}
1618+
1619+
return nullptr;
1620+
}
1621+
1622+
void Expr::finalizeAdaptiveCalibration(
1623+
double maxOverheadPct,
1624+
uint64_t timerOverheadNanos) {
1625+
switch (adaptiveState_) {
1626+
case AdaptiveCpuSamplingState::kWarmup: {
1627+
adaptiveState_ = AdaptiveCpuSamplingState::kCalibrating;
1628+
break;
1629+
}
1630+
case AdaptiveCpuSamplingState::kCalibrating: {
1631+
calibrationFunctionWallNanos_ +=
1632+
calibrationStopWatch_->elapsed().wallNanos;
1633+
calibrationStopWatch_.reset();
1634+
1635+
if (++calibrationBatchCount_ < kCalibrationBatches) {
1636+
break;
1637+
}
1638+
1639+
// Use the shared timer overhead measurement, scaled by calibration
1640+
// batch count. The overhead per invocation is a platform constant
1641+
// measured once per ExprSet.
1642+
auto totalTimerOverhead = timerOverheadNanos * calibrationBatchCount_;
1643+
1644+
if (calibrationFunctionWallNanos_ > 0 && maxOverheadPct > 0) {
1645+
double overheadPct = 100.0 * static_cast<double>(totalTimerOverhead) /
1646+
static_cast<double>(calibrationFunctionWallNanos_);
1647+
1648+
if (overheadPct > maxOverheadPct) {
1649+
adaptiveSamplingRate_ =
1650+
static_cast<uint32_t>(std::ceil(overheadPct / maxOverheadPct));
1651+
// Start counter at rate-1 so the first post-calibration batch is
1652+
// always timed (++counter hits rate, which passes % rate == 0).
1653+
adaptiveSamplingCounter_ = adaptiveSamplingRate_ - 1;
1654+
adaptiveState_ = AdaptiveCpuSamplingState::kSampling;
1655+
} else {
1656+
adaptiveState_ = AdaptiveCpuSamplingState::kAlwaysTrack;
1657+
}
1658+
} else {
1659+
// Function ~0ns — timer dominates. Aggressive sampling.
1660+
adaptiveSamplingRate_ = 100;
1661+
adaptiveSamplingCounter_ = adaptiveSamplingRate_ - 1;
1662+
adaptiveState_ = AdaptiveCpuSamplingState::kSampling;
1663+
}
1664+
break;
1665+
}
1666+
default:
1667+
VELOX_UNREACHABLE(
1668+
"Unexpected adaptive sampling state in finalizeAdaptiveCalibration");
1669+
}
1670+
}
1671+
15901672
void Expr::applyFunction(
15911673
const SelectivityVector& rows,
15921674
EvalCtx& context,
15931675
VectorPtr& result) {
15941676
stats_.numProcessedVectors += 1;
15951677
stats_.numProcessedRows += rows.countSelected();
1596-
auto timer = cpuWallTimer();
1678+
auto timer = cpuWallTimer(context);
15971679

15981680
computeIsAsciiForInputs(vectorFunction_.get(), inputValues_, rows);
15991681
auto isAscii = type()->isVarchar()
@@ -1633,6 +1715,14 @@ void Expr::applyFunction(
16331715
result->asUnchecked<SimpleVector<StringView>>()->setIsAscii(
16341716
isAscii.value(), rows);
16351717
}
1718+
1719+
// Only do Adaptive Calibration if the adaptive sampling is on and we are in
1720+
// warmup or calibrating state.
1721+
if (context.adaptiveCpuSamplingEnabled() && isCalibrating()) {
1722+
finalizeAdaptiveCalibration(
1723+
context.adaptiveCpuSamplingMaxOverheadPct(),
1724+
context.timerOverheadNanos());
1725+
}
16361726
}
16371727

16381728
void Expr::evalSpecialFormWithStats(
@@ -1641,9 +1731,17 @@ void Expr::evalSpecialFormWithStats(
16411731
VectorPtr& result) {
16421732
stats_.numProcessedVectors += 1;
16431733
stats_.numProcessedRows += rows.countSelected();
1644-
auto timer = cpuWallTimer();
1734+
auto timer = cpuWallTimer(context);
16451735

16461736
evalSpecialForm(rows, context, result);
1737+
1738+
// Only do Adaptive Calibration if the adaptive sampling is on and we are in
1739+
// warmup or calibrating state.
1740+
if (context.adaptiveCpuSamplingEnabled() && isCalibrating()) {
1741+
finalizeAdaptiveCalibration(
1742+
context.adaptiveCpuSamplingMaxOverheadPct(),
1743+
context.timerOverheadNanos());
1744+
}
16471745
}
16481746

16491747
namespace {
@@ -1873,7 +1971,14 @@ ExprSet::ExprSet(
18731971
core::ExecCtx* execCtx,
18741972
bool enableConstantFolding,
18751973
bool lazyDereference)
1876-
: execCtx_(execCtx), lazyDereference_(lazyDereference) {
1974+
: execCtx_(execCtx),
1975+
lazyDereference_(lazyDereference),
1976+
adaptiveCpuSampling_(
1977+
execCtx->queryCtx()->queryConfig().exprAdaptiveCpuSampling()),
1978+
adaptiveCpuSamplingMaxOverheadPct_(
1979+
execCtx->queryCtx()
1980+
->queryConfig()
1981+
.exprAdaptiveCpuSamplingMaxOverheadPct()) {
18771982
exprs_ = compileExpressions(sources, execCtx, this, enableConstantFolding);
18781983
if (lazyDereference_) {
18791984
validateLazyDereference(exprs_);
@@ -1886,6 +1991,24 @@ ExprSet::ExprSet(
18861991
}
18871992

18881993
namespace {
1994+
1995+
/// If the expression is in adaptive sampling mode, extrapolate timing stats
1996+
/// to approximate full-population values. Otherwise, return raw stats.
1997+
exec::ExprStats adjustStats(const exec::Expr& expr) {
1998+
if (expr.isAdaptiveSampling() && expr.stats().timing.count > 0) {
1999+
exec::ExprStats adjusted = expr.stats();
2000+
double ratio = static_cast<double>(adjusted.numProcessedVectors) /
2001+
static_cast<double>(adjusted.timing.count);
2002+
adjusted.timing.cpuNanos = static_cast<uint64_t>(
2003+
static_cast<double>(adjusted.timing.cpuNanos) * ratio);
2004+
adjusted.timing.wallNanos = static_cast<uint64_t>(
2005+
static_cast<double>(adjusted.timing.wallNanos) * ratio);
2006+
adjusted.timing.count = adjusted.numProcessedVectors;
2007+
return adjusted;
2008+
}
2009+
return expr.stats();
2010+
}
2011+
18892012
void addStats(
18902013
const exec::Expr& expr,
18912014
std::unordered_map<std::string, exec::ExprStats>& stats,
@@ -1904,7 +2027,7 @@ void addStats(
19042027
bool emptyStats =
19052028
!expr.stats().numProcessedRows && !expr.stats().defaultNullRowsSkipped;
19062029
if (!emptyStats && !excludeSplFormExpr) {
1907-
stats[expr.name()].add(expr.stats());
2030+
stats[expr.name()].add(adjustStats(expr));
19082031
}
19092032

19102033
for (const auto& input : expr.inputs()) {
@@ -2016,6 +2139,24 @@ void printInputAndExprs(
20162139
}
20172140
} // namespace
20182141

2142+
void ExprSet::initializeAdaptiveCpuSampling(EvalCtx& context) {
2143+
context.setAdaptiveCpuSamplingEnabled(true);
2144+
context.setAdaptiveCpuSamplingMaxOverheadPct(
2145+
adaptiveCpuSamplingMaxOverheadPct_);
2146+
2147+
// Measure CpuWallTimer overhead once per ExprSet (platform constant).
2148+
if (!timerOverheadMeasured_) {
2149+
CpuWallTiming dummyTiming;
2150+
DeltaCpuWallTimeStopWatch overheadWatch;
2151+
{
2152+
auto dummy = std::make_unique<CpuWallTimer>(dummyTiming);
2153+
}
2154+
timerOverheadNanos_ = overheadWatch.elapsed().wallNanos;
2155+
timerOverheadMeasured_ = true;
2156+
}
2157+
context.setTimerOverheadNanos(timerOverheadNanos_);
2158+
}
2159+
20192160
void ExprSet::eval(
20202161
int32_t begin,
20212162
int32_t end,
@@ -2029,6 +2170,11 @@ void ExprSet::eval(
20292170
clearSharedSubexprs();
20302171
}
20312172

2173+
// Apply adaptive per-function CPU sampling if configured.
2174+
if (adaptiveCpuSampling_) {
2175+
initializeAdaptiveCpuSampling(context);
2176+
}
2177+
20322178
if (!lazyDereference_) {
20332179
// Make sure LazyVectors, referenced by multiple expressions, are loaded for
20342180
// all the "rows".

0 commit comments

Comments (0)