diff --git a/chain/substreams/src/data_source.rs b/chain/substreams/src/data_source.rs index dff2cfa31c4..65f0cffa584 100644 --- a/chain/substreams/src/data_source.rs +++ b/chain/substreams/src/data_source.rs @@ -331,7 +331,7 @@ mod test { blockchain::{DataSource as _, UnresolvedDataSource as _}, components::link_resolver::LinkResolver, data::subgraph::LATEST_VERSION, - prelude::{async_trait, serde_yaml, JsonValueStream, Link}, + prelude::{async_trait, serde_yaml, DeploymentHash, JsonValueStream, Link}, slog::{o, Discard, Logger}, substreams::{ module::{ @@ -705,6 +705,13 @@ mod test { unimplemented!() } + fn for_deployment( + &self, + _deployment: DeploymentHash, + ) -> Result, Error> { + unimplemented!() + } + async fn cat(&self, _logger: &Logger, _link: &Link) -> Result, Error> { Ok(gen_package().encode_to_vec()) } diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 8c2b76e5b6c..332eebb513d 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -287,7 +287,12 @@ impl SubgraphInstanceManager { let manifest = UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), manifest)?; // Allow for infinite retries for subgraph definition files. - let link_resolver = Arc::from(self.link_resolver.with_retries()); + let link_resolver = Arc::from( + self.link_resolver + .for_deployment(deployment.hash.clone()) + .map_err(SubgraphRegistrarError::Unknown)? + .with_retries(), + ); // Make sure the `raw_yaml` is present on both this subgraph and the graft base. self.subgraph_store diff --git a/core/src/subgraph/provider.rs b/core/src/subgraph/provider.rs index 00d379db01f..d566389fe27 100644 --- a/core/src/subgraph/provider.rs +++ b/core/src/subgraph/provider.rs @@ -86,8 +86,12 @@ impl SubgraphAssignmentProviderTrait for SubgraphAss )); } - let file_bytes = self + let link_resolver = self .link_resolver + .for_deployment(loc.hash.clone()) + .map_err(SubgraphAssignmentProviderError::ResolveError)?; + + let file_bytes = link_resolver .cat(&logger, &loc.hash.to_ipfs_link()) .await .map_err(SubgraphAssignmentProviderError::ResolveError)?; diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 6f7ae17425f..c75ee542571 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -278,6 +278,7 @@ where start_block_override: Option, graft_block_override: Option, history_blocks: Option, + ignore_graft_base: bool, ) -> Result { // We don't have a location for the subgraph yet; that will be // assigned when we deploy for real. For logging purposes, make up a @@ -286,9 +287,14 @@ where .logger_factory .subgraph_logger(&DeploymentLocator::new(DeploymentId(0), hash.clone())); + let resolver: Arc = Arc::from( + self.resolver + .for_deployment(hash.clone()) + .map_err(SubgraphRegistrarError::Unknown)?, + ); + let raw: serde_yaml::Mapping = { - let file_bytes = self - .resolver + let file_bytes = resolver .cat(&logger, &hash.to_ipfs_link()) .await .map_err(|e| { @@ -323,8 +329,9 @@ where node_id, debug_fork, self.version_switching_mode, - &self.resolver, + &resolver, history_blocks, + ignore_graft_base, ) .await? } @@ -341,8 +348,9 @@ where node_id, debug_fork, self.version_switching_mode, - &self.resolver, + &resolver, history_blocks, + ignore_graft_base, ) .await? } @@ -359,8 +367,9 @@ where node_id, debug_fork, self.version_switching_mode, - &self.resolver, + &resolver, history_blocks, + ignore_graft_base, ) .await? } @@ -377,8 +386,9 @@ where node_id, debug_fork, self.version_switching_mode, - &self.resolver, + &resolver, history_blocks, + ignore_graft_base, ) .await? } @@ -565,12 +575,14 @@ async fn create_subgraph_version( version_switching_mode: SubgraphVersionSwitchingMode, resolver: &Arc, history_blocks_override: Option, + ignore_graft_base: bool, ) -> Result { let raw_string = serde_yaml::to_string(&raw).unwrap(); + let unvalidated = UnvalidatedSubgraphManifest::::resolve( deployment.clone(), raw, - resolver, + &resolver, logger, ENV_VARS.max_spec_version.clone(), ) @@ -585,10 +597,21 @@ async fn create_subgraph_version( Err(StoreError::DeploymentNotFound(_)) => true, Err(e) => return Err(SubgraphRegistrarError::StoreError(e)), }; - let manifest = unvalidated - .validate(store.cheap_clone(), should_validate) - .await - .map_err(SubgraphRegistrarError::ManifestValidationError)?; + + let manifest = { + let should_validate = should_validate && !ignore_graft_base; + + let mut manifest = unvalidated + .validate(store.cheap_clone(), should_validate) + .await + .map_err(SubgraphRegistrarError::ManifestValidationError)?; + + if ignore_graft_base { + manifest.graft = None; + } + + manifest + }; let network_name: Word = manifest.network_name().into(); diff --git a/graph/src/components/link_resolver/file.rs b/graph/src/components/link_resolver/file.rs index eb0943c5f5f..fc4eee2786c 100644 --- a/graph/src/components/link_resolver/file.rs +++ b/graph/src/components/link_resolver/file.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::time::Duration; @@ -6,22 +7,35 @@ use async_trait::async_trait; use slog::Logger; use crate::data::subgraph::Link; -use crate::prelude::{Error, JsonValueStream, LinkResolver as LinkResolverTrait}; +use crate::prelude::{DeploymentHash, Error, JsonValueStream, LinkResolver as LinkResolverTrait}; #[derive(Clone, Debug)] pub struct FileLinkResolver { base_dir: Option, timeout: Duration, + // This is a hashmap that maps the alias name to the path of the file that is aliased + aliases: HashMap, +} + +impl Default for FileLinkResolver { + fn default() -> Self { + Self { + base_dir: None, + timeout: Duration::from_secs(30), + aliases: HashMap::new(), + } + } } impl FileLinkResolver { /// Create a new FileLinkResolver /// /// All paths are treated as absolute paths. - pub fn new() -> Self { + pub fn new(base_dir: Option, aliases: HashMap) -> Self { Self { - base_dir: None, + base_dir: base_dir, timeout: Duration::from_secs(30), + aliases, } } @@ -33,12 +47,18 @@ impl FileLinkResolver { Self { base_dir: Some(base_dir.as_ref().to_owned()), timeout: Duration::from_secs(30), + aliases: HashMap::new(), } } fn resolve_path(&self, link: &str) -> PathBuf { let path = Path::new(link); + // If the path is an alias, use the aliased path + if let Some(aliased) = self.aliases.get(link) { + return aliased.clone(); + } + // If the path is already absolute or if we don't have a base_dir, return it as is if path.is_absolute() || self.base_dir.is_none() { path.to_owned() @@ -47,6 +67,42 @@ impl FileLinkResolver { self.base_dir.as_ref().unwrap().join(link) } } + + /// This method creates a new resolver that is scoped to a specific subgraph + /// It will set the base directory to the parent directory of the manifest path + /// This is required because paths mentioned in the subgraph manifest are relative paths + /// and we need a new resolver with the right base directory for the specific subgraph + fn clone_for_deployment(&self, deployment: DeploymentHash) -> Result { + let mut resolver = self.clone(); + + let deployment_str = deployment.to_string(); + + // Create a path to the manifest based on the current resolver's + // base directory or default to using the deployment string as path + // If the deployment string is an alias, use the aliased path + let manifest_path = if let Some(aliased) = self.aliases.get(&deployment_str) { + aliased.clone() + } else { + match &resolver.base_dir { + Some(dir) => dir.join(&deployment_str), + None => PathBuf::from(deployment_str), + } + }; + + let canonical_manifest_path = manifest_path + .canonicalize() + .map_err(|e| Error::from(anyhow!("Failed to canonicalize manifest path: {}", e)))?; + + // The manifest path is the path of the subgraph manifest file in the build directory + // We use the parent directory as the base directory for the new resolver + let base_dir = canonical_manifest_path + .parent() + .ok_or_else(|| Error::from(anyhow!("Manifest path has no parent directory")))? + .to_path_buf(); + + resolver.base_dir = Some(base_dir); + Ok(resolver) + } } pub fn remove_prefix(link: &str) -> &str { @@ -87,6 +143,13 @@ impl LinkResolverTrait for FileLinkResolver { } } + fn for_deployment( + &self, + deployment: DeploymentHash, + ) -> Result, Error> { + Ok(Box::new(self.clone_for_deployment(deployment)?)) + } + async fn get_block(&self, _logger: &Logger, _link: &Link) -> Result, Error> { Err(anyhow!("get_block is not implemented for FileLinkResolver").into()) } @@ -118,7 +181,7 @@ mod tests { file.write_all(test_content).unwrap(); // Create a resolver without a base directory - let resolver = FileLinkResolver::new(); + let resolver = FileLinkResolver::default(); let logger = slog::Logger::root(slog::Discard, slog::o!()); // Test valid path resolution @@ -220,4 +283,65 @@ mod tests { let _ = fs::remove_file(test_file_path); let _ = fs::remove_dir(temp_dir); } + + #[tokio::test] + async fn test_file_resolver_with_aliases() { + // Create a temporary directory for test files + let temp_dir = env::temp_dir().join("file_resolver_test_aliases"); + let _ = fs::create_dir_all(&temp_dir); + + // Create two test files with different content + let test_file1_path = temp_dir.join("file.txt"); + let test_content1 = b"This is the file content"; + let mut file1 = fs::File::create(&test_file1_path).unwrap(); + file1.write_all(test_content1).unwrap(); + + let test_file2_path = temp_dir.join("another_file.txt"); + let test_content2 = b"This is another file content"; + let mut file2 = fs::File::create(&test_file2_path).unwrap(); + file2.write_all(test_content2).unwrap(); + + // Create aliases mapping + let mut aliases = HashMap::new(); + aliases.insert("alias1".to_string(), test_file1_path.clone()); + aliases.insert("alias2".to_string(), test_file2_path.clone()); + aliases.insert("deployment-id".to_string(), test_file1_path.clone()); + + // Create resolver with aliases + let resolver = FileLinkResolver::new(Some(temp_dir.clone()), aliases); + let logger = slog::Logger::root(slog::Discard, slog::o!()); + + // Test resolving by aliases + let link1 = Link { + link: "alias1".to_string(), + }; + let result1 = resolver.cat(&logger, &link1).await.unwrap(); + assert_eq!(result1, test_content1); + + let link2 = Link { + link: "alias2".to_string(), + }; + let result2 = resolver.cat(&logger, &link2).await.unwrap(); + assert_eq!(result2, test_content2); + + // Test that the alias works in for_deployment as well + let deployment = DeploymentHash::new("deployment-id").unwrap(); + let deployment_resolver = resolver.clone_for_deployment(deployment).unwrap(); + + let expected_dir = test_file1_path.parent().unwrap(); + let deployment_base_dir = deployment_resolver.base_dir.clone().unwrap(); + + let canonical_expected_dir = expected_dir.canonicalize().unwrap(); + let canonical_deployment_dir = deployment_base_dir.canonicalize().unwrap(); + + assert_eq!( + canonical_deployment_dir, canonical_expected_dir, + "Build directory paths don't match" + ); + + // Clean up + let _ = fs::remove_file(test_file1_path); + let _ = fs::remove_file(test_file2_path); + let _ = fs::remove_dir(temp_dir); + } } diff --git a/graph/src/components/link_resolver/ipfs.rs b/graph/src/components/link_resolver/ipfs.rs index 9ecf4ff02e3..1897d781c3c 100644 --- a/graph/src/components/link_resolver/ipfs.rs +++ b/graph/src/components/link_resolver/ipfs.rs @@ -74,6 +74,13 @@ impl LinkResolverTrait for IpfsResolver { Box::new(s) } + fn for_deployment( + &self, + _deployment: DeploymentHash, + ) -> Result, Error> { + Ok(Box::new(self.cheap_clone())) + } + async fn cat(&self, logger: &Logger, link: &Link) -> Result, Error> { let path = ContentPath::new(&link.link)?; let timeout = self.timeout; diff --git a/graph/src/components/link_resolver/mod.rs b/graph/src/components/link_resolver/mod.rs index 851b4296b47..05728bbcc29 100644 --- a/graph/src/components/link_resolver/mod.rs +++ b/graph/src/components/link_resolver/mod.rs @@ -3,7 +3,7 @@ use std::time::Duration; use slog::Logger; use crate::data::subgraph::Link; -use crate::prelude::Error; +use crate::prelude::{DeploymentHash, Error}; use std::fmt::Debug; mod arweave; @@ -30,6 +30,13 @@ pub trait LinkResolver: Send + Sync + 'static + Debug { /// Fetches the IPLD block contents as bytes. async fn get_block(&self, logger: &Logger, link: &Link) -> Result, Error>; + /// Creates a new resolver that is scoped to a specific subgraph + /// This is used by FileLinkResolver to create a new resolver for a specific subgraph + /// For other resolvers, this method will simply return the current resolver + /// This is required because paths mentioned in the subgraph manifest are relative paths + /// and we need a new resolver with the right base directory for the specific subgraph + fn for_deployment(&self, deployment: DeploymentHash) -> Result, Error>; + /// Read the contents of `link` and deserialize them into a stream of JSON /// values. The values must each be on a single line; newlines are significant /// as they are used to split the file contents and each line is deserialized diff --git a/graph/src/components/subgraph/registrar.rs b/graph/src/components/subgraph/registrar.rs index 691c341e38b..361a704e754 100644 --- a/graph/src/components/subgraph/registrar.rs +++ b/graph/src/components/subgraph/registrar.rs @@ -45,6 +45,7 @@ pub trait SubgraphRegistrar: Send + Sync + 'static { start_block_block: Option, graft_block_override: Option, history_blocks: Option, + ignore_graft_base: bool, ) -> Result; async fn remove_subgraph(&self, name: SubgraphName) -> Result<(), SubgraphRegistrarError>; diff --git a/graph/src/data_source/subgraph.rs b/graph/src/data_source/subgraph.rs index 87b44e66174..3170754d499 100644 --- a/graph/src/data_source/subgraph.rs +++ b/graph/src/data_source/subgraph.rs @@ -259,6 +259,8 @@ impl UnresolvedDataSource { resolver: &Arc, logger: &Logger, ) -> Result>, Error> { + let resolver: Arc = + Arc::from(resolver.for_deployment(self.source.address.clone())?); let source_raw = resolver .cat(logger, &self.source.address.to_ipfs_link()) .await @@ -281,8 +283,10 @@ impl UnresolvedDataSource { self.source.address ))?; + let resolver: Arc = + Arc::from(resolver.for_deployment(self.source.address.clone())?); source_manifest - .resolve(resolver, logger, LATEST_VERSION.clone()) + .resolve(&resolver, logger, LATEST_VERSION.clone()) .await .context(format!( "Failed to resolve source subgraph [{}] manifest", diff --git a/node/src/bin/dev.rs b/node/src/bin/dev.rs index 1545c4bad4c..540319139b2 100644 --- a/node/src/bin/dev.rs +++ b/node/src/bin/dev.rs @@ -7,10 +7,14 @@ use graph::{ components::link_resolver::FileLinkResolver, env::EnvVars, log::logger, + slog::{error, info}, tokio::{self, sync::mpsc}, }; use graph_node::{ - dev::{helpers::DevModeContext, watcher::watch_subgraph_dir}, + dev::{ + helpers::DevModeContext, + watcher::{parse_manifest_args, watch_subgraphs}, + }, launcher, opt::Opt, }; @@ -38,10 +42,27 @@ pub struct DevOpt { #[clap( long, - help = "The location of the subgraph manifest file.", - default_value = "./build/subgraph.yaml" + value_name = "MANIFEST:[BUILD_DIR]", + help = "The location of the subgraph manifest file. If no build directory is provided, the default is 'build'. The file can be an alias, in the format '[BUILD_DIR:]manifest' where 'manifest' is the path to the manifest file, and 'BUILD_DIR' is the path to the build directory relative to the manifest file.", + default_value = "./subgraph.yaml", + value_delimiter = ',' )] - pub manifest: String, + pub manifests: Vec, + + #[clap( + long, + value_name = "ALIAS:MANIFEST:[BUILD_DIR]", + value_delimiter = ',', + help = "The location of the source subgraph manifest files. This is used to resolve aliases in the manifest files for subgraph data sources. The format is ALIAS:MANIFEST:[BUILD_DIR], where ALIAS is the alias name, BUILD_DIR is the build directory relative to the manifest file, and MANIFEST is the manifest file location." + )] + pub sources: Vec, + + #[clap( + long, + help = "The location of the database directory.", + default_value = "./build" + )] + pub database_dir: String, #[clap( long, @@ -63,7 +84,7 @@ pub struct DevOpt { } /// Builds the Graph Node options from DevOpt -fn build_args(dev_opt: &DevOpt, db_url: &str, manifest_path: &str) -> Result { +fn build_args(dev_opt: &DevOpt, db_url: &str) -> Result { let mut args = vec!["gnd".to_string()]; if !dev_opt.ipfs.is_empty() { @@ -76,16 +97,6 @@ fn build_args(dev_opt: &DevOpt, db_url: &str, manifest_path: &str) -> Result Result Result { - let manifest_path = Path::new(manifest_path_str); - - if !manifest_path.exists() { - anyhow::bail!("Subgraph manifest file not found at {}", manifest_path_str); - } - - let dir = manifest_path - .parent() - .context("Failed to get parent directory of manifest")?; - - dir.canonicalize() - .context("Failed to canonicalize build directory path") -} - async fn run_graph_node(opt: Opt, ctx: Option) -> Result<()> { let env_vars = Arc::new(EnvVars::from_env().context("Failed to load environment variables")?); @@ -122,18 +117,26 @@ async fn main() -> Result<()> { env_logger::init(); let dev_opt = DevOpt::parse(); - let build_dir = get_build_dir(&dev_opt.manifest)?; + let database_dir = Path::new(&dev_opt.database_dir); + + let logger = logger(true); + + info!(logger, "Starting Graph Node Dev"); + info!(logger, "Database directory: {}", database_dir.display()); let db = PgTempDBBuilder::new() - .with_data_dir_prefix(build_dir.clone()) + .with_data_dir_prefix(database_dir) .with_initdb_param("-E", "UTF8") .with_initdb_param("--locale", "C") .start_async() .await; let (tx, rx) = mpsc::channel(1); - let opt = build_args(&dev_opt, &db.connection_uri(), &dev_opt.manifest)?; - let file_link_resolver = Arc::new(FileLinkResolver::with_base_dir(&build_dir)); + let opt = build_args(&dev_opt, &db.connection_uri())?; + + let (manifests_paths, source_subgraph_aliases) = + parse_manifest_args(dev_opt.manifests, dev_opt.sources, &logger)?; + let file_link_resolver = Arc::new(FileLinkResolver::new(None, source_subgraph_aliases.clone())); let ctx = DevModeContext { watch: dev_opt.watch, @@ -141,11 +144,6 @@ async fn main() -> Result<()> { updates_rx: rx, }; - let subgraph = opt.subgraph.clone().unwrap(); - - // Set up logger - let logger = logger(opt.debug); - // Run graph node graph::spawn(async move { let _ = run_graph_node(opt, Some(ctx)).await; @@ -153,14 +151,18 @@ async fn main() -> Result<()> { if dev_opt.watch { graph::spawn_blocking(async move { - watch_subgraph_dir( + let result = watch_subgraphs( &logger, - build_dir, - subgraph, + manifests_paths, + source_subgraph_aliases, vec!["pgtemp-*".to_string()], tx, ) .await; + if let Err(e) = result { + error!(logger, "Error watching subgraphs"; "error" => e.to_string()); + std::process::exit(1); + } }); } diff --git a/node/src/dev/helpers.rs b/node/src/dev/helpers.rs index 45f7af9b75e..6a2bc97da0d 100644 --- a/node/src/dev/helpers.rs +++ b/node/src/dev/helpers.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use anyhow::Result; +use anyhow::{Context, Result}; use graph::components::link_resolver::FileLinkResolver; use graph::prelude::{ BlockPtr, DeploymentHash, NodeId, SubgraphRegistrarError, SubgraphStore as SubgraphStoreTrait, @@ -55,6 +55,7 @@ async fn deploy_subgraph( start_block, None, None, + true ) .await .and_then(|locator| { @@ -118,9 +119,44 @@ pub async fn watch_subgraph_updates( "hash" => hash.to_string(), "error" => e.to_string() ); + std::process::exit(1); } } error!(logger, "Subgraph watcher terminated unexpectedly"; "action" => "exiting"); std::process::exit(1); } + +/// Parse an alias string into a tuple of (alias_name, manifest, Option) +pub fn parse_alias(alias: &str) -> anyhow::Result<(String, String, Option)> { + let mut split = alias.split(':'); + let alias_name = split.next(); + let alias_value = split.next(); + + if alias_name.is_none() || alias_value.is_none() || split.next().is_some() { + return Err(anyhow::anyhow!( + "Invalid alias format: expected 'alias=[BUILD_DIR:]manifest', got '{}'", + alias + )); + } + + let alias_name = alias_name.unwrap().to_owned(); + let (manifest, build_dir) = parse_manifest_arg(alias_value.unwrap()) + .with_context(|| format!("While parsing alias '{}'", alias))?; + + Ok((alias_name, manifest, build_dir)) +} + +/// Parse a manifest string into a tuple of (manifest, Option) +pub fn parse_manifest_arg(value: &str) -> anyhow::Result<(String, Option)> { + match value.split_once(':') { + Some((manifest, build_dir)) if !manifest.is_empty() => { + Ok((manifest.to_owned(), Some(build_dir.to_owned()))) + } + Some(_) => Err(anyhow::anyhow!( + "Invalid manifest arg: missing manifest in '{}'", + value + )), + None => Ok((value.to_owned(), None)), + } +} diff --git a/node/src/dev/watcher.rs b/node/src/dev/watcher.rs index 53fbd729bcd..08acad403c0 100644 --- a/node/src/dev/watcher.rs +++ b/node/src/dev/watcher.rs @@ -1,28 +1,160 @@ +use anyhow::{anyhow, Context, Result}; use globset::{Glob, GlobSet, GlobSetBuilder}; use graph::prelude::{DeploymentHash, SubgraphName}; -use graph::slog::{error, info, Logger}; +use graph::slog::{self, error, info, Logger}; use graph::tokio::sync::mpsc::Sender; use notify::{recommended_watcher, Event, RecursiveMode, Watcher}; +use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::mpsc; use std::time::Duration; +use super::helpers::{parse_alias, parse_manifest_arg}; + const WATCH_DELAY: Duration = Duration::from_secs(5); +const DEFAULT_BUILD_DIR: &str = "build"; + +// Parses manifest arguments and returns a vector of paths to the manifest files +pub fn parse_manifest_args( + manifests: Vec, + subgraph_sources: Vec, + logger: &Logger, +) -> Result<(Vec, HashMap)> { + let mut manifests_paths = Vec::new(); + let mut source_subgraph_aliases = HashMap::new(); + + for subgraph_source in subgraph_sources { + let (alias_name, manifest_path_str, build_dir_opt) = parse_alias(&subgraph_source)?; + let manifest_path = + process_manifest(build_dir_opt, &manifest_path_str, Some(&alias_name), logger)?; + + manifests_paths.push(manifest_path.clone()); + source_subgraph_aliases.insert(alias_name, manifest_path); + } + + for manifest_str in manifests { + let (manifest_path_str, build_dir_opt) = parse_manifest_arg(&manifest_str) + .with_context(|| format!("While parsing manifest '{}'", manifest_str))?; + + let built_manifest_path = + process_manifest(build_dir_opt, &manifest_path_str, None, logger)?; + + manifests_paths.push(built_manifest_path); + } + + Ok((manifests_paths, source_subgraph_aliases)) +} + +/// Helper function to process a manifest +fn process_manifest( + build_dir_opt: Option, + manifest_path_str: &str, + alias_name: Option<&String>, + logger: &Logger, +) -> Result { + let build_dir_str = build_dir_opt.unwrap_or_else(|| DEFAULT_BUILD_DIR.to_owned()); + + info!(logger, "Validating manifest: {}", manifest_path_str); + + let manifest_path = Path::new(manifest_path_str); + let manifest_path = manifest_path + .canonicalize() + .with_context(|| format!("Manifest path does not exist: {}", manifest_path_str))?; + + // Get the parent directory of the manifest + let parent_dir = manifest_path + .parent() + .ok_or_else(|| { + anyhow!( + "Failed to get parent directory for manifest: {}", + manifest_path_str + ) + })? + .canonicalize() + .with_context(|| { + format!( + "Parent directory does not exist for manifest: {}", + manifest_path_str + ) + })?; + + // Create the build directory path by joining the parent directory with the build_dir_str + let build_dir = parent_dir.join(build_dir_str); + let build_dir = build_dir + .canonicalize() + .with_context(|| format!("Build directory does not exist: {}", build_dir.display()))?; + + let manifest_file_name = manifest_path.file_name().ok_or_else(|| { + anyhow!( + "Failed to get file name for manifest: {}", + manifest_path_str + ) + })?; + + let built_manifest_path = build_dir.join(manifest_file_name); + + info!( + logger, + "Watching manifest: {}", + built_manifest_path.display() + ); + + if let Some(name) = alias_name { + info!( + logger, + "Using build directory for {}: {}", + name, + build_dir.display() + ); + } else { + info!(logger, "Using build directory: {}", build_dir.display()); + } + + Ok(built_manifest_path) +} /// Sets up a watcher for the given directory with optional exclusions. /// Exclusions can include glob patterns like "pgtemp-*". -pub async fn watch_subgraph_dir( +pub async fn watch_subgraphs( logger: &Logger, - dir: PathBuf, - id: String, + manifests_paths: Vec, + source_subgraph_aliases: HashMap, exclusions: Vec, sender: Sender<(DeploymentHash, SubgraphName)>, -) { +) -> Result<()> { + let logger = logger.new(slog::o!("component" => "Watcher")); + + watch_subgraph_dirs( + &logger, + manifests_paths, + source_subgraph_aliases, + exclusions, + sender, + ) + .await?; + Ok(()) +} + +/// Sets up a watcher for the given directories with optional exclusions. +/// Exclusions can include glob patterns like "pgtemp-*". +pub async fn watch_subgraph_dirs( + logger: &Logger, + manifests_paths: Vec, + source_subgraph_aliases: HashMap, + exclusions: Vec, + sender: Sender<(DeploymentHash, SubgraphName)>, +) -> Result<()> { + if manifests_paths.is_empty() { + info!(logger, "No directories to watch"); + return Ok(()); + } + info!( logger, - "Watching for changes in directory: {}", - dir.display() + "Watching for changes in {} directories", + manifests_paths.len() ); + if !exclusions.is_empty() { info!(logger, "Excluding patterns: {}", exclusions.join(", ")); } @@ -33,31 +165,63 @@ pub async fn watch_subgraph_dir( // Create a channel to receive the events let (tx, rx) = mpsc::channel(); - // Create a watcher object let mut watcher = match recommended_watcher(tx) { Ok(w) => w, Err(e) => { error!(logger, "Error creating file watcher: {}", e); - return; + return Err(anyhow!("Error creating file watcher")); } }; - if let Err(e) = watcher.watch(&dir, RecursiveMode::Recursive) { - error!(logger, "Error watching directory {}: {}", dir.display(), e); - return; + for manifest_path in manifests_paths.iter() { + let dir = manifest_path.parent().unwrap(); + if let Err(e) = watcher.watch(dir, RecursiveMode::Recursive) { + error!(logger, "Error watching directory {}: {}", dir.display(), e); + std::process::exit(1); + } + info!(logger, "Watching directory: {}", dir.display()); } - let watch_dir = dir.clone(); - let watch_exclusion_set = exclusion_set.clone(); + // Process file change events + process_file_events( + logger, + rx, + &exclusion_set, + &manifests_paths, + &source_subgraph_aliases, + sender, + ) + .await +} +/// Processes file change events and triggers redeployments +async fn process_file_events( + logger: &Logger, + rx: mpsc::Receiver>, + exclusion_set: &GlobSet, + manifests_paths: &Vec, + source_subgraph_aliases: &HashMap, + sender: Sender<(DeploymentHash, SubgraphName)>, +) -> Result<()> { loop { - let first_event = match rx.recv() { - Ok(Ok(e)) if should_process_event(&e, &watch_dir, &watch_exclusion_set) => Some(e), + // Wait for an event + let event = match rx.recv() { + Ok(Ok(e)) => e, Ok(_) => continue, - Err(_) => break, + Err(_) => { + error!(logger, "Error receiving file change event"); + return Err(anyhow!("Error receiving file change event")); + } }; - if first_event.is_none() { + if !is_relevant_event( + &event, + manifests_paths + .iter() + .map(|p| p.parent().unwrap().to_path_buf()) + .collect(), + exclusion_set, + ) { continue; } @@ -73,13 +237,52 @@ pub async fn watch_subgraph_dir( } } + // Redeploy all subgraphs + redeploy_all_subgraphs(logger, manifests_paths, source_subgraph_aliases, &sender).await?; + } +} + +/// Checks if an event is relevant for any of the watched directories +fn is_relevant_event(event: &Event, watched_dirs: Vec, exclusion_set: &GlobSet) -> bool { + for path in event.paths.iter() { + for dir in watched_dirs.iter() { + if path.starts_with(dir) && should_process_event(event, dir, exclusion_set) { + return true; + } + } + } + false +} + +/// Redeploys all subgraphs in the order defined by the BTreeMap +async fn redeploy_all_subgraphs( + logger: &Logger, + manifests_paths: &Vec, + source_subgraph_aliases: &HashMap, + sender: &Sender<(DeploymentHash, SubgraphName)>, +) -> Result<()> { + info!(logger, "File change detected, redeploying all subgraphs"); + let mut count = 0; + for manifest_path in manifests_paths { + let alias_name = source_subgraph_aliases + .iter() + .find(|(_, path)| path == &manifest_path) + .map(|(name, _)| name); + + let id = alias_name + .map(|s| s.to_owned()) + .unwrap_or_else(|| manifest_path.display().to_string()); + let _ = sender .send(( - DeploymentHash::new(id.clone()).unwrap(), - SubgraphName::new("test").unwrap(), + DeploymentHash::new(id).map_err(|_| anyhow!("Failed to create deployment hash"))?, + SubgraphName::new(format!("subgraph-{}", count)) + .map_err(|_| anyhow!("Failed to create subgraph name"))?, )) .await; + count += 1; } + Ok(()) } /// Build a GlobSet from the provided patterns diff --git a/node/src/launcher.rs b/node/src/launcher.rs index d82a5d0fcbf..34c653f4d68 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -253,6 +253,7 @@ fn deploy_subgraph_from_flag( start_block, None, None, + false, ) .await } diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index 2c6bfdcb148..1892353c6a9 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -213,6 +213,7 @@ pub async fn run( None, None, None, + false, ) .await?; diff --git a/server/json-rpc/src/lib.rs b/server/json-rpc/src/lib.rs index 103d36f806c..970bb3959d3 100644 --- a/server/json-rpc/src/lib.rs +++ b/server/json-rpc/src/lib.rs @@ -133,6 +133,7 @@ impl ServerState { None, None, params.history_blocks, + false, ) .await { diff --git a/store/test-store/tests/chain/ethereum/manifest.rs b/store/test-store/tests/chain/ethereum/manifest.rs index f025be2e626..8b888c13da4 100644 --- a/store/test-store/tests/chain/ethereum/manifest.rs +++ b/store/test-store/tests/chain/ethereum/manifest.rs @@ -91,6 +91,13 @@ impl LinkResolverTrait for TextResolver { Box::new(self.clone()) } + fn for_deployment( + &self, + _deployment: DeploymentHash, + ) -> Result, anyhow::Error> { + Ok(Box::new(self.clone())) + } + async fn cat(&self, _logger: &Logger, link: &Link) -> Result, anyhow::Error> { self.texts .get(&link.link) diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index b8151857db3..b9c07a41e7d 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -612,6 +612,7 @@ pub async fn setup_inner( None, graft_block, None, + false, ) .await .expect("failed to create subgraph version");