Skip to content

Commit 317c0ae

Browse files
committed
feat: terminate gracefully #9
1 parent 43c6eeb commit 317c0ae

File tree

1 file changed

+135
-67
lines changed

1 file changed

+135
-67
lines changed

src/main.rs

Lines changed: 135 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,29 @@
11
use aw_client_rust::AwClient;
22
use aw_models::{Bucket, Event};
33
use chrono::{DateTime, Duration as ChronoDuration, NaiveDateTime, TimeDelta, Utc};
4-
use regex::Regex;
5-
use std::path::PathBuf;
4+
use dirs::config_dir;
65
use env_logger::Env;
76
use log::{info, warn};
7+
use regex::Regex;
88
use reqwest;
99
use serde_json::{Map, Value};
1010
use serde_yaml;
1111
use std::env;
12-
use dirs::config_dir;
1312
use std::fs::{DirBuilder, File};
1413
use std::io::prelude::*;
14+
use std::process;
1515
use std::thread::sleep;
16+
use tokio::signal;
1617
use tokio::time::{interval, Duration};
18+
#[cfg(unix)]
19+
use tokio::signal::unix::{signal, SignalKind};
1720

1821
fn parse_time_string(time_str: &str) -> Option<ChronoDuration> {
1922
let re = Regex::new(r"^(\d+)([dhm])$").unwrap();
2023
if let Some(caps) = re.captures(time_str) {
2124
let amount: i64 = caps.get(1)?.as_str().parse().ok()?;
2225
let unit = caps.get(2)?.as_str();
23-
26+
2427
match unit {
2528
"d" => Some(ChronoDuration::days(amount)),
2629
"h" => Some(ChronoDuration::hours(amount)),
@@ -52,24 +55,21 @@ async fn sync_historical_data(
5255
info!("Syncing {} historical tracks...", tracks.len());
5356
for track in tracks.iter().rev() {
5457
let mut event_data: Map<String, Value> = Map::new();
55-
58+
5659
event_data.insert("title".to_string(), track["name"].to_owned());
57-
event_data.insert(
58-
"artist".to_string(),
59-
track["artist"]["#text"].to_owned(),
60-
);
61-
event_data.insert(
62-
"album".to_string(),
63-
track["album"]["#text"].to_owned(),
64-
);
60+
event_data.insert("artist".to_string(), track["artist"]["#text"].to_owned());
61+
event_data.insert("album".to_string(), track["album"]["#text"].to_owned());
6562

6663
// Get timestamp from the track
6764
if let Some(date) = track["date"]["uts"].as_str() {
6865
if let Ok(timestamp) = date.parse::<i64>() {
6966
// TODO: remove the deprecated from_utc and from_timestamp
7067
let event = Event {
7168
id: None,
72-
timestamp: DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(timestamp, 0), Utc),
69+
timestamp: DateTime::<Utc>::from_utc(
70+
NaiveDateTime::from_timestamp(timestamp, 0),
71+
Utc,
72+
),
7373
duration: TimeDelta::seconds(30),
7474
data: event_data,
7575
};
@@ -120,6 +120,117 @@ async fn create_bucket(aw_client: &AwClient) -> Result<(), Box<dyn std::error::E
120120
}
121121
}
122122
}
123+
#[cfg(unix)]
124+
async fn run_unix_loop(
125+
mut interval: tokio::time::Interval,
126+
client: reqwest::Client,
127+
url: String,
128+
aw_client: AwClient,
129+
polling_time: TimeDelta,
130+
polling_interval: u64,
131+
) {
132+
let mut sigterm = signal(SignalKind::terminate())
133+
.expect("Failed to set up SIGTERM handler");
134+
135+
loop {
136+
tokio::select! {
137+
_ = signal::ctrl_c() => {
138+
info!("Ctrl+C received, shutting down...");
139+
process::exit(0);
140+
}
141+
_ = sigterm.recv() => {
142+
info!("SIGTERM received, shutting down...");
143+
process::exit(0);
144+
}
145+
_ = interval.tick() => {
146+
handle_lastfm_update(&client, &url, &aw_client, polling_time, polling_interval).await;
147+
}
148+
}
149+
}
150+
}
151+
152+
#[cfg(windows)]
153+
async fn run_windows_loop(
154+
mut interval: tokio::time::Interval,
155+
client: reqwest::Client,
156+
url: String,
157+
aw_client: AwClient,
158+
polling_time: TimeDelta,
159+
polling_interval: u64,
160+
) {
161+
loop {
162+
tokio::select! {
163+
_ = signal::ctrl_c() => {
164+
info!("Ctrl+C received, shutting down...");
165+
process::exit(0);
166+
}
167+
_ = interval.tick() => {
168+
handle_lastfm_update(&client, &url, &aw_client, polling_time, polling_interval).await;
169+
}
170+
}
171+
}
172+
}
173+
174+
async fn handle_lastfm_update(
175+
client: &reqwest::Client,
176+
url: &str,
177+
aw_client: &AwClient,
178+
polling_time: TimeDelta,
179+
polling_interval: u64,
180+
) {
181+
let response = client.get(url).send().await;
182+
let v: Value = match response {
183+
Ok(response) => match response.json().await {
184+
Ok(json) => json,
185+
Err(e) => {
186+
warn!("Error parsing json: {}", e);
187+
return;
188+
}
189+
},
190+
Err(_) => {
191+
warn!("Error connecting to last.fm");
192+
return;
193+
}
194+
};
195+
196+
if v["recenttracks"]["track"][0]["@attr"]["nowplaying"].as_str() != Some("true") {
197+
info!("No song is currently playing");
198+
return;
199+
}
200+
201+
let mut event_data: Map<String, Value> = Map::new();
202+
info!(
203+
"Track: {} - {}",
204+
v["recenttracks"]["track"][0]["name"], v["recenttracks"]["track"][0]["artist"]["#text"]
205+
);
206+
207+
event_data.insert(
208+
"title".to_string(),
209+
v["recenttracks"]["track"][0]["name"].to_owned(),
210+
);
211+
event_data.insert(
212+
"artist".to_string(),
213+
v["recenttracks"]["track"][0]["artist"]["#text"].to_owned(),
214+
);
215+
event_data.insert(
216+
"album".to_string(),
217+
v["recenttracks"]["track"][0]["album"]["#text"].to_owned(),
218+
);
219+
220+
let event = Event {
221+
id: None,
222+
timestamp: Utc::now(),
223+
duration: polling_time,
224+
data: event_data,
225+
};
226+
227+
aw_client
228+
.heartbeat("aw-watcher-lastfm", &event, polling_interval as f64)
229+
.await
230+
.unwrap_or_else(|e| {
231+
warn!("Error sending heartbeat: {:?}", e);
232+
});
233+
}
123234

124235
#[tokio::main]
125236
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -147,8 +258,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
147258
}
148259
"--sync" => {
149260
if idx + 1 < args.len() {
150-
sync_duration = Some(parse_time_string(&args[idx + 1])
151-
.expect("Invalid sync duration format. Use format: 7d, 24h, or 30m"));
261+
sync_duration = Some(
262+
parse_time_string(&args[idx + 1])
263+
.expect("Invalid sync duration format. Use format: 7d, 24h, or 30m"),
264+
);
152265
idx += 2;
153266
} else {
154267
panic!("--sync requires a duration value (e.g., 7d, 24h, 30m)");
@@ -252,56 +365,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
252365
info!("Starting real-time tracking...");
253366
}
254367

255-
loop {
256-
interval.tick().await;
257-
258-
let response = client.get(&url).send().await;
259-
let v: Value = match response {
260-
Ok(response) => match response.json().await {
261-
Ok(json) => json,
262-
Err(e) => {
263-
warn!("Error parsing json: {}", e);
264-
continue;
265-
}
266-
},
267-
Err(_) => {
268-
warn!("Error connecting to last.fm");
269-
continue;
270-
}
271-
};
368+
#[cfg(unix)]
369+
run_unix_loop(interval, client, url, aw_client, polling_time, polling_interval).await;
272370

273-
if v["recenttracks"]["track"][0]["@attr"]["nowplaying"].as_str() != Some("true") {
274-
info!("No song is currently playing");
275-
continue;
276-
}
277-
let mut event_data: Map<String, Value> = Map::new();
278-
info!(
279-
"Track: {} - {}",
280-
v["recenttracks"]["track"][0]["name"], v["recenttracks"]["track"][0]["artist"]["#text"]
281-
);
282-
event_data.insert(
283-
"title".to_string(),
284-
v["recenttracks"]["track"][0]["name"].to_owned(),
285-
);
286-
event_data.insert(
287-
"artist".to_string(),
288-
v["recenttracks"]["track"][0]["artist"]["#text"].to_owned(),
289-
);
290-
event_data.insert(
291-
"album".to_string(),
292-
v["recenttracks"]["track"][0]["album"]["#text"].to_owned(),
293-
);
294-
let event = Event {
295-
id: None,
296-
timestamp: Utc::now(),
297-
duration: polling_time,
298-
data: event_data,
299-
};
300-
aw_client
301-
.heartbeat("aw-watcher-lastfm", &event, polling_interval as f64)
302-
.await
303-
.unwrap_or_else(|e| {
304-
warn!("Error sending heartbeat: {:?}", e);
305-
});
306-
}
371+
#[cfg(windows)]
372+
run_windows_loop(interval, client, url, aw_client, polling_time, polling_interval).await;
373+
374+
return Ok(());
307375
}

0 commit comments

Comments
 (0)