Skip to content

Commit f180206

Browse files
kgpaimeta-codesync[bot]
authored and committed
Revert D95149629: feat(expr-eval):Adaptive per-function CPU sampling for Velox expression evaluation
Summary: Reverting D95149629 because it introduced a persistently failing test `ExprStatsTest.adaptiveCpuSamplingPerFunctionRates` that has been breaking OSS CI (Linux Build using GCC) on every merge-to-main commit since landing. This caused 6+ consecutive red builds on the Velox main branch starting from commit 793f13f (Mar 31 18:28 UTC). The feature can be re-landed after the test is fixed. Differential Revision: D99117623
1 parent 2c1ade3 commit f180206

File tree

7 files changed

+9
-979
lines changed

7 files changed

+9
-979
lines changed

velox/core/QueryConfig.h

Lines changed: 0 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -87,25 +87,6 @@ class QueryConfig {
8787
static constexpr const char* kExprTrackCpuUsageForFunctions =
8888
"expression.track_cpu_usage_for_functions";
8989

90-
/// Enables adaptive per-function CPU usage sampling. When enabled, each
91-
/// function is calibrated over the first 6 batches (1 warmup + 5
92-
/// calibration) to measure the overhead of CPU tracking (clock_gettime).
93-
/// Functions where tracking overhead exceeds
94-
/// kExprAdaptiveCpuSamplingMaxOverheadPct are automatically sampled at a
95-
/// rate proportional to their overhead. Functions with low overhead are
96-
/// always tracked. Disabled by default.
97-
static constexpr const char* kExprAdaptiveCpuSampling =
98-
"expression.adaptive_cpu_sampling";
99-
100-
/// Maximum acceptable overhead percentage for CPU tracking per function.
101-
/// Used with kExprAdaptiveCpuSampling. Functions whose CPU tracking overhead
102-
/// exceeds this threshold are sampled at a rate of
103-
/// ceil(overhead_pct / max_overhead_pct). For example, with max_overhead=1.0,
104-
/// a function with 70% tracking overhead is sampled every 70th batch.
105-
/// Default: 1.0 (1% overhead target).
106-
static constexpr const char* kExprAdaptiveCpuSamplingMaxOverheadPct =
107-
"expression.adaptive_cpu_sampling_max_overhead_pct";
108-
10990
/// Controls whether non-deterministic expressions are deduplicated during
11091
/// compilation. This is intended for testing and debugging purposes. By
11192
/// default, this is set to true to preserve standard behavior. If set to
@@ -1406,14 +1387,6 @@ class QueryConfig {
14061387
return get<std::string>(kExprTrackCpuUsageForFunctions, "");
14071388
}
14081389

1409-
bool exprAdaptiveCpuSampling() const {
1410-
return get<bool>(kExprAdaptiveCpuSampling, false);
1411-
}
1412-
1413-
double exprAdaptiveCpuSamplingMaxOverheadPct() const {
1414-
return get<double>(kExprAdaptiveCpuSamplingMaxOverheadPct, 1.0);
1415-
}
1416-
14171390
bool exprDedupNonDeterministic() const {
14181391
return get<bool>(kExprDedupNonDeterministic, true);
14191392
}

velox/docs/configs.rst

Lines changed: 0 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -286,22 +286,6 @@ Expression Evaluation Configuration
286286
``expression.track_cpu_usage`` is set to false. Function names are case-insensitive and will be normalized
287287
to lowercase. This allows fine-grained control over CPU tracking overhead when only specific functions need to
288288
be monitored.
289-
* - expression.adaptive_cpu_sampling
290-
- boolean
291-
- false
292-
- Enables adaptive per-function CPU usage sampling. Each function is calibrated over 6 batches (1 warmup + 5
293-
calibration) to measure the overhead of CPU tracking (clock_gettime) relative to the function's execution time.
294-
The timer overhead is measured once per ExprSet and shared across all functions. Functions where tracking overhead
295-
is acceptable are always tracked; functions where overhead exceeds ``expression.adaptive_cpu_sampling_max_overhead_pct``
296-
are sampled at a rate proportional to their overhead. Sampled timing stats are extrapolated to approximate
297-
full-population values.
298-
* - expression.adaptive_cpu_sampling_max_overhead_pct
299-
- float
300-
- 1.0
301-
- Maximum acceptable CPU tracking overhead percentage per function, used with ``expression.adaptive_cpu_sampling``.
302-
Functions whose tracking overhead exceeds this threshold are sampled at a rate of
303-
ceil(overhead_pct / max_overhead_pct). For example, with max_overhead=1.0, a function with 70% tracking overhead
304-
is sampled every 70th batch, bounding its effective overhead to ~1%. Must be greater than 0.
305289
* - legacy_cast
306290
- bool
307291
- false

velox/expression/EvalCtx.h

Lines changed: 0 additions & 37 deletions
Original file line number | Diff line number | Diff line change
@@ -539,34 +539,6 @@ class EvalCtx {
539539
return execCtx_->optimizationParams().dictionaryMemoizationEnabled;
540540
}
541541

542-
/// Returns true if adaptive per-function CPU sampling is enabled.
543-
bool adaptiveCpuSamplingEnabled() const {
544-
return adaptiveCpuSamplingEnabled_;
545-
}
546-
547-
void setAdaptiveCpuSamplingEnabled(bool enabled) {
548-
adaptiveCpuSamplingEnabled_ = enabled;
549-
}
550-
551-
/// Returns the maximum acceptable overhead pct for adaptive sampling.
552-
double adaptiveCpuSamplingMaxOverheadPct() const {
553-
return adaptiveCpuSamplingMaxOverheadPct_;
554-
}
555-
556-
void setAdaptiveCpuSamplingMaxOverheadPct(double pct) {
557-
adaptiveCpuSamplingMaxOverheadPct_ = pct;
558-
}
559-
560-
/// Returns the measured CpuWallTimer overhead in nanoseconds (per
561-
/// invocation). Measured once per ExprSet and shared across all Expr nodes.
562-
uint64_t timerOverheadNanos() const {
563-
return timerOverheadNanos_;
564-
}
565-
566-
void setTimerOverheadNanos(uint64_t nanos) {
567-
timerOverheadNanos_ = nanos;
568-
}
569-
570542
/// Returns the maximum number of distinct inputs to cache results for in a
571543
/// given shared subexpression.
572544
uint32_t maxSharedSubexprResultsCached() const {
@@ -638,15 +610,6 @@ class EvalCtx {
638610
// If 'captureErrorDetails()' is false, stores flags indicating which rows had
639611
// errors without storing actual exceptions.
640612
EvalErrorsPtr errors_;
641-
642-
// Whether adaptive per-function CPU sampling is enabled.
643-
bool adaptiveCpuSamplingEnabled_{false};
644-
645-
// Maximum acceptable overhead percentage for adaptive CPU sampling.
646-
double adaptiveCpuSamplingMaxOverheadPct_{1.0};
647-
648-
// Measured CpuWallTimer overhead (nanos per invocation), shared across Exprs.
649-
uint64_t timerOverheadNanos_{0};
650613
};
651614

652615
/// Utility wrapper struct that is used to temporarily reset the value of the

velox/expression/Expr.cpp

Lines changed: 4 additions & 150 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,6 @@
1616
#include <boost/lexical_cast.hpp>
1717
#include <boost/uuid/uuid_generators.hpp>
1818
#include <boost/uuid/uuid_io.hpp>
19-
#include <cmath>
2019

2120
#include "velox/common/base/Exceptions.h"
2221
#include "velox/common/base/Fs.h"
@@ -1588,94 +1587,13 @@ bool Expr::applyFunctionWithPeeling(
15881587
return true;
15891588
}
15901589

1591-
std::unique_ptr<CpuWallTimer> Expr::cpuWallTimer(const EvalCtx& context) {
1592-
// 1. Compile-time tracking (set via trackCpuUsage_) always wins.
1593-
if (trackCpuUsage_) {
1594-
return std::make_unique<CpuWallTimer>(stats_.timing);
1595-
}
1596-
1597-
// 2. Adaptive per-function sampling.
1598-
if (context.adaptiveCpuSamplingEnabled()) {
1599-
switch (adaptiveState_) {
1600-
case AdaptiveCpuSamplingState::kWarmup:
1601-
// Warmup batch: just run the function, no timing.
1602-
return nullptr;
1603-
case AdaptiveCpuSamplingState::kCalibrating: {
1604-
// Measure function execution time (without CpuWallTimer).
1605-
// Timer overhead is measured once per ExprSet and shared via EvalCtx.
1606-
calibrationStopWatch_.emplace();
1607-
return nullptr;
1608-
}
1609-
case AdaptiveCpuSamplingState::kAlwaysTrack:
1610-
return std::make_unique<CpuWallTimer>(stats_.timing);
1611-
case AdaptiveCpuSamplingState::kSampling:
1612-
if (++adaptiveSamplingCounter_ % adaptiveSamplingRate_ == 0) {
1613-
return std::make_unique<CpuWallTimer>(stats_.timing);
1614-
}
1615-
return nullptr;
1616-
}
1617-
}
1618-
1619-
return nullptr;
1620-
}
1621-
1622-
void Expr::finalizeAdaptiveCalibration(
1623-
double maxOverheadPct,
1624-
uint64_t timerOverheadNanos) {
1625-
switch (adaptiveState_) {
1626-
case AdaptiveCpuSamplingState::kWarmup: {
1627-
adaptiveState_ = AdaptiveCpuSamplingState::kCalibrating;
1628-
break;
1629-
}
1630-
case AdaptiveCpuSamplingState::kCalibrating: {
1631-
calibrationFunctionWallNanos_ +=
1632-
calibrationStopWatch_->elapsed().wallNanos;
1633-
calibrationStopWatch_.reset();
1634-
1635-
if (++calibrationBatchCount_ < kCalibrationBatches) {
1636-
break;
1637-
}
1638-
1639-
// Use the shared timer overhead measurement, scaled by calibration
1640-
// batch count. The overhead per invocation is a platform constant
1641-
// measured once per ExprSet.
1642-
auto totalTimerOverhead = timerOverheadNanos * calibrationBatchCount_;
1643-
1644-
if (calibrationFunctionWallNanos_ > 0 && maxOverheadPct > 0) {
1645-
double overheadPct = 100.0 * static_cast<double>(totalTimerOverhead) /
1646-
static_cast<double>(calibrationFunctionWallNanos_);
1647-
1648-
if (overheadPct > maxOverheadPct) {
1649-
adaptiveSamplingRate_ =
1650-
static_cast<uint32_t>(std::ceil(overheadPct / maxOverheadPct));
1651-
// Start counter at rate-1 so the first post-calibration batch is
1652-
// always timed (++counter hits rate, which passes % rate == 0).
1653-
adaptiveSamplingCounter_ = adaptiveSamplingRate_ - 1;
1654-
adaptiveState_ = AdaptiveCpuSamplingState::kSampling;
1655-
} else {
1656-
adaptiveState_ = AdaptiveCpuSamplingState::kAlwaysTrack;
1657-
}
1658-
} else {
1659-
// Function ~0ns — timer dominates. Aggressive sampling.
1660-
adaptiveSamplingRate_ = 100;
1661-
adaptiveSamplingCounter_ = adaptiveSamplingRate_ - 1;
1662-
adaptiveState_ = AdaptiveCpuSamplingState::kSampling;
1663-
}
1664-
break;
1665-
}
1666-
default:
1667-
VELOX_UNREACHABLE(
1668-
"Unexpected adaptive sampling state in finalizeAdaptiveCalibration");
1669-
}
1670-
}
1671-
16721590
void Expr::applyFunction(
16731591
const SelectivityVector& rows,
16741592
EvalCtx& context,
16751593
VectorPtr& result) {
16761594
stats_.numProcessedVectors += 1;
16771595
stats_.numProcessedRows += rows.countSelected();
1678-
auto timer = cpuWallTimer(context);
1596+
auto timer = cpuWallTimer();
16791597

16801598
computeIsAsciiForInputs(vectorFunction_.get(), inputValues_, rows);
16811599
auto isAscii = type()->isVarchar()
@@ -1715,14 +1633,6 @@ void Expr::applyFunction(
17151633
result->asUnchecked<SimpleVector<StringView>>()->setIsAscii(
17161634
isAscii.value(), rows);
17171635
}
1718-
1719-
// Only do Adaptive Calibration if the adaptive sampling is on and we are in
1720-
// warmup or calibrating state.
1721-
if (context.adaptiveCpuSamplingEnabled() && isCalibrating()) {
1722-
finalizeAdaptiveCalibration(
1723-
context.adaptiveCpuSamplingMaxOverheadPct(),
1724-
context.timerOverheadNanos());
1725-
}
17261636
}
17271637

17281638
void Expr::evalSpecialFormWithStats(
@@ -1731,17 +1641,9 @@ void Expr::evalSpecialFormWithStats(
17311641
VectorPtr& result) {
17321642
stats_.numProcessedVectors += 1;
17331643
stats_.numProcessedRows += rows.countSelected();
1734-
auto timer = cpuWallTimer(context);
1644+
auto timer = cpuWallTimer();
17351645

17361646
evalSpecialForm(rows, context, result);
1737-
1738-
// Only do Adaptive Calibration if the adaptive sampling is on and we are in
1739-
// warmup or calibrating state.
1740-
if (context.adaptiveCpuSamplingEnabled() && isCalibrating()) {
1741-
finalizeAdaptiveCalibration(
1742-
context.adaptiveCpuSamplingMaxOverheadPct(),
1743-
context.timerOverheadNanos());
1744-
}
17451647
}
17461648

17471649
namespace {
@@ -1971,14 +1873,7 @@ ExprSet::ExprSet(
19711873
core::ExecCtx* execCtx,
19721874
bool enableConstantFolding,
19731875
bool lazyDereference)
1974-
: execCtx_(execCtx),
1975-
lazyDereference_(lazyDereference),
1976-
adaptiveCpuSampling_(
1977-
execCtx->queryCtx()->queryConfig().exprAdaptiveCpuSampling()),
1978-
adaptiveCpuSamplingMaxOverheadPct_(
1979-
execCtx->queryCtx()
1980-
->queryConfig()
1981-
.exprAdaptiveCpuSamplingMaxOverheadPct()) {
1876+
: execCtx_(execCtx), lazyDereference_(lazyDereference) {
19821877
exprs_ = compileExpressions(sources, execCtx, this, enableConstantFolding);
19831878
if (lazyDereference_) {
19841879
validateLazyDereference(exprs_);
@@ -1991,24 +1886,6 @@ ExprSet::ExprSet(
19911886
}
19921887

19931888
namespace {
1994-
1995-
/// If the expression is in adaptive sampling mode, extrapolate timing stats
1996-
/// to approximate full-population values. Otherwise, return raw stats.
1997-
exec::ExprStats adjustStats(const exec::Expr& expr) {
1998-
if (expr.isAdaptiveSampling() && expr.stats().timing.count > 0) {
1999-
exec::ExprStats adjusted = expr.stats();
2000-
double ratio = static_cast<double>(adjusted.numProcessedVectors) /
2001-
static_cast<double>(adjusted.timing.count);
2002-
adjusted.timing.cpuNanos = static_cast<uint64_t>(
2003-
static_cast<double>(adjusted.timing.cpuNanos) * ratio);
2004-
adjusted.timing.wallNanos = static_cast<uint64_t>(
2005-
static_cast<double>(adjusted.timing.wallNanos) * ratio);
2006-
adjusted.timing.count = adjusted.numProcessedVectors;
2007-
return adjusted;
2008-
}
2009-
return expr.stats();
2010-
}
2011-
20121889
void addStats(
20131890
const exec::Expr& expr,
20141891
std::unordered_map<std::string, exec::ExprStats>& stats,
@@ -2027,7 +1904,7 @@ void addStats(
20271904
bool emptyStats =
20281905
!expr.stats().numProcessedRows && !expr.stats().defaultNullRowsSkipped;
20291906
if (!emptyStats && !excludeSplFormExpr) {
2030-
stats[expr.name()].add(adjustStats(expr));
1907+
stats[expr.name()].add(expr.stats());
20311908
}
20321909

20331910
for (const auto& input : expr.inputs()) {
@@ -2139,24 +2016,6 @@ void printInputAndExprs(
21392016
}
21402017
} // namespace
21412018

2142-
void ExprSet::initializeAdaptiveCpuSampling(EvalCtx& context) {
2143-
context.setAdaptiveCpuSamplingEnabled(true);
2144-
context.setAdaptiveCpuSamplingMaxOverheadPct(
2145-
adaptiveCpuSamplingMaxOverheadPct_);
2146-
2147-
// Measure CpuWallTimer overhead once per ExprSet (platform constant).
2148-
if (!timerOverheadMeasured_) {
2149-
CpuWallTiming dummyTiming;
2150-
DeltaCpuWallTimeStopWatch overheadWatch;
2151-
{
2152-
auto dummy = std::make_unique<CpuWallTimer>(dummyTiming);
2153-
}
2154-
timerOverheadNanos_ = overheadWatch.elapsed().wallNanos;
2155-
timerOverheadMeasured_ = true;
2156-
}
2157-
context.setTimerOverheadNanos(timerOverheadNanos_);
2158-
}
2159-
21602019
void ExprSet::eval(
21612020
int32_t begin,
21622021
int32_t end,
@@ -2170,11 +2029,6 @@ void ExprSet::eval(
21702029
clearSharedSubexprs();
21712030
}
21722031

2173-
// Apply adaptive per-function CPU sampling if configured.
2174-
if (adaptiveCpuSampling_) {
2175-
initializeAdaptiveCpuSampling(context);
2176-
}
2177-
21782032
if (!lazyDereference_) {
21792033
// Make sure LazyVectors, referenced by multiple expressions, are loaded for
21802034
// all the "rows".

0 commit comments

Comments (0)