Skip to content

Commit 0cc46fc

Browse files
Merge pull request #17 from OpenArchive/create-backup-server
Create backup server
2 parents 094b4ed + 697014d commit 0cc46fc

File tree

8 files changed

+1011
-71
lines changed

8 files changed

+1011
-71
lines changed

Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,17 @@ xdg = "2.4"
1313
tmpdir = "1"
1414
serde = "1.0.204"
1515
serde_cbor = "0.11.2"
16-
clap = "4.5.9"
16+
clap = { version = "4.5.9", features = ["derive"] }
1717
anyhow = "1.0.86"
1818
tokio = {version ="1.39.3", features=["full"] }
1919
tokio-stream = "0.1.16"
2020
async-stream = "0.3.5"
21-
futures-core = "0.3.30"
21+
futures = "0.3.31"
22+
futures-core = "0.3.31"
23+
futures-util = "0.3.31"
2224
bytes = "1.6.1"
2325
serial_test = "3.1.1"
2426
url = "2.5.2"
2527
hex = "0.4.3"
2628
rand = "0.8.5"
29+
base64 = "0.22.1"

src/backend.rs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub struct BackendInner {
4444
update_rx: Option<broadcast::Receiver<VeilidUpdate>>,
4545
groups: HashMap<CryptoKey, Box<Group>>,
4646
pub iroh_blobs: Option<VeilidIrohBlobs>,
47+
on_new_route_callback: Option<OnNewRouteCallback>,
4748
}
4849

4950
impl BackendInner {
@@ -85,6 +86,7 @@ impl BackendInner {
8586
}
8687
}
8788

89+
#[derive(Clone)]
8890
pub struct Backend {
8991
inner: Arc<Mutex<BackendInner>>,
9092
}
@@ -97,6 +99,7 @@ impl Backend {
9799
update_rx: None,
98100
groups: HashMap::new(),
99101
iroh_blobs: None,
102+
on_new_route_callback: None,
100103
};
101104

102105
let backend = Backend {
@@ -118,6 +121,7 @@ impl Backend {
118121
update_rx: Some(update_rx),
119122
groups: HashMap::new(),
120123
iroh_blobs: None,
124+
on_new_route_callback: None,
121125
};
122126

123127
let backend = Backend {
@@ -126,12 +130,16 @@ impl Backend {
126130

127131
let inner_clone = backend.inner.clone();
128132

129-
let on_new_route_callback: OnNewRouteCallback = Arc::new(move |_, _| {
133+
let on_new_route_callback: OnNewRouteCallback = Arc::new(move |route_id, route_id_blob| {
130134
let inner = inner_clone.clone();
131135
println!("Re-generating route");
132136
tokio::spawn(async move {
133137
let inner = inner.lock().await;
134138

139+
if let Some(on_new_route) = &inner.on_new_route_callback {
140+
on_new_route(route_id, route_id_blob)
141+
}
142+
135143
for group in inner.groups.clone().into_values() {
136144
if let Some(repo) = group.get_own_repo().await {
137145
if let Err(err) = repo.update_route_on_dht().await {
@@ -204,12 +212,16 @@ impl Backend {
204212

205213
let inner_clone = self.inner.clone();
206214

207-
let on_new_route_callback: OnNewRouteCallback = Arc::new(move |_, _| {
215+
let on_new_route_callback: OnNewRouteCallback = Arc::new(move |route_id, route_id_blob| {
208216
let inner = inner_clone.clone();
209217
println!("Re-generating route");
210218
tokio::spawn(async move {
211219
let inner = inner.lock().await;
212220

221+
if let Some(on_new_route) = &inner.on_new_route_callback {
222+
on_new_route(route_id, route_id_blob)
223+
}
224+
213225
for group in inner.groups.clone().into_values() {
214226
if let Some(repo) = group.get_own_repo().await {
215227
if let Err(err) = repo.update_route_on_dht().await {
@@ -264,11 +276,27 @@ impl Backend {
264276
Ok(())
265277
}
266278

279+
pub async fn set_on_new_route_callback(
280+
&self,
281+
on_new_route_connected_callback: OnNewRouteCallback,
282+
) {
283+
let mut inner = self.inner.lock().await;
284+
inner.on_new_route_callback = Some(on_new_route_connected_callback);
285+
}
286+
267287
pub async fn join_from_url(&self, url_string: &str) -> Result<Box<Group>> {
268288
let keys = parse_url(url_string)?;
269289
self.join_group(keys).await
270290
}
271291

292+
pub async fn get_route_id_blob(&self) -> Result<Vec<u8>> {
293+
if let Some(blobs) = self.get_iroh_blobs().await {
294+
Ok(blobs.route_id_blob().await)
295+
} else {
296+
Err(anyhow!("Veilid not initialized"))
297+
}
298+
}
299+
272300
pub async fn join_group(&self, keys: CommonKeypair) -> Result<Box<Group>> {
273301
let mut inner = self.inner.lock().await;
274302

@@ -487,6 +515,11 @@ impl Backend {
487515
let mut inner = self.inner.lock().await;
488516
inner.iroh_blobs.clone()
489517
}
518+
519+
pub async fn get_routing_context(&self) -> Option<RoutingContext> {
520+
let veilid_api = self.get_veilid_api().await?;
521+
veilid_api.routing_context().ok()
522+
}
490523
}
491524

492525
async fn wait_for_network(update_rx: &mut broadcast::Receiver<VeilidUpdate>) -> Result<()> {
@@ -511,7 +544,7 @@ fn find_query(url: &Url, key: &str) -> Result<String> {
511544
Err(anyhow!("Unable to find parameter {} in URL {:?}", key, url))
512545
}
513546

514-
fn crypto_key_from_query(url: &Url, key: &str) -> Result<CryptoKey> {
547+
pub fn crypto_key_from_query(url: &Url, key: &str) -> Result<CryptoKey> {
515548
let value = find_query(url, key)?;
516549
let bytes = hex::decode(value)?;
517550
let mut key_vec: [u8; CRYPTO_KEY_LENGTH] = [0; CRYPTO_KEY_LENGTH];

src/common.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,28 @@
11
#![allow(async_fn_in_trait)]
22
#![allow(clippy::async_yields_async)]
33

4-
use crate::constants::ROUTE_ID_DHT_KEY;
54
use anyhow::{anyhow, Result};
65
use serde::{Deserialize, Serialize};
76
use std::{path::Path, path::PathBuf, sync::Arc};
87
use tokio::sync::broadcast::{self, Receiver};
8+
use url::Url;
99
use veilid_core::{
1010
CryptoKey, CryptoSystem, CryptoSystemVLD0, CryptoTyped, DHTRecordDescriptor, KeyPair, Nonce,
1111
ProtectedStore, RouteId, RoutingContext, Sequencing, SharedSecret, Stability, UpdateCallback,
1212
VeilidAPI, VeilidConfigInner, VeilidUpdate, CRYPTO_KIND_VLD0, VALID_CRYPTO_KINDS,
1313
};
1414

15+
use crate::constants::ROUTE_ID_DHT_KEY;
16+
1517
pub async fn make_route(veilid: &VeilidAPI) -> Result<(RouteId, Vec<u8>)> {
1618
let mut retries = 6;
1719
while retries > 0 {
1820
retries -= 1;
1921
let result = veilid
2022
.new_custom_private_route(
2123
&VALID_CRYPTO_KINDS,
22-
Stability::Reliable,
23-
Sequencing::EnsureOrdered,
24+
Stability::LowLatency,
25+
Sequencing::NoPreference,
2426
)
2527
.await;
2628

@@ -73,7 +75,7 @@ pub async fn init_veilid(
7375
}
7476

7577
pub fn config_for_dir(base_dir: PathBuf, namespace: String) -> VeilidConfigInner {
76-
return VeilidConfigInner {
78+
VeilidConfigInner {
7779
program_name: "save-dweb-backend".to_string(),
7880
namespace,
7981
protected_store: veilid_core::VeilidConfigProtectedStore {
@@ -94,10 +96,10 @@ pub fn config_for_dir(base_dir: PathBuf, namespace: String) -> VeilidConfigInner
9496
..Default::default()
9597
},
9698
..Default::default()
97-
};
99+
}
98100
}
99101

100-
#[derive(Serialize, Deserialize)]
102+
#[derive(Serialize, Deserialize, Clone)]
101103
pub struct CommonKeypair {
102104
pub id: CryptoKey,
103105
pub public_key: CryptoKey,

src/group.rs

Lines changed: 71 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,17 @@ use rand::thread_rng;
1111
use serde::{Deserialize, Serialize};
1212
use std::any::Any;
1313
use std::collections::HashMap;
14+
use std::future::Future;
15+
use std::time::{SystemTime, UNIX_EPOCH};
16+
1417
use std::path::PathBuf;
1518
use std::result;
1619
use std::sync::Arc;
1720
use tokio::sync::{mpsc, Mutex};
1821
use url::Url;
1922
use veilid_core::{
2023
CryptoKey, CryptoSystemVLD0, CryptoTyped, DHTRecordDescriptor, DHTReportScope, DHTSchema,
21-
KeyPair, ProtectedStore, RoutingContext, SharedSecret, TypedKey, ValueSubkeyRangeSet,
24+
KeyPair, ProtectedStore, RoutingContext, SharedSecret, TypedKey, ValueSubkeyRangeSet, VeilidUpdate,
2225
VeilidAPI, CRYPTO_KEY_LENGTH, CRYPTO_KIND_VLD0,
2326
};
2427
use veilid_iroh_blobs::iroh::VeilidIrohBlobs;
@@ -80,8 +83,7 @@ impl Group {
8083
.lock()
8184
.await
8285
.get(id)
83-
.ok_or_else(|| anyhow!("Repo not loaded"))
84-
.map(|repo| repo.clone())
86+
.ok_or_else(|| anyhow!("Repo not loaded")).cloned()
8587
}
8688

8789
pub async fn has_repo(&self, id: &CryptoKey) -> bool {
@@ -122,7 +124,7 @@ impl Group {
122124
let mut repos = self.list_peer_repos().await;
123125
repos.shuffle(&mut rng);
124126

125-
if repos.len() == 0 {
127+
if repos.is_empty() {
126128
return Err(anyhow!("Cannot download hash. No other peers found"));
127129
}
128130

@@ -310,6 +312,15 @@ impl Group {
310312

311313
let mut repo_id_buffer: [u8; CRYPTO_KEY_LENGTH] = [0; CRYPTO_KEY_LENGTH];
312314

315+
// Validate the length before copying
316+
if repo_id_raw.data().len() != repo_id_buffer.len() {
317+
return Err(anyhow!(
318+
"Slice length mismatch: expected {}, got {}",
319+
repo_id_buffer.len(),
320+
repo_id_raw.data().len()
321+
));
322+
}
323+
313324
repo_id_buffer.copy_from_slice(repo_id_raw.data());
314325

315326
let repo_id = TypedKey::new(CRYPTO_KIND_VLD0, CryptoKey::from(repo_id_buffer));
@@ -338,9 +349,9 @@ impl Group {
338349
pub async fn try_load_repo_from_disk(&mut self) -> bool {
339350
if let Err(err) = self.load_repo_from_disk().await {
340351
eprintln!("Unable to load own repo from disk {}", err);
341-
return false;
352+
false
342353
} else {
343-
return true;
354+
true
344355
}
345356
}
346357

@@ -429,7 +440,7 @@ impl Group {
429440
let keypair = CommonKeypair {
430441
id: repo.id(),
431442
public_key: repo_dht_record.owner().clone(),
432-
secret_key: repo_dht_record.owner_secret().map(|key| *key),
443+
secret_key: repo_dht_record.owner_secret().copied(),
433444
encryption_key: encryption_key.clone(),
434445
};
435446

@@ -463,6 +474,59 @@ impl Group {
463474
.await
464475
.map_err(|_| anyhow!("Failed to load keypair for repo_id: {:?}", repo_id))
465476
}
477+
478+
pub async fn watch_changes<F, Fut>(&self, on_change: F) -> Result<()>
479+
where
480+
F: Fn() -> Fut + Send + Sync + 'static,
481+
Fut: Future<Output = Result<()>> + Send + 'static,
482+
{
483+
let repo_count = self.dht_repo_count().await?;
484+
let range = if repo_count > 0 {
485+
ValueSubkeyRangeSet::single_range(0, repo_count as u32 - 1)
486+
} else {
487+
ValueSubkeyRangeSet::full()
488+
};
489+
490+
let expiration_duration = 600_000_000;
491+
let expiration = SystemTime::now()
492+
.duration_since(UNIX_EPOCH)?
493+
.as_micros() as u64 + expiration_duration;
494+
let count = 0;
495+
496+
// Clone necessary data for the async block
497+
let routing_context = self.routing_context.clone();
498+
let dht_record_key = self.dht_record.key().clone();
499+
500+
// Spawn a task that uses only owned data
501+
tokio::spawn(async move {
502+
match routing_context
503+
.watch_dht_values(dht_record_key.clone(), range.clone(), expiration.into(), count)
504+
.await
505+
{
506+
Ok(_) => {
507+
println!("DHT watch successfully set on record key {:?}", dht_record_key);
508+
509+
loop {
510+
if let Ok(change) = routing_context
511+
.watch_dht_values(dht_record_key.clone(), range.clone(), expiration.into(), count)
512+
.await
513+
{
514+
if change > 0.into() {
515+
if let Err(e) = on_change().await {
516+
eprintln!("Failed to re-download files: {:?}", e);
517+
}
518+
}
519+
}
520+
}
521+
}
522+
Err(e) => eprintln!("Failed to set DHT watch: {:?}", e),
523+
}
524+
});
525+
526+
Ok(())
527+
}
528+
529+
466530
}
467531

468532
impl DHTEntity for Group {

0 commit comments

Comments
 (0)