@@ -9,7 +9,7 @@ use std::{cell::RefCell, mem, time::Duration};
99use tlog_tiles:: { PathElem , TlogTile } ;
1010use worker:: { Bucket , Env , Error as WorkerError , Object , Request , Response , State , Storage } ;
1111
12- use crate :: { load_public_bucket, log_ops:: CHECKPOINT_KEY , util:: now_millis} ;
12+ use crate :: { load_public_bucket, log_ops:: CHECKPOINT_KEY , obs , util:: now_millis} ;
1313
1414// Workers are limited to 1000 subrequests per invocation (including R2 operations).
1515// For each log, we'll need to perform the following subrequests:
@@ -36,13 +36,15 @@ pub struct CleanerConfig {
3636}
3737
3838pub struct GenericCleaner {
39+ state : State ,
3940 config : CleanerConfig ,
4041 storage : Storage ,
4142 bucket : Bucket ,
4243 cleaned_size : RefCell < u64 > ,
4344 current_size : RefCell < u64 > ,
4445 subrequests : RefCell < usize > ,
4546 initialized : RefCell < bool > ,
47+ wshim : Option < obs:: Wshim > ,
4648}
4749
4850impl GenericCleaner {
@@ -51,16 +53,19 @@ impl GenericCleaner {
5153 /// # Panics
5254 ///
5355 /// Panics if we can't get a handle for the public bucket.
54- pub fn new ( state : & State , env : & Env , config : CleanerConfig ) -> Self {
56+ pub fn new ( state : State , env : & Env , config : CleanerConfig ) -> Self {
5557 let bucket = load_public_bucket ( env, & config. name ) . unwrap ( ) ;
58+ let wshim = obs:: Wshim :: from_env ( env) . ok ( ) ;
5659 Self {
5760 storage : state. storage ( ) ,
61+ state,
5862 config,
5963 bucket,
6064 cleaned_size : RefCell :: new ( 0 ) ,
6165 current_size : RefCell :: new ( 0 ) ,
6266 subrequests : RefCell :: new ( 0 ) ,
6367 initialized : RefCell :: new ( false ) ,
68+ wshim,
6469 }
6570 }
6671
@@ -96,10 +101,13 @@ impl GenericCleaner {
96101 /// # Errors
97102 /// Will return an error if initialization fails.
98103 pub async fn fetch ( & self , _req : Request ) -> Result < Response , WorkerError > {
99- if !* self . initialized . borrow ( ) {
100- self . initialize ( ) . await ?;
101- }
102- Response :: ok ( "Started cleaner" )
104+ self . with_obs ( async || {
105+ if !* self . initialized . borrow ( ) {
106+ self . initialize ( ) . await ?;
107+ }
108+ Response :: ok ( "Started cleaner" )
109+ } )
110+ . await
103111 }
104112
105113 /// Alarm handler for the partial tile cleaner. This runs in a loop
@@ -109,6 +117,10 @@ impl GenericCleaner {
109117 /// # Errors
110118 /// Will return an error if initialization or cleaning fails.
111119 pub async fn alarm ( & self ) -> Result < Response , WorkerError > {
120+ self . with_obs ( async || self . alarm_impl ( ) . await ) . await
121+ }
122+
123+ async fn alarm_impl ( & self ) -> Result < Response , WorkerError > {
112124 // Reset the subrequest count.
113125 * self . subrequests . borrow_mut ( ) = 0 ;
114126
@@ -293,4 +305,17 @@ impl GenericCleaner {
293305 * self . subrequests . borrow_mut ( ) += new;
294306 Ok ( ( ) )
295307 }
308+
309+ async fn with_obs < F , R > ( & self , f : F ) -> R
310+ where
311+ F : AsyncFnOnce ( ) -> R ,
312+ {
313+ let r = f ( ) . await ;
314+ if let Some ( wshim) = self . wshim . clone ( ) {
315+ self . state . wait_until ( async move {
316+ wshim. flush ( & obs:: logs:: LOGGER ) . await ;
317+ } ) ;
318+ }
319+ r
320+ }
296321}
0 commit comments