Skip to content

gnd: Support deploying multiple subgraphs when using dev mode #5999

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: krishna/graph-dev
Choose a base branch
from
18 changes: 14 additions & 4 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
loc: DeploymentLocator,
manifest: serde_yaml::Mapping,
stop_block: Option<BlockNumber>,
link_resolver_override: Option<Arc<dyn LinkResolver>>,
) {
let runner_index = self.subgraph_start_counter.fetch_add(1, Ordering::SeqCst);

Expand Down Expand Up @@ -89,6 +90,7 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
stop_block,
Box::new(SubgraphTriggerProcessor {}),
deployment_status_metric,
link_resolver_override,
)
.await?;

Expand All @@ -104,6 +106,7 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
stop_block,
Box::new(SubgraphTriggerProcessor {}),
deployment_status_metric,
link_resolver_override,
)
.await?;

Expand All @@ -119,6 +122,7 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
stop_block,
Box::new(SubgraphTriggerProcessor {}),
deployment_status_metric,
link_resolver_override,
)
.await?;

Expand All @@ -136,6 +140,7 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
loc.clone(),
)),
deployment_status_metric,
link_resolver_override,
)
.await?;

Expand Down Expand Up @@ -247,6 +252,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
stop_block: Option<BlockNumber>,
tp: Box<dyn TriggerProcessor<C, RuntimeHostBuilder<C>>>,
deployment_status_metric: DeploymentStatusMetric,
link_resolver_override: Option<Arc<dyn LinkResolver>>,
) -> anyhow::Result<SubgraphRunner<C, RuntimeHostBuilder<C>>>
where
C: Blockchain,
Expand All @@ -261,6 +267,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
tp,
deployment_status_metric,
false,
link_resolver_override,
)
.await
}
Expand All @@ -275,6 +282,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
tp: Box<dyn TriggerProcessor<C, RuntimeHostBuilder<C>>>,
deployment_status_metric: DeploymentStatusMetric,
is_runner_test: bool,
link_resolver_override: Option<Arc<dyn LinkResolver>>,
) -> anyhow::Result<SubgraphRunner<C, RuntimeHostBuilder<C>>>
where
C: Blockchain,
Expand All @@ -287,16 +295,18 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
let manifest = UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), manifest)?;

// Allow for infinite retries for subgraph definition files.
let link_resolver = Arc::from(self.link_resolver.with_retries());
let link_resolver: Arc<dyn LinkResolver> = match link_resolver_override {
Some(link_resolver) => Arc::from(link_resolver.with_retries()),
None => Arc::from(self.link_resolver.with_retries()),
};

// Make sure the `raw_yaml` is present on both this subgraph and the graft base.
self.subgraph_store
.set_manifest_raw_yaml(&deployment.hash, raw_yaml)
.await?;
if let Some(graft) = &manifest.graft {
if self.subgraph_store.is_deployed(&graft.base)? {
let file_bytes = self
.link_resolver
let file_bytes = link_resolver
.cat(&logger, &graft.base.to_ipfs_link())
.await?;
let yaml = String::from_utf8(file_bytes)?;
Expand Down Expand Up @@ -482,7 +492,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
let (runtime_adapter, decoder_hook) = chain.runtime()?;
let host_builder = graph_runtime_wasm::RuntimeHostBuilder::new(
runtime_adapter,
self.link_resolver.cheap_clone(),
link_resolver.cheap_clone(),
subgraph_store.ens_lookup(),
);

Expand Down
11 changes: 8 additions & 3 deletions core/src/subgraph/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProviderTrait for SubgraphAss
&self,
loc: DeploymentLocator,
stop_block: Option<BlockNumber>,
link_resolver_override: Option<Arc<dyn LinkResolver>>,
) -> Result<(), SubgraphAssignmentProviderError> {
let logger = self.logger_factory.subgraph_logger(&loc);

Expand All @@ -86,8 +87,12 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProviderTrait for SubgraphAss
));
}

let file_bytes = self
.link_resolver
let link_resolver = match link_resolver_override {
Some(link_resolver) => link_resolver,
None => self.link_resolver.clone(),
};

let file_bytes = link_resolver
.cat(&logger, &loc.hash.to_ipfs_link())
.await
.map_err(SubgraphAssignmentProviderError::ResolveError)?;
Expand All @@ -97,7 +102,7 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProviderTrait for SubgraphAss

self.instance_manager
.cheap_clone()
.start_subgraph(loc, raw, stop_block)
.start_subgraph(loc, raw, stop_block, Some(link_resolver))
.await;

Ok(())
Expand Down
18 changes: 10 additions & 8 deletions core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct SubgraphRegistrar<P, S, SM> {
logger: Logger,
logger_factory: LoggerFactory,
resolver: Arc<dyn LinkResolver>,
provider: Arc<P>,
pub provider: Arc<P>,
store: Arc<S>,
subscription_manager: Arc<SM>,
chains: Arc<BlockchainMap>,
Expand Down Expand Up @@ -278,6 +278,7 @@ where
start_block_override: Option<BlockPtr>,
graft_block_override: Option<BlockPtr>,
history_blocks: Option<i32>,
link_resolver_override: Option<Arc<dyn LinkResolver>>,
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
// We don't have a location for the subgraph yet; that will be
// assigned when we deploy for real. For logging purposes, make up a
Expand All @@ -286,9 +287,10 @@ where
.logger_factory
.subgraph_logger(&DeploymentLocator::new(DeploymentId(0), hash.clone()));

let link_resolver = link_resolver_override.unwrap_or_else(|| self.resolver.clone());

let raw: serde_yaml::Mapping = {
let file_bytes = self
.resolver
let file_bytes = link_resolver
.cat(&logger, &hash.to_ipfs_link())
.await
.map_err(|e| {
Expand Down Expand Up @@ -323,7 +325,7 @@ where
node_id,
debug_fork,
self.version_switching_mode,
&self.resolver,
&link_resolver,
history_blocks,
)
.await?
Expand All @@ -341,7 +343,7 @@ where
node_id,
debug_fork,
self.version_switching_mode,
&self.resolver,
&link_resolver,
history_blocks,
)
.await?
Expand All @@ -359,7 +361,7 @@ where
node_id,
debug_fork,
self.version_switching_mode,
&self.resolver,
&link_resolver,
history_blocks,
)
.await?
Expand All @@ -377,7 +379,7 @@ where
node_id,
debug_fork,
self.version_switching_mode,
&self.resolver,
&link_resolver,
history_blocks,
)
.await?
Expand Down Expand Up @@ -479,7 +481,7 @@ async fn start_subgraph(
trace!(logger, "Start subgraph");

let start_time = Instant::now();
let result = provider.start(deployment.clone(), None).await;
let result = provider.start(deployment.clone(), None, None).await;

debug!(
logger,
Expand Down
3 changes: 2 additions & 1 deletion graph/src/components/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::prelude::BlockNumber;
use crate::prelude::{BlockNumber, LinkResolver};
use std::sync::Arc;

use crate::components::store::DeploymentLocator;
Expand All @@ -15,6 +15,7 @@ pub trait SubgraphInstanceManager: Send + Sync + 'static {
deployment: DeploymentLocator,
manifest: serde_yaml::Mapping,
stop_block: Option<BlockNumber>,
link_resolver_override: Option<Arc<dyn LinkResolver>>,
);
async fn stop_subgraph(&self, deployment: DeploymentLocator);
}
1 change: 1 addition & 0 deletions graph/src/components/subgraph/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub trait SubgraphAssignmentProvider: Send + Sync + 'static {
&self,
deployment: DeploymentLocator,
stop_block: Option<BlockNumber>,
link_resolver_override: Option<Arc<dyn LinkResolver>>,
) -> Result<(), SubgraphAssignmentProviderError>;
async fn stop(
&self,
Expand Down
1 change: 1 addition & 0 deletions graph/src/components/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub trait SubgraphRegistrar: Send + Sync + 'static {
start_block_block: Option<BlockPtr>,
graft_block_override: Option<BlockPtr>,
history_blocks: Option<i32>,
link_resolver_override: Option<Arc<dyn LinkResolver>>,
) -> Result<DeploymentLocator, SubgraphRegistrarError>;

async fn remove_subgraph(&self, name: SubgraphName) -> Result<(), SubgraphRegistrarError>;
Expand Down
71 changes: 24 additions & 47 deletions node/src/bin/dev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ use graph::{
components::link_resolver::FileLinkResolver,
env::EnvVars,
log::logger,
slog::info,
tokio::{self, sync::mpsc},
};
use graph_node::{
dev::{helpers::DevModeContext, watcher::watch_subgraph_dir},
dev::{helpers::DevModeContext, watcher::watch_subgraphs},
launcher,
opt::Opt,
};
Expand Down Expand Up @@ -39,9 +40,17 @@ pub struct DevOpt {
#[clap(
long,
help = "The location of the subgraph manifest file.",
default_value = "./build/subgraph.yaml"
default_value = "./build/subgraph.yaml",
value_delimiter = ','
)]
pub manifest: String,
pub manifests: Vec<String>,

#[clap(
long,
help = "The location of the database directory.",
default_value = "./build"
)]
pub database_dir: String,

#[clap(
long,
Expand All @@ -63,7 +72,7 @@ pub struct DevOpt {
}

/// Builds the Graph Node options from DevOpt
fn build_args(dev_opt: &DevOpt, db_url: &str, manifest_path: &str) -> Result<Opt> {
fn build_args(dev_opt: &DevOpt, db_url: &str) -> Result<Opt> {
let mut args = vec!["gnd".to_string()];

if !dev_opt.ipfs.is_empty() {
Expand All @@ -76,16 +85,6 @@ fn build_args(dev_opt: &DevOpt, db_url: &str, manifest_path: &str) -> Result<Opt
args.push(dev_opt.ethereum_rpc.join(","));
}

let path = Path::new(manifest_path);
let file_name = path
.file_name()
.context("Invalid manifest path: no file name component")?
.to_str()
.context("Invalid file name")?;

args.push("--subgraph".to_string());
args.push(file_name.to_string());

args.push("--postgres-url".to_string());
args.push(db_url.to_string());

Expand All @@ -94,22 +93,6 @@ fn build_args(dev_opt: &DevOpt, db_url: &str, manifest_path: &str) -> Result<Opt
Ok(opt)
}

/// Validates the manifest file exists and returns the build directory
fn get_build_dir(manifest_path_str: &str) -> Result<std::path::PathBuf> {
let manifest_path = Path::new(manifest_path_str);

if !manifest_path.exists() {
anyhow::bail!("Subgraph manifest file not found at {}", manifest_path_str);
}

let dir = manifest_path
.parent()
.context("Failed to get parent directory of manifest")?;

dir.canonicalize()
.context("Failed to canonicalize build directory path")
}

async fn run_graph_node(opt: Opt, ctx: Option<DevModeContext>) -> Result<()> {
let env_vars = Arc::new(EnvVars::from_env().context("Failed to load environment variables")?);

Expand All @@ -122,45 +105,39 @@ async fn main() -> Result<()> {
env_logger::init();
let dev_opt = DevOpt::parse();

let build_dir = get_build_dir(&dev_opt.manifest)?;
let database_dir = Path::new(&dev_opt.database_dir);

let logger = logger(true);

info!(logger, "Starting Graph Node Dev");
info!(logger, "Database directory: {}", database_dir.display());

let db = PgTempDBBuilder::new()
.with_data_dir_prefix(build_dir.clone())
.with_data_dir_prefix(database_dir)
.with_initdb_param("-E", "UTF8")
.with_initdb_param("--locale", "C")
.start_async()
.await;

let (tx, rx) = mpsc::channel(1);
let opt = build_args(&dev_opt, &db.connection_uri(), &dev_opt.manifest)?;
let file_link_resolver = Arc::new(FileLinkResolver::with_base_dir(&build_dir));
let opt = build_args(&dev_opt, &db.connection_uri())?;
let file_link_resolver = Arc::new(FileLinkResolver::with_base_dir(database_dir));

let ctx = DevModeContext {
watch: dev_opt.watch,
file_link_resolver,
updates_rx: rx,
};

let subgraph = opt.subgraph.clone().unwrap();

// Set up logger
let logger = logger(opt.debug);

// Run graph node
graph::spawn(async move {
let _ = run_graph_node(opt, Some(ctx)).await;
});

if dev_opt.watch {
graph::spawn_blocking(async move {
watch_subgraph_dir(
&logger,
build_dir,
subgraph,
vec!["pgtemp-*".to_string()],
tx,
)
.await;
let _ =
watch_subgraphs(&logger, dev_opt.manifests, vec!["pgtemp-*".to_string()], tx).await;
});
}

Expand Down
Loading