Skip to content

Commit ab35efa

Browse files
committed
WIP
1 parent 7e58998 commit ab35efa

File tree

1 file changed

+42
-3
lines changed

1 file changed

+42
-3
lines changed

src/storage/src/hummock/compactor/table_change_log_compactor_runner.rs

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,49 @@ pub(crate) async fn compact_table_change_log(
8383
compact_task_to_string(&compact_task),
8484
);
8585
let group_label = compact_task.compaction_group_id.to_string();
86+
let mut task_status = TaskStatus::Success;
8687
let timer = context
8788
.compactor_metrics
8889
.compact_task_duration
8990
.with_label_values(&[&group_label, &"0".to_string()])
9091
.start_timer();
92+
let memory_detector = context
93+
.memory_limiter
94+
.try_require_memory(task_memory_capacity_with_parallelism);
95+
if memory_detector.is_none() {
96+
tracing::warn!(
97+
"Not enough memory to serve the task {} task_memory_capacity_with_parallelism {} memory_usage {} memory_quota {}",
98+
compact_task.task_id,
99+
task_memory_capacity_with_parallelism,
100+
context.memory_limiter.get_memory_usage(),
101+
context.memory_limiter.quota()
102+
);
103+
task_status = TaskStatus::NoAvailMemoryResourceCanceled;
104+
return (
105+
seal_table_change_log_compaction_task(
106+
compact_task,
107+
context.clone(),
108+
vec![],
109+
vec![],
110+
task_status,
111+
),
112+
memory_detector,
113+
);
114+
}
115+
116+
context.compactor_metrics.compact_task_pending_num.inc();
117+
context
118+
.compactor_metrics
119+
.compact_task_pending_parallelism
120+
.add(parallelism as _);
121+
let _release_metrics_guard =
122+
scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
123+
context.compactor_metrics.compact_task_pending_num.dec();
124+
context
125+
.compactor_metrics
126+
.compact_task_pending_parallelism
127+
.sub(parallelism as _);
128+
});
91129
let mut compaction_futures = vec![];
92130
let mut abort_handles = vec![];
93131
let task_progress_guard =
@@ -125,7 +163,6 @@ pub(crate) async fn compact_table_change_log(
125163
compaction_futures.push(handle);
126164
}
127165
let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
128-
let mut task_status = TaskStatus::Success;
129166
loop {
130167
tokio::select! {
131168
_ = &mut shutdown_rx => {
@@ -190,8 +227,10 @@ pub(crate) async fn compact_table_change_log(
190227
cost_time,
191228
compact_task_output_to_string(&compact_task)
192229
);
193-
// TODO(ZW): impl memory_detector
194-
((compact_task, table_stats, object_timestamps), None)
230+
(
231+
(compact_task, table_stats, object_timestamps),
232+
memory_detector,
233+
)
195234
}
196235

197236
fn estimate_table_change_log_task_output_capacity(

0 commit comments

Comments
 (0)