Skip to content

Commit 9fe9205

Browse files
committed
feat: Add a new database client to store shared state for persistence
1 parent d2856f0 commit 9fe9205

4 files changed

Lines changed: 129 additions & 3 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,4 @@ uuid = { version = "1.23.1", features = ["v4"] }
4141
dotenv = "0.15.0"
4242
url = "2.5.8"
4343
regex = "1.12.3"
44+
rusqlite = { version = "0.31", features = ["bundled"] }

src/database.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
use crate::settings::{PutItem, RsyncTrack, Status};
2+
use rusqlite::{params, Connection};
3+
use std::collections::HashMap;
4+
5+
/// Opens (or creates) the SQLite database and ensures the schema exists.
6+
pub fn open() -> Connection {
7+
let conn = Connection::open("rutorrent.db").expect("Failed to open database");
8+
conn.execute_batch(
9+
"CREATE TABLE IF NOT EXISTS state (
10+
hash TEXT PRIMARY KEY,
11+
name TEXT NOT NULL,
12+
status TEXT NOT NULL,
13+
progress REAL NOT NULL DEFAULT 0.0,
14+
url TEXT NOT NULL,
15+
save_path TEXT NOT NULL,
16+
remote_host TEXT NOT NULL,
17+
remote_user TEXT NOT NULL,
18+
remote_path TEXT NOT NULL,
19+
delete_after_copy INTEGER NOT NULL DEFAULT 0
20+
);",
21+
)
22+
.expect("Failed to create schema");
23+
conn
24+
}
25+
26+
/// Inserts or replaces a tracked torrent entry.
27+
pub fn upsert(conn: &Connection, hash: &str, entry: &RsyncTrack) {
28+
let (status, progress) = encode_status(&entry.status);
29+
conn.execute(
30+
"INSERT OR REPLACE INTO state
31+
(hash, name, status, progress, url, save_path, remote_host, remote_user, remote_path, delete_after_copy)
32+
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
33+
params![
34+
hash,
35+
entry.name,
36+
status,
37+
progress,
38+
entry.put_item.url,
39+
entry.put_item.save_path,
40+
entry.put_item.remote_host,
41+
entry.put_item.remote_username,
42+
entry.put_item.remote_path,
43+
entry.put_item.delete_after_copy as i32,
44+
],
45+
)
46+
.expect("Failed to upsert state");
47+
}
48+
49+
/// Removes a torrent entry by hash.
50+
pub fn remove(conn: &Connection, hash: &str) {
51+
conn.execute("DELETE FROM state WHERE hash = ?1", params![hash])
52+
.expect("Failed to remove state");
53+
}
54+
55+
/// Loads all persisted entries back into a HashMap on startup.
56+
pub fn load_all(conn: &Connection) -> HashMap<String, RsyncTrack> {
57+
let mut stmt = conn
58+
.prepare("SELECT hash, name, status, progress, url, save_path, remote_host, remote_user, remote_path, delete_after_copy FROM state")
59+
.expect("Failed to prepare load query");
60+
61+
stmt.query_map([], |row| {
62+
let hash: String = row.get(0)?;
63+
let name: String = row.get(1)?;
64+
let status_str: String = row.get(2)?;
65+
let progress: f64 = row.get(3)?;
66+
let url: String = row.get(4)?;
67+
let save_path: String = row.get(5)?;
68+
let remote_host: String = row.get(6)?;
69+
let remote_username: String = row.get(7)?;
70+
let remote_path: String = row.get(8)?;
71+
let delete_after_copy: i32 = row.get(9)?;
72+
73+
let status = decode_status(&status_str, progress);
74+
let put_item = PutItem {
75+
url,
76+
name: None,
77+
hash: None,
78+
trackers: None,
79+
save_path,
80+
remote_host,
81+
remote_username,
82+
remote_path,
83+
delete_after_copy: delete_after_copy != 0,
84+
};
85+
86+
Ok((hash, RsyncTrack { name, status, put_item }))
87+
})
88+
.expect("Failed to query state")
89+
.filter_map(|r| r.ok())
90+
.collect()
91+
}
92+
93+
fn encode_status(status: &Status) -> (&'static str, f64) {
94+
match status {
95+
Status::Downloading(p) => ("Downloading", *p),
96+
Status::Copying(p) => ("Copying", *p),
97+
Status::Completed => ("Completed", 1.0),
98+
Status::Failed => ("Failed", 0.0),
99+
}
100+
}
101+
102+
fn decode_status(s: &str, progress: f64) -> Status {
103+
match s {
104+
"Copying" => Status::Copying(progress),
105+
"Completed" => Status::Completed,
106+
"Failed" => Status::Failed,
107+
_ => Status::Downloading(progress),
108+
}
109+
}

src/lib.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ mod settings;
1414
mod squire;
1515
mod swagger;
1616
mod telegram;
17+
mod database;
1718

1819
/// Contains entrypoint and initializer settings to trigger the asynchronous `HTTPServer`
1920
///
@@ -29,13 +30,16 @@ pub async fn start() -> std::io::Result<()> {
2930
squire::load_env_file();
3031
let config = settings::Config::new();
3132
logger::init_logger(config.utc_logger, config.log_level);
32-
let state: settings::SharedState = Arc::new(RwLock::new(HashMap::new()));
33+
let db_conn = database::open();
34+
let initial_state = database::load_all(&db_conn);
35+
log::info!("Loaded {} entries from database", initial_state.len());
36+
let state: settings::SharedState = Arc::new(RwLock::new(initial_state));
3337
let pending: settings::PendingMap = Arc::new(RwLock::new(HashMap::new()));
3438

3539
let client = qb::client(&config)
3640
.await
3741
.expect("Failed to authenticate qBittorrent");
38-
squire::spawn_worker(client, state.clone(), pending.clone(), config.clone());
42+
squire::spawn_worker(client, state.clone(), pending.clone(), config.clone(), db_conn);
3943

4044
let host = config.host.clone();
4145
let port = config.port;
@@ -61,6 +65,7 @@ pub async fn start() -> std::io::Result<()> {
6165
.route("/torrent", web::delete().to(api::delete_torrent))
6266
.route("/swagger", web::get().to(swagger::redirector))
6367
.route("/ui", web::get().to(swagger::redirector))
68+
.route("/", web::get().to(swagger::redirector))
6469
.service(swagger::service())
6570
})
6671
.bind((host, port))?

src/squire.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{ntfy, qb, rsync, settings, telegram};
1+
use crate::{database, ntfy, qb, rsync, settings, telegram};
22
use regex::Regex;
33
use reqwest::Client;
44
use serde_json::Value;
@@ -124,7 +124,9 @@ pub fn spawn_worker(
124124
state: settings::SharedState,
125125
pending: settings::PendingMap,
126126
config: settings::Config,
127+
db_conn: rusqlite::Connection,
127128
) {
129+
let db_conn = std::sync::Mutex::new(db_conn);
128130
tokio::spawn(async move {
129131
log::info!("Worker started");
130132

@@ -204,6 +206,9 @@ pub fn spawn_worker(
204206
if !returned.contains(h.as_str()) {
205207
log::info!("Torrent removed from QBitAPI, dropping from state: {}", h);
206208
db.remove(h);
209+
if let Ok(conn) = db_conn.lock() {
210+
database::remove(&conn, h);
211+
}
207212
}
208213
});
209214

@@ -229,6 +234,9 @@ pub fn spawn_worker(
229234
config_cloned,
230235
);
231236
db.remove(&hash);
237+
if let Ok(conn) = db_conn.lock() {
238+
database::remove(&conn, &hash);
239+
}
232240
}
233241

234242
settings::Status::Completed => {
@@ -266,6 +274,9 @@ pub fn spawn_worker(
266274
}
267275
}
268276
db.remove(&hash);
277+
if let Ok(conn) = db_conn.lock() {
278+
database::remove(&conn, &hash);
279+
}
269280
}
270281

271282
settings::Status::Downloading(_) => {

0 commit comments

Comments
 (0)