Skip to content

Commit a41be83

Browse files
committed
debug
1 parent 4ea5e21 commit a41be83

File tree

1 file changed

+23
-11
lines changed

1 file changed

+23
-11
lines changed

src/query/service/src/persistent_log/global_persistent_log.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use std::time::Duration;
1919

2020
use databend_common_base::base::GlobalInstance;
2121
use databend_common_base::runtime::spawn;
22-
use databend_common_base::runtime::GlobalIORuntime;
2322
use databend_common_base::runtime::MemStat;
2423
use databend_common_base::runtime::Runtime;
2524
use databend_common_base::runtime::ThreadTracker;
@@ -50,6 +49,7 @@ use futures_util::TryStreamExt;
5049
use log::error;
5150
use log::info;
5251
use rand::random;
52+
use tokio::sync::Mutex;
5353
use tokio::time::sleep;
5454
use uuid::Uuid;
5555

@@ -66,6 +66,12 @@ pub struct GlobalPersistentLog {
6666
initialized: AtomicBool,
6767
retention_interval: usize,
6868
tables: Vec<Arc<HistoryTable>>,
69+
_runtime: Arc<Runtime>,
70+
71+
// Observe transform and clean hang for the concurrent execution, so add a
72+
// lock to prevent it.
73+
local_transform_lock: Arc<Mutex<()>>,
74+
local_clean_lock: Arc<Mutex<()>>,
6975
}
7076

7177
impl GlobalPersistentLog {
@@ -76,6 +82,11 @@ impl GlobalPersistentLog {
7682
ErrorCode::Internal("Create MetaClient failed for GlobalPersistentLog")
7783
})?;
7884
let stage_name = cfg.log.history.stage_name.clone();
85+
let runtime = Arc::new(Runtime::with_worker_threads(
86+
4,
87+
Some("log-transform-worker".to_owned()),
88+
)?);
89+
7990
let instance = Arc::new(Self {
8091
meta_client,
8192
interval: cfg.log.history.interval as u64,
@@ -86,18 +97,15 @@ impl GlobalPersistentLog {
8697
initialized: AtomicBool::new(false),
8798
retention_interval: cfg.log.history.retention_interval,
8899
tables: init_history_tables(&cfg.log.history)?,
100+
local_transform_lock: Arc::new(Mutex::new(())),
101+
local_clean_lock: Arc::new(Mutex::new(())),
102+
_runtime: runtime.clone(),
89103
});
90104
GlobalInstance::set(instance);
91-
GlobalIORuntime::instance().spawn(async move {
92-
let runtime = Runtime::with_worker_threads(2, Some("log-transform-worker".to_owned()))?;
93-
runtime
94-
.spawn(async move {
95-
if let Err(e) = GlobalPersistentLog::instance().work().await {
96-
error!("System history exit with {}", e);
97-
}
98-
})
99-
.await?;
100-
Ok::<(), ErrorCode>(())
105+
runtime.spawn(async move {
106+
if let Err(e) = GlobalPersistentLog::instance().work().await {
107+
error!("System history exit with {}", e);
108+
}
101109
});
102110
Ok(())
103111
}
@@ -291,6 +299,7 @@ impl GlobalPersistentLog {
291299
}
292300

293301
pub async fn transform(&self, table: &HistoryTable, meta_key: &str) -> Result<bool> {
302+
let _local_lock_guard = self.local_transform_lock.lock().await;
294303
let may_permit = self
295304
.acquire(&format!("{}/{}/lock", meta_key, table.name), self.interval)
296305
.await?;
@@ -329,10 +338,12 @@ impl GlobalPersistentLog {
329338
drop(_guard);
330339
return Ok(true);
331340
}
341+
drop(_local_lock_guard);
332342
Ok(false)
333343
}
334344

335345
pub async fn clean(&self, table: &HistoryTable, meta_key: &str) -> Result<bool> {
346+
let _local_lock_guard = self.local_clean_lock.lock().await;
336347
let may_permit = self
337348
.acquire(
338349
&format!("{}/{}/lock", meta_key, table.name),
@@ -355,6 +366,7 @@ impl GlobalPersistentLog {
355366
drop(_guard);
356367
return Ok(true);
357368
}
369+
drop(_local_lock_guard);
358370
Ok(false)
359371
}
360372

0 commit comments

Comments
 (0)