Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .github/workflows/pm-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,12 @@ jobs:
- name: Disable Windows Defender
if: runner.os == 'Windows'
shell: powershell
run: Set-MpPreference -DisableRealtimeMonitoring $true
run: |
try {
Set-MpPreference -DisableRealtimeMonitoring $true -ErrorAction Stop
} catch {
Write-Warning "Unable to disable Windows Defender real-time monitoring: $_"
}
# Add: Configure Git longpaths on Windows
- name: Configure Git (Windows)
Expand Down
18 changes: 16 additions & 2 deletions .github/workflows/pm-e2e-bench.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ jobs:
targets: x86_64-unknown-linux-gnu
- name: Cache cargo
uses: Swatinem/rust-cache@v2
continue-on-error: true
with:
shared-key: pm-build-linux
cache-bin: false
Expand Down Expand Up @@ -200,6 +201,7 @@ jobs:
targets: aarch64-apple-darwin
- name: Cache cargo
uses: Swatinem/rust-cache@v2
continue-on-error: true
with:
shared-key: pm-build-mac-arm64
cache-bin: false
Expand Down Expand Up @@ -232,6 +234,7 @@ jobs:
run: rustup target add x86_64-apple-darwin
- name: Cache cargo
uses: Swatinem/rust-cache@v2
continue-on-error: true
with:
shared-key: pm-build-mac-x64
cache-bin: false
Expand All @@ -254,7 +257,12 @@ jobs:
- uses: actions/checkout@v4
- name: Disable Windows Defender
shell: powershell
run: Set-MpPreference -DisableRealtimeMonitoring $true
run: |
try {
Set-MpPreference -DisableRealtimeMonitoring $true -ErrorAction Stop
} catch {
Write-Warning "Unable to disable Windows Defender real-time monitoring: $_"
}
- name: Init git submodules
run: git submodule update --init --recursive --depth 1
- name: Setup Rust toolchain
Expand All @@ -264,6 +272,7 @@ jobs:
targets: x86_64-pc-windows-msvc
- name: Cache cargo
uses: Swatinem/rust-cache@v2
continue-on-error: true
with:
shared-key: pm-build-windows
cache-bin: false
Expand Down Expand Up @@ -408,7 +417,12 @@ jobs:
- uses: actions/checkout@v4
- name: Disable Windows Defender
shell: powershell
run: Set-MpPreference -DisableRealtimeMonitoring $true
run: |
try {
Set-MpPreference -DisableRealtimeMonitoring $true -ErrorAction Stop
} catch {
Write-Warning "Unable to disable Windows Defender real-time monitoring: $_"
}
- name: Setup node
uses: actions/setup-node@v4
with:
Expand Down
41 changes: 37 additions & 4 deletions crates/pm/src/cmd/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ use crate::util::user_config::get_registry;

/// View package information from registry, similar to npm view
pub async fn view(package_spec: &str) -> Result<()> {
let registry_url = get_registry();
view_with_registry(package_spec, &registry_url).await
}

async fn view_with_registry(package_spec: &str, registry_url: &str) -> Result<()> {
tracing::debug!("Viewing package: {package_spec}");

// Parse package specification
Expand All @@ -18,9 +23,8 @@ pub async fn view(package_spec: &str) -> Result<()> {
tracing::debug!("Resolved package: {name} (spec: {version_spec})");

// Fetch full manifest directly from registry (Complete format for display, no ETag)
let registry_url = get_registry();
let (full_manifest, _etag) =
fetch_full_manifest_fresh(&registry_url, name, MetadataFormat::Complete)
fetch_full_manifest_fresh(registry_url, name, MetadataFormat::Complete)
.await
.map_err(|e| anyhow!("Failed to fetch package info for {}: {}", package_spec, e))?;

Expand Down Expand Up @@ -356,12 +360,41 @@ mod tests {
/// because the registry service used ETag caching.
#[tokio::test]
async fn test_view_twice_no_304_error() {
use mockito::Matcher;

let mut server = mockito::Server::new_async().await;
let manifest = r#"{
"name": "is-odd",
"description": "mock package",
"dist-tags": { "latest": "1.0.0" },
"versions": {
"1.0.0": {
"name": "is-odd",
"version": "1.0.0",
"description": "mock package",
"dist": {}
}
}
}"#;
let mock = server
.mock("GET", "/is-odd")
.match_header("accept", "application/json")
.match_header("if-none-match", Matcher::Missing)
.with_status(200)
.with_header("content-type", "application/json")
.with_header("etag", "\"mock-etag\"")
.with_body(manifest)
.expect(2)
.create_async()
.await;

// First view - should succeed
let result1 = view("is-odd").await;
let result1 = view_with_registry("is-odd", &server.url()).await;
assert!(result1.is_ok(), "First view failed: {:?}", result1.err());

// Second view - should also succeed (not fail with 304 error)
let result2 = view("is-odd").await;
let result2 = view_with_registry("is-odd", &server.url()).await;
assert!(result2.is_ok(), "Second view failed: {:?}", result2.err());
mock.assert_async().await;
}
}
5 changes: 3 additions & 2 deletions crates/pm/src/helper/ruborist_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use crate::util::logger::ProgressReceiver;
use crate::util::manifest_store::DiskManifestStore;
use crate::util::project_cache;
use crate::util::user_config::{
get_catalogs, get_manifests_concurrency_limit, get_peer_deps, get_registry, get_supports_semver,
get_catalogs, get_peer_deps, get_registry, get_resolver_manifests_concurrency_limit,
get_supports_semver,
};

/// Tokio-based glob implementation.
Expand Down Expand Up @@ -57,7 +58,7 @@ impl Context {
cache_dir: Some(get_cache_dir()),
manifest_store: Self::manifest_store(),
warm_project_cache,
concurrency: get_manifests_concurrency_limit().await,
concurrency: get_resolver_manifests_concurrency_limit().await,
peer_deps: get_peer_deps().await,
glob: TokioGlob,
receiver,
Expand Down
2 changes: 1 addition & 1 deletion crates/pm/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ struct Cli {
#[arg(long, global = true, action = clap::ArgAction::SetTrue)]
legacy_peer_deps: Option<bool>,

/// Maximum concurrent manifest fetches (default: 64)
/// Maximum concurrent manifest fetches (default: 64; npmjs/non-semver resolver: 256)
#[arg(long, global = true)]
manifests_concurrency_limit: Option<usize>,

Expand Down
2 changes: 1 addition & 1 deletion crates/pm/src/service/pipeline/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ mod tests {
}));

// Should not forward other events
receiver.on_event(BuildEvent::PreloadStart { count: 10 });
receiver.on_event(BuildEvent::LevelStart { node_count: 10 });

// Only one message should be in the download channel
assert!(channels.download_rx.try_recv().is_ok());
Expand Down
60 changes: 0 additions & 60 deletions crates/pm/src/util/json.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::fs::File;
use std::io::{self, BufWriter, Write};
use std::path::Path;

use anyhow::{Context, Result};
use serde::Serialize;
use serde::de::DeserializeOwned;

/// Read and parse a JSON file into the specified type.
Expand All @@ -15,34 +12,6 @@ pub async fn read_json_file<T: DeserializeOwned>(path: &Path) -> Result<T> {
serde_json::from_slice(&bytes).with_context(|| format!("Failed to parse JSON from {path:?}"))
}

/// Serialize `value` as compact JSON and stream it to `path` through a
/// [`BufWriter`], skipping the intermediate `Vec<u8>` that
/// `serde_json::to_vec` + `std::fs::write` would allocate.
///
/// Synchronous on purpose: the caller in [`crate::util::manifest_store`] runs
/// it on a dedicated OS thread so manifest persistence never touches the
/// async runtime's worker or blocking pool. Async callers should wrap this
/// in `tokio::task::spawn_blocking` (or write the async-aware counterpart
/// when one is needed).
///
/// The parent directory of `path` must already exist; this helper does *not*
/// `mkdir -p`. The cost-benefit of "try the write first, recover on
/// `NotFound`" is policy-level (warm-cache rewrites want to skip the extra
/// syscall every time), so the recovery loop lives at the call site, not
/// here. A missing parent surfaces as [`io::ErrorKind::NotFound`] for the
/// caller to match on.
///
/// Serialization failures — rare for `derive(Serialize)` types, possible for
/// hand-written impls or maps with non-string keys — are folded into
/// [`io::Error`] via [`io::Error::other`] so the whole API speaks one error
/// type and callers can keep matching on [`io::ErrorKind`].
pub fn write_compact_sync<T: Serialize>(path: &Path, value: &T) -> io::Result<()> {
let file = File::create(path)?;
let mut writer = BufWriter::new(file);
serde_json::to_writer(&mut writer, value).map_err(io::Error::other)?;
writer.flush()
}

/// Load package.json from a directory path and deserialize into the caller's
/// chosen view type `T`. Use a full `PackageJson` for root projects, or a
/// minimal view (e.g. `ScriptsView`) for node_modules to avoid parsing
Expand Down Expand Up @@ -166,35 +135,6 @@ mod tests {
assert_eq!(view.scripts.get("test").unwrap(), "node build/test.js");
}

#[tokio::test]
async fn write_compact_sync_round_trips_through_read_json_file() {
let dir = tempdir().unwrap();
let path = dir.path().join("out.json");
let value = json!({
"name": "test",
"version": "1.0.0",
"deps": ["a", "b", "c"],
});

super::write_compact_sync(&path, &value).unwrap();

let read_back: Value = read_json_file(&path).await.unwrap();
assert_eq!(read_back, value);

// Compact form: no inter-token whitespace.
let raw = std::fs::read_to_string(&path).unwrap();
assert!(!raw.contains(": "));
assert!(!raw.contains(", "));
}

#[test]
fn write_compact_sync_requires_existing_parent_directory() {
let dir = tempdir().unwrap();
let path = dir.path().join("missing").join("out.json");
let err = super::write_compact_sync(&path, &json!({})).unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::NotFound);
}

#[tokio::test]
async fn test_error_handling() {
let non_existent_path = Path::new("non_existent.json");
Expand Down
15 changes: 0 additions & 15 deletions crates/pm/src/util/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,21 +219,6 @@ impl utoo_ruborist::progress::EventReceiver for ProgressReceiver {
}
use utoo_ruborist::progress::BuildEvent;
match event {
BuildEvent::PreloadStart { count } | BuildEvent::PreloadQueued { count } => {
PROGRESS_BAR.inc_length(count as u64);
}
BuildEvent::PreloadFetching { name } => {
log_progress(&format!("fetching {}", name));
}
BuildEvent::PreloadProgress { name, .. } => {
PROGRESS_BAR.inc(1);
log_progress(&format!("resolved {}", name));
}
BuildEvent::PreloadComplete { success, failed } => {
PROGRESS_BAR.set_position(0);
PROGRESS_BAR.set_length(0);
log_progress(&format!("preload: {} ok, {} failed", success, failed));
}
BuildEvent::DependencyCount { count } => {
PROGRESS_BAR.inc_length(count as u64);
}
Expand Down
48 changes: 20 additions & 28 deletions crates/pm/src/util/manifest_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,17 @@
//! Serialization and file writes run on a dedicated writer thread so manifest
//! persistence does not occupy async runtime workers or Tokio's blocking pool.

use std::fs;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::mpsc::{self, SyncSender, TrySendError};
use std::sync::mpsc::{self, Sender};
use std::thread::JoinHandle;

use async_trait::async_trait;
use serde::Serialize;
use utoo_ruborist::model::manifest::CoreVersionManifest;
use utoo_ruborist::service::{ManifestStore, VersionsInfo};

use crate::util::json::{read_json_file, write_compact_sync};

/// Opportunistic writer backlog. If disk stalls beyond this, new cache writes
/// are dropped instead of letting resolver memory grow without bound.
const MANIFEST_WRITE_QUEUE_CAPACITY: usize = 1024;
use crate::util::json::read_json_file;

pub struct DiskManifestStore {
cache_dir: PathBuf,
Expand Down Expand Up @@ -112,13 +106,13 @@ enum ManifestWriteJob {
}

struct ManifestWriter {
tx: SyncSender<ManifestWriteJob>,
tx: Sender<ManifestWriteJob>,
handle: JoinHandle<()>,
}

impl ManifestWriter {
fn spawn() -> Self {
let (tx, rx) = mpsc::sync_channel(MANIFEST_WRITE_QUEUE_CAPACITY);
let (tx, rx) = mpsc::channel();
Comment on lines +109 to +115
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The change from a bounded channel to an unbounded one removes the safety mechanism that prevented memory growth when disk I/O stalled. Since the manifest store is opportunistic, it is safer to use a bounded SyncSender and drop writes when the queue is full to ensure memory usage remains bounded, especially given that the previous implementation explicitly handled this case.

Suggested change
tx: Sender<ManifestWriteJob>,
handle: JoinHandle<()>,
}
impl ManifestWriter {
fn spawn() -> Self {
let (tx, rx) = mpsc::sync_channel(MANIFEST_WRITE_QUEUE_CAPACITY);
let (tx, rx) = mpsc::channel();
tx: mpsc::SyncSender<ManifestWriteJob>,
handle: JoinHandle<()>,
}
impl ManifestWriter {
fn spawn() -> Self {
let (tx, rx) = mpsc::sync_channel(1024);

let handle = std::thread::Builder::new()
.name("utoo-manifest-store".to_string())
.spawn(move || {
Expand All @@ -138,14 +132,8 @@ impl ManifestWriter {
}

fn enqueue(&self, job: ManifestWriteJob) {
match self.tx.try_send(job) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
tracing::debug!("Manifest store writer queue full; dropping cache write");
}
Err(TrySendError::Disconnected(_)) => {
tracing::debug!("Manifest store writer stopped before accepting write");
}
if self.tx.send(job).is_err() {
tracing::debug!("Manifest store writer stopped before accepting write");
}
Comment on lines +135 to 137
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To maintain the non-blocking, "fire-and-forget" nature of the manifest store, use try_send instead of a blocking send. This ensures the resolver is never stalled by disk latency, which is critical for performance and consistent with the opportunistic nature of the cache.

        match self.tx.try_send(job) {
            Ok(()) => {}
            Err(mpsc::TrySendError::Full(_)) => {
                tracing::debug!("Manifest store writer queue full; dropping cache write");
            }
            Err(mpsc::TrySendError::Disconnected(_)) => {
                tracing::debug!("Manifest store writer stopped before accepting write");
            }
        }

}

Expand All @@ -157,23 +145,27 @@ impl ManifestWriter {
}
}

/// Apply the manifest-cache write policy on top of
/// [`crate::util::json::write_compact_sync`]: on `NotFound`, create the
/// parent directory once and retry — this is how the resolver hot path
/// avoids the up-front `mkdir` syscall on every warm-cache rewrite. All
/// errors are swallowed at the `debug` log level because the disk cache is
/// opportunistic; a dropped write only costs a future cache miss.
/// Serialize `value` and write to `path`. On `NotFound`, create the parent
/// directory and retry once — saves the mkdir syscall on every warm-cache
/// rewrite. Errors are logged at debug; disk cache is opportunistic.
fn write_json_sync<T: Serialize>(path: &Path, value: &T) {
match write_compact_sync(path, value) {
let bytes = match serde_json::to_vec(value) {
Ok(b) => b,
Err(e) => {
tracing::debug!("Failed to serialize {path:?}: {e}");
return;
}
};
match std::fs::write(path, &bytes) {
Ok(()) => {}
Err(e) if e.kind() == ErrorKind::NotFound => {
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
if let Some(parent) = path.parent()
&& let Err(e) = fs::create_dir_all(parent)
&& let Err(e) = std::fs::create_dir_all(parent)
{
tracing::debug!("Failed to create {parent:?}: {e}");
return;
}
if let Err(e) = write_compact_sync(path, value) {
if let Err(e) = std::fs::write(path, &bytes) {
tracing::debug!("Failed to write {path:?}: {e}");
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/pm/src/util/project_cache.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Disk persistence for ruborist's project-level manifest cache.
//!
//! Stored at `<root>/node_modules/.utoo-manifest.json`. Used to skip the
//! preload phase on warm installs.
//! Stored at `<root>/node_modules/.utoo-manifest.json`. Used to warm the
//! demand resolver's in-memory manifest cache across installs.

use std::path::{Path, PathBuf};

Expand Down
2 changes: 1 addition & 1 deletion crates/pm/src/util/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub fn build_dns_cached_client() -> reqwest::Client {
.read_timeout(std::time::Duration::from_secs(30)) // Timeout for individual read operations
// No total timeout - large files (e.g. node binary ~100MB) need longer download time
// No pool_max_idle_per_host - let reqwest manage connections freely
// Concurrency is controlled by semaphore in preload service
// Concurrency is controlled by each caller's semaphore.
.build()
.expect("Failed to build reqwest client")
}
Expand Down
Loading
Loading