Skip to content

Commit 2ea964b

Browse files
akacaseburmecia
andauthored
fix(wasm): clean up download, byte handling and hashing (#379)
* fix(wasm): clean up download, byte handling and hashing --------- Co-authored-by: Bo Lu <[email protected]>
1 parent 1a3cbd3 commit 2ea964b

File tree

4 files changed

+130
-52
lines changed

4 files changed

+130
-52
lines changed

Cargo.lock

+4-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

wrappers/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ wasm_fdw = [
139139
"serde",
140140
"serde_json",
141141
"jwt-simple",
142+
"bytes",
142143
]
143144
# Does not include helloworld_fdw because of its general uselessness
144145
native_fdws = [
@@ -241,7 +242,7 @@ jwt-simple = { version = "0.12.9", default-features = false, features = [
241242
dirs = { version = "5.0.1", optional = true }
242243
sha2 = { version = "0.10.8", optional = true }
243244
hex = { version = "0.4.3", optional = true }
244-
245+
bytes = { version = "1.9.0", optional = true }
245246
thiserror = { version = "1.0.48", optional = true }
246247
anyhow = { version = "1.0.81", optional = true }
247248

wrappers/src/fdw/wasm_fdw/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ This is Wasm foreign data wrapper host, please visit each Wasm foreign data wrap
66

77
| Version | Date | Notes |
88
| ------- | ---------- | ---------------------------------------------------- |
9+
| 0.1.4 | 2024-12-09 | Improve remote wasm downloading and caching |
910
| 0.1.3 | 2024-09-30 | Support for pgrx 0.12.6 |
1011
| 0.1.2 | 2024-07-07 | Add fdw_package_checksum server option |
1112
| 0.1.1 | 2024-07-05 | Fix missing wasm package cache dir issue |

wrappers/src/fdw/wasm_fdw/wasm_fdw.rs

+123-48
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
use bytes::Bytes;
12
use pgrx::pg_sys;
23
use semver::{Version, VersionReq};
34
use sha2::{Digest, Sha256};
45
use std::collections::HashMap;
56
use std::fs;
7+
use std::path::{Path, PathBuf};
68
use warg_client as warg;
79
use wasmtime::component::*;
810
use wasmtime::{Config, Engine, Store};
@@ -41,8 +43,6 @@ fn load_component_from_file(
4143
Component::from_file(engine, file_path).map_err(|_| WasmFdwError::InvalidWasmComponent)
4244
}
4345

44-
// Download wasm component package from warg registry or custom url.
45-
// The url protoal can be 'file://', 'warg(s)://' or 'http(s)://'.
4646
fn download_component(
4747
rt: &Runtime,
4848
engine: &Engine,
@@ -51,78 +51,153 @@ fn download_component(
5151
version: &str,
5252
checksum: Option<&str>,
5353
) -> WasmFdwResult<Component> {
54+
// handle local file paths
5455
if let Some(file_path) = url.strip_prefix("file://") {
5556
return load_component_from_file(engine, file_path);
5657
}
5758

59+
// handle warg registry URLs
5860
if url.starts_with("warg://") || url.starts_with("wargs://") {
59-
let url = url
60-
.replacen("warg://", "http://", 1)
61-
.replacen("wargs://", "https://", 1);
62-
63-
// download from warg registry
64-
let config = warg::Config {
65-
disable_interactive: true,
66-
..Default::default()
67-
};
68-
let client = rt.block_on(warg::FileSystemClient::new_with_config(
69-
Some(&url),
70-
&config,
71-
None,
72-
))?;
73-
74-
let pkg_name = warg_protocol::registry::PackageName::new(name)?;
75-
let ver = semver::VersionReq::parse(version)?;
76-
let pkg = rt
77-
.block_on(client.download(&pkg_name, &ver))?
78-
.ok_or(format!("{}@{} not found on {}", name, version, url))?;
79-
80-
return load_component_from_file(engine, pkg.path);
61+
return download_from_warg(rt, engine, url, name, version);
8162
}
8263

83-
// otherwise, download from custom url if it is not in local cache
64+
// handle direct URLs with caching
65+
download_from_url(rt, engine, url, name, version, checksum)
66+
}
67+
68+
fn download_from_warg(
69+
rt: &Runtime,
70+
engine: &Engine,
71+
url: &str,
72+
name: &str,
73+
version: &str,
74+
) -> WasmFdwResult<Component> {
75+
let url = url
76+
.replacen("warg://", "http://", 1)
77+
.replacen("wargs://", "https://", 1);
78+
79+
let config = warg::Config {
80+
disable_interactive: true,
81+
..Default::default()
82+
};
83+
84+
let client = rt.block_on(warg::FileSystemClient::new_with_config(
85+
Some(&url),
86+
&config,
87+
None,
88+
))?;
89+
90+
let pkg_name = warg_protocol::registry::PackageName::new(name)
91+
.map_err(|e| format!("invalid package name '{}': {}", name, e))?;
92+
93+
let ver = semver::VersionReq::parse(version)
94+
.map_err(|e| format!("invalid version requirement '{}': {}", version, e))?;
95+
96+
let pkg = rt
97+
.block_on(client.download(&pkg_name, &ver))?
98+
.ok_or_else(|| format!("{}@{} not found on {}", name, version, url))?;
99+
100+
load_component_from_file(engine, pkg.path)
101+
}
102+
103+
fn download_from_url(
104+
rt: &Runtime,
105+
engine: &Engine,
106+
url: &str,
107+
name: &str,
108+
version: &str,
109+
checksum: Option<&str>,
110+
) -> WasmFdwResult<Component> {
111+
// validate URL
112+
let url = url
113+
.parse::<reqwest::Url>()
114+
.map_err(|e| format!("invalid URL '{}': {}", url, e))?;
115+
116+
// calculate cache path
117+
let cache_path = get_cache_path(url.as_str(), name, version)?;
118+
119+
// return cached component if it exists and is valid
120+
if cache_path.exists() {
121+
if let Ok(component) = load_component_from_file(engine, &cache_path) {
122+
return Ok(component);
123+
}
124+
// if loading fails, remove invalid cache file
125+
let _ = fs::remove_file(&cache_path);
126+
}
127+
128+
// ensure checksum is provided for remote downloads
129+
let checksum = checksum
130+
.ok_or_else(|| "package checksum must be specified for remote downloads".to_string())?;
131+
132+
// download and verify component
133+
let bytes = download_and_verify(rt, url, checksum)?;
134+
135+
// save to cache
136+
save_to_cache(&cache_path, &bytes)?;
137+
138+
// load component
139+
load_component_from_file(engine, &cache_path).inspect_err(|_| {
140+
let _ = fs::remove_file(&cache_path);
141+
})
142+
}
84143

85-
// calculate file name hash and make up cache path
144+
fn get_cache_path(url: &str, name: &str, version: &str) -> WasmFdwResult<PathBuf> {
86145
let hash = Sha256::digest(format!(
87146
"{}:{}:{}@{}",
88147
unsafe { pg_sys::GetUserId().as_u32() },
89148
url,
90149
name,
91150
version
92151
));
152+
93153
let file_name = hex::encode(hash);
94-
let mut path = dirs::cache_dir().expect("no cache dir found");
154+
let mut path = dirs::cache_dir().ok_or_else(|| "no cache directory found".to_string())?;
155+
95156
path.push(file_name);
96157
path.set_extension("wasm");
97158

98-
if !path.exists() {
99-
// package checksum must be specified
100-
let option_checksum = checksum.ok_or("package checksum option not specified".to_owned())?;
159+
Ok(path)
160+
}
101161

102-
// download component wasm from remote and check its checksum
103-
let resp = rt.block_on(reqwest::get(url))?;
104-
let bytes = rt.block_on(resp.bytes())?;
105-
let bytes_checksum = hex::encode(Sha256::digest(&bytes));
106-
if bytes_checksum != option_checksum {
107-
return Err("package checksum not match".to_string().into());
108-
}
162+
fn download_and_verify(
163+
rt: &Runtime,
164+
url: reqwest::Url,
165+
expected_checksum: &str,
166+
) -> WasmFdwResult<Bytes> {
167+
let resp = rt
168+
.block_on(reqwest::get(url.clone()))
169+
.map_err(|_| "failed to download component".to_string())?;
170+
171+
if !resp.status().is_success() {
172+
return Err("component download failed - server error"
173+
.to_string()
174+
.into());
175+
}
109176

110-
// save the component wasm to local cache
111-
if let Some(parent) = path.parent() {
112-
// create all parent directories if they do not exist
113-
fs::create_dir_all(parent)?;
114-
}
115-
fs::write(&path, bytes)?;
177+
let bytes = rt
178+
.block_on(resp.bytes())
179+
.map_err(|_| "failed to read component data".to_string())?;
180+
181+
let actual_checksum = hex::encode(Sha256::digest(&bytes));
182+
if actual_checksum != expected_checksum {
183+
return Err("component verification failed".to_string().into());
116184
}
117185

118-
load_component_from_file(engine, &path).inspect_err(|_| {
119-
// remove the cache file if it cannot be loaded as component
120-
let _ = fs::remove_file(&path);
121-
})
186+
Ok(bytes)
187+
}
188+
189+
fn save_to_cache(path: &Path, bytes: &[u8]) -> WasmFdwResult<()> {
190+
if let Some(parent) = path.parent() {
191+
fs::create_dir_all(parent).map_err(|_| "cache access error".to_string())?;
192+
}
193+
194+
fs::write(path, bytes).map_err(|_| "cache write error".to_string())?;
195+
196+
Ok(())
122197
}
123198

124199
#[wrappers_fdw(
125-
version = "0.1.3",
200+
version = "0.1.4",
126201
author = "Supabase",
127202
website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/wasm_fdw",
128203
error_type = "WasmFdwError"

0 commit comments

Comments
 (0)