Skip to content

Commit 65f7d4e

Browse files
feat(prover): Witness generator service (#4219)
## What ❔ The Witness generator Service is a lib for running Witness generator. The primitive exported by this lib is witness_generator_runner which is based on `ProverJobProcessor`. <!-- What are the changes this PR brings about? --> <!-- Example: This PR adds a PR template to the repo. --> <!-- (For bigger PRs adding more context is appreciated) --> ## Why ❔ The lib makes it easier for external teams to implement their own Artifacts Manager, Keystore and DB to run Witness Generator. <!-- Why are these changes done? What goal do they contribute to? What are the principles behind them? --> <!-- The `Why` has to be clear to non-Matter Labs entities running their own ZK Chain --> <!-- Example: PR templates ensure PR reviewers, observers, and future iterators are in context about the evolution of repos. --> ## Is this a breaking change? - [ ] Yes - [ ] No ## Operational changes <!-- Any config changes? Any new flags? Any changes to any scripts? --> <!-- Please add anything that non-Matter Labs entities running their own ZK Chain may need to know --> ## Checklist <!-- Check your PR fulfills the following items. --> <!-- For draft PRs check the boxes as you complete them. --> - [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [ ] Code has been formatted via `zkstack dev fmt` and `zkstack dev lint`.
1 parent 0d150de commit 65f7d4e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+906
-503
lines changed

prover/Cargo.lock

Lines changed: 36 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

prover/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ zksync_prover_job_processor = { version = "=21.2.0", path = "crates/lib/prover_j
113113
zksync_circuit_prover_service = { version = "=21.2.0", path = "crates/lib/circuit_prover_service" }
114114
zksync_prover_job_monitor = { version = "=21.2.0", path = "crates/bin/prover_job_monitor" }
115115
zksync_proof_fri_compressor_service = { version = "=21.2.0", path = "crates/lib/proof_fri_compressor_service" }
116+
zksync_witness_generator_service = { version = "=21.2.0", path = "crates/lib/witness_generator_service" }
116117

117118
# for `perf` profiling
118119
[profile.perf]

prover/crates/bin/circuit_prover/src/types.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use std::{collections::HashMap, sync::Arc};
22

3+
use zksync_circuit_prover_service::types::setup_data::GoldilocksGpuProverSetupData;
34
use zksync_prover_fri_types::{
45
circuit_definitions::boojum::cs::implementations::setup::FinalizationHintsForProver,
56
ProverServiceDataKey,
67
};
7-
use zksync_prover_keystore::GoldilocksGpuProverSetupData;
88

99
// TODO: To be moved to circuit_prover_service lib & adjusted to new type idiom
1010
// cache types

prover/crates/bin/gpu_checker/src/main.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use tokio::fs;
1414
use zksync_circuit_prover_service::{
1515
gpu_circuit_prover::GpuCircuitProverExecutor,
1616
types::{
17-
circuit_prover_payload::GpuCircuitProverPayload,
17+
circuit_prover_payload::GpuCircuitProverPayload, setup_data::GoldilocksGpuProverSetupData,
1818
witness_vector_generator_payload::WitnessVectorGeneratorPayload,
1919
},
2020
witness_vector_generator::WitnessVectorGeneratorExecutor,
@@ -34,10 +34,7 @@ use zksync_prover_fri_types::{
3434
ProverServiceDataKey,
3535
};
3636
use zksync_prover_job_processor::Executor;
37-
use zksync_prover_keystore::{
38-
keystore::{Keystore, ProverServiceDataType},
39-
GoldilocksGpuProverSetupData,
40-
};
37+
use zksync_prover_keystore::keystore::{Keystore, ProverServiceDataType};
4138
use zksync_types::{
4239
basic_fri_types::AggregationRound, prover_dal::FriProverJobMetadata, L1BatchId, L1BatchNumber,
4340
L2ChainId,

prover/crates/bin/witness_generator/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,15 @@ zksync_prover_keystore.workspace = true
2525
zksync_prover_fri_types.workspace = true
2626
zksync_prover_fri_utils.workspace = true
2727
zksync_circuit_prover_service.workspace = true
28+
zksync_witness_generator_service.workspace = true
2829

2930
zkevm_test_harness = { workspace = true }
3031
circuit_definitions = { workspace = true, features = ["log_tracing"] }
3132

3233
anyhow.workspace = true
3334
tracing.workspace = true
3435
tokio = { workspace = true, features = ["time", "macros"] }
36+
tokio-util.workspace = true
3537
futures = { workspace = true, features = ["compat"] }
3638
serde = { workspace = true, features = ["derive"] }
3739
async-trait.workspace = true
Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,4 @@
11
#![allow(incomplete_features)] // We have to use generic const exprs.
22
#![feature(generic_const_exprs)]
33

4-
pub mod artifacts;
54
pub mod metrics;
6-
pub mod precalculated_merkle_paths_provider;
7-
pub mod rounds;
8-
mod storage_oracle;
9-
#[cfg(test)]
10-
mod tests;
11-
pub mod utils;
12-
mod witness;

prover/crates/bin/witness_generator/src/main.rs

Lines changed: 76 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
#![allow(incomplete_features)] // We have to use generic const exprs.
22
#![feature(generic_const_exprs)]
33

4-
use std::time::{Duration, Instant};
4+
use std::{
5+
sync::Arc,
6+
time::{Duration, Instant},
7+
};
58

69
use anyhow::{anyhow, Context as _};
7-
use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};
810
#[cfg(not(target_env = "msvc"))]
911
use jemallocator::Jemalloc;
1012
use structopt::StructOpt;
11-
use tokio::sync::watch;
13+
use tokio::sync::oneshot;
14+
use tokio_util::sync::CancellationToken;
1215
use zksync_config::{
1316
configs::{GeneralConfig, PostgresSecrets},
1417
full_config_schema,
@@ -18,16 +21,16 @@ use zksync_object_store::ObjectStoreFactory;
1821
use zksync_prover_dal::{ConnectionPool, Prover, ProverDal};
1922
use zksync_prover_fri_types::PROVER_PROTOCOL_SEMANTIC_VERSION;
2023
use zksync_prover_keystore::keystore::Keystore;
21-
use zksync_queued_job_processor::JobProcessor;
2224
use zksync_task_management::ManagedTasks;
2325
use zksync_types::{basic_fri_types::AggregationRound, protocol_version::ProtocolSemanticVersion};
24-
use zksync_witness_generator::{
25-
metrics::SERVER_METRICS,
26-
rounds::{
27-
BasicCircuits, LeafAggregation, NodeAggregation, RecursionTip, Scheduler, WitnessGenerator,
28-
},
26+
use zksync_witness_generator::metrics::SERVER_METRICS;
27+
use zksync_witness_generator_service::{
28+
rounds::{BasicCircuits, LeafAggregation, NodeAggregation, RecursionTip, Scheduler},
29+
witness_generator_runner,
2930
};
3031

32+
const GRACEFUL_SHUTDOWN_DURATION: Duration = Duration::from_secs(20);
33+
3134
#[cfg(not(target_env = "msvc"))]
3235
#[global_allocator]
3336
static GLOBAL: Jemalloc = Jemalloc;
@@ -94,7 +97,42 @@ async fn ensure_protocol_alignment(
9497

9598
#[tokio::main]
9699
async fn main() -> anyhow::Result<()> {
97-
let started_at = Instant::now();
100+
let (stop_signal_sender, stop_signal_receiver) = oneshot::channel();
101+
let mut stop_signal_sender = Some(stop_signal_sender);
102+
ctrlc::set_handler(move || {
103+
if let Some(sender) = stop_signal_sender.take() {
104+
sender.send(()).ok();
105+
}
106+
})
107+
.context("Error setting Ctrl+C handler")?;
108+
109+
let cancellation_token = CancellationToken::new();
110+
let mut managed_tasks = ManagedTasks::new(vec![]);
111+
let (metrics_stop_sender, metrics_stop_receiver) = tokio::sync::watch::channel(false);
112+
113+
tokio::select! {
114+
res = run_inner(cancellation_token.clone(), metrics_stop_receiver, &mut managed_tasks) => {
115+
res.context("Failed to run witness generator")?;
116+
},
117+
_ = stop_signal_receiver => {
118+
tracing::info!("Stop request received, shutting down");
119+
}
120+
}
121+
let shutdown_time = Instant::now();
122+
cancellation_token.cancel();
123+
metrics_stop_sender
124+
.send(true)
125+
.context("failed to stop metrics")?;
126+
managed_tasks.complete(GRACEFUL_SHUTDOWN_DURATION).await;
127+
tracing::info!("Tasks completed in {:?}.", shutdown_time.elapsed());
128+
Ok(())
129+
}
130+
131+
async fn run_inner(
132+
cancellation_token: CancellationToken,
133+
metrics_stop_receiver: tokio::sync::watch::Receiver<bool>,
134+
managed_tasks: &mut ManagedTasks,
135+
) -> anyhow::Result<()> {
98136
let opt = Opt::from_args();
99137
let schema = full_config_schema();
100138
let config_file_paths = ConfigFilePaths {
@@ -109,6 +147,7 @@ async fn main() -> anyhow::Result<()> {
109147
let mut repo = config_sources.build_repository(&schema);
110148
let general_config: GeneralConfig = repo.parse()?;
111149
let database_secrets: PostgresSecrets = repo.parse()?;
150+
let started_at = Instant::now();
112151

113152
let prover_config = general_config.prover_config.context("prover config")?;
114153
let object_store_config = prover_config.prover_object_store;
@@ -129,7 +168,6 @@ async fn main() -> anyhow::Result<()> {
129168
.build()
130169
.await
131170
.context("failed to build a prover_connection_pool")?;
132-
let (stop_sender, stop_receiver) = watch::channel(false);
133171

134172
let protocol_version = PROVER_PROTOCOL_SEMANTIC_VERSION;
135173

@@ -159,9 +197,11 @@ async fn main() -> anyhow::Result<()> {
159197
};
160198

161199
let mut tasks = vec![tokio::spawn(
162-
prometheus_exporter_config.run(stop_receiver.clone()),
200+
prometheus_exporter_config.run(metrics_stop_receiver),
163201
)];
164202

203+
let keystore = Arc::new(keystore);
204+
165205
for round in rounds {
166206
tracing::info!(
167207
"initializing the {:?} witness generator, batch size: {:?} with protocol_version: {:?}",
@@ -170,60 +210,65 @@ async fn main() -> anyhow::Result<()> {
170210
&protocol_version
171211
);
172212

173-
let witness_generator_task = match round {
213+
let witness_generator_tasks = match round {
174214
AggregationRound::BasicCircuits => {
175-
let generator = WitnessGenerator::<BasicCircuits>::new(
176-
config.clone(),
215+
let runner = witness_generator_runner::<BasicCircuits>(
216+
config.max_circuits_in_flight,
177217
store_factory.create_store().await?,
178218
connection_pool.clone(),
179219
protocol_version,
180220
keystore.clone(),
221+
cancellation_token.clone(),
181222
);
182-
generator.run(stop_receiver.clone(), opt.batch_size)
223+
runner.run()
183224
}
184225
AggregationRound::LeafAggregation => {
185-
let generator = WitnessGenerator::<LeafAggregation>::new(
186-
config.clone(),
226+
let runner = witness_generator_runner::<LeafAggregation>(
227+
config.max_circuits_in_flight,
187228
store_factory.create_store().await?,
188229
connection_pool.clone(),
189230
protocol_version,
190231
keystore.clone(),
232+
cancellation_token.clone(),
191233
);
192-
generator.run(stop_receiver.clone(), opt.batch_size)
234+
runner.run()
193235
}
194236
AggregationRound::NodeAggregation => {
195-
let generator = WitnessGenerator::<NodeAggregation>::new(
196-
config.clone(),
237+
let runner = witness_generator_runner::<NodeAggregation>(
238+
config.max_circuits_in_flight,
197239
store_factory.create_store().await?,
198240
connection_pool.clone(),
199241
protocol_version,
200242
keystore.clone(),
243+
cancellation_token.clone(),
201244
);
202-
generator.run(stop_receiver.clone(), opt.batch_size)
245+
runner.run()
203246
}
204247
AggregationRound::RecursionTip => {
205-
let generator = WitnessGenerator::<RecursionTip>::new(
206-
config.clone(),
248+
let runner = witness_generator_runner::<RecursionTip>(
249+
config.max_circuits_in_flight,
207250
store_factory.create_store().await?,
208251
connection_pool.clone(),
209252
protocol_version,
210253
keystore.clone(),
254+
cancellation_token.clone(),
211255
);
212-
generator.run(stop_receiver.clone(), opt.batch_size)
256+
runner.run()
213257
}
214258
AggregationRound::Scheduler => {
215-
let generator = WitnessGenerator::<Scheduler>::new(
216-
config.clone(),
259+
let runner = witness_generator_runner::<Scheduler>(
260+
config.max_circuits_in_flight,
217261
store_factory.create_store().await?,
218262
connection_pool.clone(),
219263
protocol_version,
220264
keystore.clone(),
265+
cancellation_token.clone(),
221266
);
222-
generator.run(stop_receiver.clone(), opt.batch_size)
267+
runner.run()
223268
}
224269
};
225270

226-
tasks.push(tokio::spawn(witness_generator_task));
271+
tasks.extend(witness_generator_tasks);
227272

228273
tracing::info!(
229274
"initialized {:?} witness generator in {:?}",
@@ -233,21 +278,7 @@ async fn main() -> anyhow::Result<()> {
233278
SERVER_METRICS.init_latency[&round.into()].set(started_at.elapsed());
234279
}
235280

236-
let (mut stop_signal_sender, mut stop_signal_receiver) = mpsc::channel(256);
237-
ctrlc::set_handler(move || {
238-
block_on(stop_signal_sender.send(true)).expect("Ctrl+C signal send");
239-
})
240-
.expect("Error setting Ctrl+C handler");
241-
let mut tasks = ManagedTasks::new(tasks).allow_tasks_to_finish();
242-
tokio::select! {
243-
_ = tasks.wait_single() => {},
244-
_ = stop_signal_receiver.next() => {
245-
tracing::info!("Stop request received, shutting down");
246-
}
247-
}
248-
249-
stop_sender.send_replace(true);
250-
tasks.complete(Duration::from_secs(5)).await;
251-
tracing::info!("Finished witness generation");
281+
*managed_tasks = ManagedTasks::new(tasks);
282+
managed_tasks.wait_single().await;
252283
Ok(())
253284
}

prover/crates/bin/witness_generator/src/metrics.rs

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,8 @@
11
use std::time::Duration;
22

3-
use vise::{Buckets, Family, Gauge, Histogram, LabeledFamily, Metrics};
3+
use vise::{Gauge, LabeledFamily, Metrics};
44
use zksync_prover_fri_utils::metrics::StageLabel;
55

6-
#[derive(Debug, Metrics)]
7-
#[metrics(prefix = "prover_fri_witness_generator")]
8-
pub(crate) struct WitnessGeneratorMetrics {
9-
#[metrics(buckets = Buckets::LATENCIES)]
10-
pub blob_fetch_time: Family<StageLabel, Histogram<Duration>>,
11-
#[metrics(buckets = Buckets::LATENCIES)]
12-
pub prepare_job_time: Family<StageLabel, Histogram<Duration>>,
13-
#[metrics(buckets = Buckets::exponential(60.0..=61440.0, 2.0))]
14-
pub witness_generation_time: Family<StageLabel, Histogram<Duration>>,
15-
#[metrics(buckets = Buckets::LATENCIES)]
16-
pub blob_save_time: Family<StageLabel, Histogram<Duration>>,
17-
}
18-
19-
#[vise::register]
20-
pub(crate) static WITNESS_GENERATOR_METRICS: vise::Global<WitnessGeneratorMetrics> =
21-
vise::Global::new();
22-
236
#[derive(Debug, Metrics)]
247
#[metrics(prefix = "prover")]
258
pub struct ServerMetrics {

0 commit comments

Comments
 (0)