@@ -30,60 +30,64 @@ impl TextLogView {
3030 pub fn new ( view_name : & ' static str , context : ContextArc , args : TextLogArguments ) -> Self {
3131 let flush_interval_milliseconds =
3232 Duration :: try_milliseconds ( FLUSH_INTERVAL_MILLISECONDS ) . unwrap ( ) ;
33- let start = args. start ;
34- let end = args. end . clone ( ) ;
35- let query_ids = args. query_ids . clone ( ) ;
36- let logger_names = args. logger_names . clone ( ) ;
37- let hostname = args. hostname . clone ( ) ;
38- let message_filter = args. message_filter . clone ( ) ;
39- let max_level = args. max_level . clone ( ) ;
33+ let TextLogArguments {
34+ query_ids,
35+ logger_names,
36+ hostname,
37+ message_filter,
38+ max_level,
39+ start,
40+ end,
41+ } = args;
4042 let last_event_time_microseconds = Arc :: new ( Mutex :: new ( start) ) ;
4143
42- let delay = context. lock ( ) . unwrap ( ) . options . view . delay_interval ;
44+ let ( delay, is_cluster, wrap, no_strip_hostname_suffix) = {
45+ let ctx = context. lock ( ) . unwrap ( ) ;
46+ (
47+ ctx. options . view . delay_interval ,
48+ ctx. options . clickhouse . cluster . is_some ( ) ,
49+ ctx. options . view . wrap ,
50+ ctx. options . view . no_strip_hostname_suffix ,
51+ )
52+ } ;
4353
4454 let mut bg_runner = None ;
4555 // Start pulling only if the query did not finished, i.e. we don't know the end time.
4656 // (but respect the FLUSH_INTERVAL_MILLISECONDS)
4757 let now = Local :: now ( ) ;
4858 if logger_names. is_none ( )
49- && let Some ( mut end ) = end. get_date_time ( )
50- && ( ( now - end ) >= flush_interval_milliseconds || query_ids. is_none ( ) )
59+ && let Some ( mut end_date ) = end. get_date_time ( )
60+ && ( ( now - end_date ) >= flush_interval_milliseconds || query_ids. is_none ( ) )
5161 {
5262 // It is possible to have messages in the system.text_log, whose
5363 // event_time_microseconds > max(event_time_microseconds) from system.query_log
5464 // But let's consider that 3 seconds is enough.
5565 if query_ids. is_some ( ) {
56- end += Duration :: try_seconds ( 3 ) . unwrap ( ) ;
66+ end_date += Duration :: try_seconds ( 3 ) . unwrap ( ) ;
5767 }
5868 context. lock ( ) . unwrap ( ) . worker . send (
5969 true ,
6070 WorkerEvent :: TextLog (
6171 view_name,
6272 TextLogArguments {
63- query_ids : query_ids . clone ( ) ,
73+ query_ids,
6474 logger_names : None ,
6575 hostname,
66- message_filter : message_filter . clone ( ) ,
67- max_level : max_level . clone ( ) ,
76+ message_filter,
77+ max_level,
6878 start,
69- end : RelativeDateTime :: from ( end ) ,
79+ end : RelativeDateTime :: from ( end_date ) ,
7080 } ,
7181 ) ,
7282 ) ;
7383 } else {
74- let update_query_ids = query_ids. clone ( ) ;
75- let update_logger_names = logger_names. clone ( ) ;
76- let update_hostname = hostname. clone ( ) ;
77- let update_message_filter = message_filter. clone ( ) ;
78- let update_max_level = max_level. clone ( ) ;
7984 let update_last_event_time_microseconds = last_event_time_microseconds. clone ( ) ;
8085 let update_callback_context = context. clone ( ) ;
8186
8287 let is_first_invocation = Arc :: new ( Mutex :: new ( true ) ) ;
8388 let update_callback = move |force : bool | {
84- let mut is_first = is_first_invocation. lock ( ) . unwrap ( ) ;
85- let effective_force = if * is_first {
86- * is_first = false ;
89+ let effective_force = if * is_first_invocation. lock ( ) . unwrap ( ) {
90+ * is_first_invocation. lock ( ) . unwrap ( ) = false ;
8791 true
8892 } else {
8993 force
@@ -94,39 +98,35 @@ impl TextLogView {
9498 WorkerEvent :: TextLog (
9599 view_name,
96100 TextLogArguments {
97- query_ids : update_query_ids . clone ( ) ,
98- logger_names : update_logger_names . clone ( ) ,
99- hostname : update_hostname . clone ( ) ,
100- message_filter : update_message_filter . clone ( ) ,
101- max_level : update_max_level . clone ( ) ,
101+ query_ids : query_ids . clone ( ) ,
102+ logger_names : logger_names . clone ( ) ,
103+ hostname : hostname . clone ( ) ,
104+ message_filter : message_filter . clone ( ) ,
105+ max_level : max_level . clone ( ) ,
102106 start : * update_last_event_time_microseconds. lock ( ) . unwrap ( ) ,
103107 end : end. clone ( ) ,
104108 } ,
105109 ) ,
106110 ) ;
107111 } ;
108112
109- let bg_runner_cv = context. lock ( ) . unwrap ( ) . background_runner_cv . clone ( ) ;
110- let bg_runner_force = context. lock ( ) . unwrap ( ) . background_runner_force . clone ( ) ;
113+ let ( bg_runner_cv, bg_runner_force) = {
114+ let ctx = context. lock ( ) . unwrap ( ) ;
115+ (
116+ ctx. background_runner_cv . clone ( ) ,
117+ ctx. background_runner_force . clone ( ) ,
118+ )
119+ } ;
111120 let mut created_bg_runner = BackgroundRunner :: new ( delay, bg_runner_cv, bg_runner_force) ;
112121 created_bg_runner. start ( update_callback) ;
113122 bg_runner = Some ( created_bg_runner) ;
114123 }
115124
116- let is_cluster = context. lock ( ) . unwrap ( ) . options . clickhouse . cluster . is_some ( ) ;
117- let wrap = context. lock ( ) . unwrap ( ) . options . view . wrap ;
118- let no_strip_hostname_suffix = context
119- . lock ( )
120- . unwrap ( )
121- . options
122- . view
123- . no_strip_hostname_suffix ;
124- let view = TextLogView {
125+ TextLogView {
125126 inner_view : LogView :: new ( is_cluster, wrap, no_strip_hostname_suffix) ,
126127 last_event_time_microseconds,
127128 bg_runner,
128- } ;
129- return view;
129+ }
130130 }
131131
132132 pub fn update ( & mut self , logs_block : Columns ) -> Result < ( ) > {
0 commit comments