@@ -19,7 +19,6 @@ use std::time::Duration;
19
19
20
20
use databend_common_base:: base:: GlobalInstance ;
21
21
use databend_common_base:: runtime:: spawn;
22
- use databend_common_base:: runtime:: GlobalIORuntime ;
23
22
use databend_common_base:: runtime:: MemStat ;
24
23
use databend_common_base:: runtime:: Runtime ;
25
24
use databend_common_base:: runtime:: ThreadTracker ;
@@ -50,6 +49,7 @@ use futures_util::TryStreamExt;
50
49
use log:: error;
51
50
use log:: info;
52
51
use rand:: random;
52
+ use tokio:: sync:: Mutex ;
53
53
use tokio:: time:: sleep;
54
54
use uuid:: Uuid ;
55
55
@@ -66,6 +66,12 @@ pub struct GlobalPersistentLog {
66
66
initialized : AtomicBool ,
67
67
retention_interval : usize ,
68
68
tables : Vec < Arc < HistoryTable > > ,
69
+ _runtime : Arc < Runtime > ,
70
+
71
+ // Observe transform and clean hang for the concurrent execution, so add a
72
+ // lock to prevent it.
73
+ local_transform_lock : Arc < Mutex < ( ) > > ,
74
+ local_clean_lock : Arc < Mutex < ( ) > > ,
69
75
}
70
76
71
77
impl GlobalPersistentLog {
@@ -76,6 +82,11 @@ impl GlobalPersistentLog {
76
82
ErrorCode :: Internal ( "Create MetaClient failed for GlobalPersistentLog" )
77
83
} ) ?;
78
84
let stage_name = cfg. log . history . stage_name . clone ( ) ;
85
+ let runtime = Arc :: new ( Runtime :: with_worker_threads (
86
+ 4 ,
87
+ Some ( "log-transform-worker" . to_owned ( ) ) ,
88
+ ) ?) ;
89
+
79
90
let instance = Arc :: new ( Self {
80
91
meta_client,
81
92
interval : cfg. log . history . interval as u64 ,
@@ -86,18 +97,15 @@ impl GlobalPersistentLog {
86
97
initialized : AtomicBool :: new ( false ) ,
87
98
retention_interval : cfg. log . history . retention_interval ,
88
99
tables : init_history_tables ( & cfg. log . history ) ?,
100
+ local_transform_lock : Arc :: new ( Mutex :: new ( ( ) ) ) ,
101
+ local_clean_lock : Arc :: new ( Mutex :: new ( ( ) ) ) ,
102
+ _runtime : runtime. clone ( ) ,
89
103
} ) ;
90
104
GlobalInstance :: set ( instance) ;
91
- GlobalIORuntime :: instance ( ) . spawn ( async move {
92
- let runtime = Runtime :: with_worker_threads ( 2 , Some ( "log-transform-worker" . to_owned ( ) ) ) ?;
93
- runtime
94
- . spawn ( async move {
95
- if let Err ( e) = GlobalPersistentLog :: instance ( ) . work ( ) . await {
96
- error ! ( "System history exit with {}" , e) ;
97
- }
98
- } )
99
- . await ?;
100
- Ok :: < ( ) , ErrorCode > ( ( ) )
105
+ runtime. spawn ( async move {
106
+ if let Err ( e) = GlobalPersistentLog :: instance ( ) . work ( ) . await {
107
+ error ! ( "System history exit with {}" , e) ;
108
+ }
101
109
} ) ;
102
110
Ok ( ( ) )
103
111
}
@@ -291,6 +299,7 @@ impl GlobalPersistentLog {
291
299
}
292
300
293
301
pub async fn transform ( & self , table : & HistoryTable , meta_key : & str ) -> Result < bool > {
302
+ let _local_lock_guard = self . local_transform_lock . lock ( ) . await ;
294
303
let may_permit = self
295
304
. acquire ( & format ! ( "{}/{}/lock" , meta_key, table. name) , self . interval )
296
305
. await ?;
@@ -329,10 +338,12 @@ impl GlobalPersistentLog {
329
338
drop ( _guard) ;
330
339
return Ok ( true ) ;
331
340
}
341
+ drop ( _local_lock_guard) ;
332
342
Ok ( false )
333
343
}
334
344
335
345
pub async fn clean ( & self , table : & HistoryTable , meta_key : & str ) -> Result < bool > {
346
+ let _local_lock_guard = self . local_clean_lock . lock ( ) . await ;
336
347
let may_permit = self
337
348
. acquire (
338
349
& format ! ( "{}/{}/lock" , meta_key, table. name) ,
@@ -355,6 +366,7 @@ impl GlobalPersistentLog {
355
366
drop ( _guard) ;
356
367
return Ok ( true ) ;
357
368
}
369
+ drop ( _local_lock_guard) ;
358
370
Ok ( false )
359
371
}
360
372
0 commit comments