Skip to content

Commit 9ec7d3e

Browse files
committed
chore: updated tiers watcher to use tokio async
1 parent 4eb6deb commit 9ec7d3e

File tree

1 file changed

+31
-13
lines changed

1 file changed

+31
-13
lines changed

proxy/src/tiers.rs

+31-13
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
use notify::{PollWatcher, RecursiveMode, Watcher};
1+
use notify::{Event, PollWatcher, RecursiveMode, Watcher};
22
use regex::Regex;
33
use serde::{Deserialize, Deserializer};
44
use serde_json::Value;
55
use std::{error::Error, fs, sync::Arc, time::Duration};
6+
use tokio::runtime::{Handle, Runtime};
67
use tracing::{error, info, instrument, warn};
78

89
use crate::State;
@@ -53,13 +54,22 @@ pub fn start(state: Arc<State>) {
5354
return;
5455
}
5556

56-
let (tx, rx) = std::sync::mpsc::channel();
57+
let (tx, mut rx) = tokio::sync::mpsc::channel::<Event>(1);
5758

5859
let watcher_config = notify::Config::default()
5960
.with_compare_contents(true)
6061
.with_poll_interval(state.config.proxy_tiers_poll_interval);
6162

62-
let watcher_result = PollWatcher::new(tx, watcher_config);
63+
let watcher_result = PollWatcher::new(
64+
move |res| {
65+
if let Ok(event) = res {
66+
runtime_handle()
67+
.block_on(async { tx.send(event).await })
68+
.unwrap();
69+
}
70+
},
71+
watcher_config,
72+
);
6373
if let Err(err) = watcher_result {
6474
error!(error = err.to_string(), "error to watcher tier");
6575
return;
@@ -73,17 +83,15 @@ pub fn start(state: Arc<State>) {
7383
return;
7484
}
7585

76-
for result in rx {
77-
match result {
78-
Ok(_event) => {
79-
if let Err(err) = update_tiers(state.clone()).await {
80-
error!(error = err.to_string(), "error to update tiers");
81-
continue;
82-
}
83-
84-
info!("tiers modified");
86+
loop {
87+
let result = rx.recv().await;
88+
if result.is_some() {
89+
if let Err(err) = update_tiers(state.clone()).await {
90+
error!(error = err.to_string(), "error to update tiers");
91+
continue;
8592
}
86-
Err(err) => error!(error = err.to_string(), "watch error"),
93+
94+
info!("tiers modified");
8795
}
8896
}
8997
});
@@ -110,3 +118,13 @@ async fn update_tiers(state: Arc<State>) -> Result<(), Box<dyn Error>> {
110118

111119
Ok(())
112120
}
121+
122+
fn runtime_handle() -> Handle {
123+
match Handle::try_current() {
124+
Ok(h) => h,
125+
Err(_) => {
126+
let rt = Runtime::new().unwrap();
127+
rt.handle().clone()
128+
}
129+
}
130+
}

0 commit comments

Comments
 (0)