Skip to content

Commit 09dab63

Browse files
committed
addressed all the comments
Signed-off-by: Jiaxiao Zhou <duibao55328@gmail.com>
1 parent 181c721 commit 09dab63

File tree

5 files changed

+180
-187
lines changed

5 files changed

+180
-187
lines changed

crates/wassette/src/component_storage.rs

Lines changed: 122 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@
44
//! Filesystem helpers that manage component artifacts, metadata, and cache
55
//! layout for the lifecycle manager.
66
7+
use std::io::{BufReader, Read};
78
use std::path::{Path, PathBuf};
89
use std::sync::Arc;
910

1011
use anyhow::{anyhow, Context, Result};
11-
use sha2::Digest;
12-
use tokio::sync::Semaphore;
12+
use sha2::{Digest, Sha256};
13+
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
14+
use tokio::task::spawn_blocking;
1315

16+
use crate::loader::DownloadedResource;
1417
use crate::{ComponentMetadata, ValidationStamp};
1518

1619
/// Handles filesystem layout and metadata persistence for components.
@@ -23,34 +26,28 @@ pub struct ComponentStorage {
2326

2427
impl ComponentStorage {
2528
/// Create a new storage manager rooted at the plugin directory.
26-
pub fn new(root: PathBuf, max_concurrent_downloads: usize) -> Self {
29+
pub async fn new(root: impl Into<PathBuf>, max_concurrent_downloads: usize) -> Result<Self> {
30+
let root = root.into();
2731
let downloads_dir = root.join(crate::DOWNLOADS_DIR);
28-
Self {
29-
root,
30-
downloads_dir,
31-
downloads_semaphore: Arc::new(Semaphore::new(max_concurrent_downloads.max(1))),
32-
}
33-
}
3432

35-
/// Ensure the directory structure exists on disk.
36-
pub async fn ensure_layout(&self) -> Result<()> {
37-
tokio::fs::create_dir_all(&self.root)
33+
tokio::fs::create_dir_all(&root)
3834
.await
39-
.with_context(|| {
40-
format!(
41-
"Failed to create plugin directory at {}",
42-
self.root.display()
43-
)
44-
})?;
45-
tokio::fs::create_dir_all(&self.downloads_dir)
35+
.with_context(|| format!("Failed to create plugin directory at {}", root.display()))?;
36+
37+
tokio::fs::create_dir_all(&downloads_dir)
4638
.await
4739
.with_context(|| {
4840
format!(
4941
"Failed to create downloads directory at {}",
50-
self.downloads_dir.display()
42+
downloads_dir.display()
5143
)
5244
})?;
53-
Ok(())
45+
46+
Ok(Self {
47+
root,
48+
downloads_dir,
49+
downloads_semaphore: Arc::new(Semaphore::new(max_concurrent_downloads.max(1))),
50+
})
5451
}
5552

5653
/// Root plugin directory containing components.
@@ -64,15 +61,6 @@ impl ComponentStorage {
6461
&self.downloads_dir
6562
}
6663

67-
/// Acquire a permit for filesystem-bound downloads.
68-
pub async fn acquire_download_permit(&self) -> tokio::sync::OwnedSemaphorePermit {
69-
self.downloads_semaphore
70-
.clone()
71-
.acquire_owned()
72-
.await
73-
.expect("Semaphore closed")
74-
}
75-
7664
/// Absolute path to the component `.wasm` file.
7765
pub fn component_path(&self, component_id: &str) -> PathBuf {
7866
self.root.join(format!("{component_id}.wasm"))
@@ -100,6 +88,49 @@ impl ComponentStorage {
10088
self.root.join(format!("{component_id}.policy.meta.json"))
10189
}
10290

91+
/// Stage a downloaded component artifact into storage, replacing any existing files.
92+
pub async fn install_component_artifact(
93+
&self,
94+
component_id: &str,
95+
resource: DownloadedResource,
96+
) -> Result<PathBuf> {
97+
let _permit = self.acquire_download_permit().await;
98+
99+
self.remove_component_artifacts(component_id).await?;
100+
101+
resource.copy_to(self.root()).await.with_context(|| {
102+
format!(
103+
"Failed to copy component to destination: {}",
104+
self.root.display()
105+
)
106+
})?;
107+
108+
Ok(self.component_path(component_id))
109+
}
110+
111+
/// Remove persisted component artifacts (wasm, metadata, cache) if they exist.
112+
pub async fn remove_component_artifacts(&self, component_id: &str) -> Result<()> {
113+
self.remove_if_exists(
114+
&self.component_path(component_id),
115+
"component file",
116+
component_id,
117+
)
118+
.await?;
119+
self.remove_if_exists(
120+
&self.metadata_path(component_id),
121+
"component metadata file",
122+
component_id,
123+
)
124+
.await?;
125+
self.remove_if_exists(
126+
&self.precompiled_path(component_id),
127+
"precompiled component file",
128+
component_id,
129+
)
130+
.await?;
131+
Ok(())
132+
}
133+
103134
/// Persist component metadata to disk.
104135
pub async fn write_metadata(&self, metadata: &ComponentMetadata) -> Result<()> {
105136
let path = self.metadata_path(&metadata.component_id);
@@ -117,13 +148,17 @@ impl ComponentStorage {
117148
return Ok(None);
118149
}
119150

120-
let contents = tokio::fs::read_to_string(&path).await.with_context(|| {
121-
format!("Failed to read component metadata from {}", path.display())
122-
})?;
151+
let file = tokio::fs::File::open(&path)
152+
.await
153+
.with_context(|| format!("Failed to open component metadata at {}", path.display()))?;
123154

124-
let metadata = serde_json::from_str(&contents).with_context(|| {
125-
format!("Failed to parse component metadata from {}", path.display())
126-
})?;
155+
let file = file.into_std().await;
156+
157+
let metadata = spawn_blocking(move || {
158+
let reader = BufReader::new(file);
159+
serde_json::from_reader(reader).context("Failed to deserialize component metadata")
160+
})
161+
.await??;
127162
Ok(Some(metadata))
128163
}
129164

@@ -170,6 +205,7 @@ impl ComponentStorage {
170205
/// recorded in addition to size and modification time so changes can be
171206
/// detected even when timestamps are unreliable.
172207
pub async fn create_validation_stamp(
208+
&self,
173209
path: &Path,
174210
include_hash: bool,
175211
) -> Result<ValidationStamp> {
@@ -188,12 +224,7 @@ impl ComponentStorage {
188224
.as_secs();
189225

190226
let content_hash = if include_hash {
191-
let bytes = tokio::fs::read(path)
192-
.await
193-
.with_context(|| format!("Failed to read {} for hashing", path.display()))?;
194-
let mut hasher = sha2::Sha256::new();
195-
hasher.update(&bytes);
196-
Some(format!("{:x}", hasher.finalize()))
227+
Some(compute_file_hash(path).await?)
197228
} else {
198229
None
199230
};
@@ -207,42 +238,76 @@ impl ComponentStorage {
207238

208239
/// Check if the validation stamp matches the current file on disk.
209240
pub async fn validate_stamp(path: &Path, stamp: &ValidationStamp) -> bool {
210-
let Ok(metadata) = tokio::fs::metadata(path).await else {
211-
return false;
241+
let metadata = match tokio::fs::metadata(path).await {
242+
Ok(metadata) => metadata,
243+
Err(_) => return false,
212244
};
213245

214246
if metadata.len() != stamp.file_size {
215247
return false;
216248
}
217249

218-
let Ok(mtime) = metadata
250+
if let Some(expected_hash) = &stamp.content_hash {
251+
match compute_file_hash(path).await {
252+
Ok(actual_hash) => return actual_hash == *expected_hash,
253+
Err(_) => return false,
254+
}
255+
}
256+
257+
let mtime = match metadata
219258
.modified()
220259
.map_err(|_| std::io::Error::from(std::io::ErrorKind::Other))
221260
.and_then(|t| {
222261
t.duration_since(std::time::UNIX_EPOCH)
223262
.map_err(|_| std::io::Error::from(std::io::ErrorKind::Other))
224263
})
225264
.map(|d| d.as_secs())
226-
else {
227-
return false;
265+
{
266+
Ok(mtime) => mtime,
267+
Err(_) => return false,
228268
};
229269

230270
if mtime != stamp.mtime {
231271
return false;
232272
}
233273

234-
if let Some(expected_hash) = &stamp.content_hash {
235-
let Ok(content) = tokio::fs::read(path).await else {
236-
return false;
237-
};
238-
let mut hasher = sha2::Sha256::new();
239-
hasher.update(&content);
240-
let actual_hash = format!("{:x}", hasher.finalize());
241-
if &actual_hash != expected_hash {
242-
return false;
274+
true
275+
}
276+
}
277+
278+
async fn compute_file_hash(path: &Path) -> Result<String> {
279+
let file = tokio::fs::File::open(path)
280+
.await
281+
.with_context(|| format!("Failed to open {} for hashing", path.display()))?;
282+
283+
let file = file.into_std().await;
284+
285+
let path = path.to_path_buf();
286+
spawn_blocking(move || -> Result<String> {
287+
let mut reader = BufReader::new(file);
288+
let mut hasher = Sha256::new();
289+
let mut buffer = [0u8; 16 * 1024];
290+
291+
loop {
292+
let read = reader.read(&mut buffer)?;
293+
if read == 0 {
294+
break;
243295
}
296+
hasher.update(&buffer[..read]);
244297
}
245298

246-
true
299+
Ok(format!("{:x}", hasher.finalize()))
300+
})
301+
.await?
302+
.with_context(|| format!("Failed to hash file {}", path.display()))
303+
}
304+
305+
impl ComponentStorage {
306+
async fn acquire_download_permit(&self) -> OwnedSemaphorePermit {
307+
self.downloads_semaphore
308+
.clone()
309+
.acquire_owned()
310+
.await
311+
.expect("Semaphore closed")
247312
}
248313
}

crates/wassette/src/config.rs

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ impl LifecycleConfig {
5656
self.eager_load
5757
}
5858

59-
fn into_parts(
59+
pub(crate) fn into_parts(
6060
self,
6161
) -> (
6262
PathBuf,
@@ -213,27 +213,3 @@ fn default_oci_client() -> Result<oci_client::Client> {
213213
..Default::default()
214214
}))
215215
}
216-
217-
impl LifecycleConfig {
218-
pub(crate) fn destructure(self) -> LifecycleConfigParts {
219-
let (plugin_dir, secrets_dir, environment_vars, http_client, oci_client, _eager_load) =
220-
self.into_parts();
221-
LifecycleConfigParts {
222-
plugin_dir,
223-
secrets_dir,
224-
environment_vars,
225-
http_client,
226-
oci_client,
227-
}
228-
}
229-
}
230-
231-
/// Internal helper that exposes the decomposed configuration values to the
232-
/// lifecycle manager for initialization.
233-
pub(crate) struct LifecycleConfigParts {
234-
pub plugin_dir: PathBuf,
235-
pub secrets_dir: PathBuf,
236-
pub environment_vars: HashMap<String, String>,
237-
pub http_client: reqwest::Client,
238-
pub oci_client: oci_client::Client,
239-
}

0 commit comments

Comments
 (0)