-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbootstrap.rs
More file actions
148 lines (131 loc) · 4.63 KB
/
bootstrap.rs
File metadata and controls
148 lines (131 loc) · 4.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
use std::sync::Arc;
use anyhow::Result;
use bitcoind_async_client::{Auth, Client};
use strata_asm_params::AsmParams;
use strata_asm_proof_db::SledProofDb;
use strata_asm_spec::StrataAsmSpec;
use strata_asm_worker::AsmWorkerBuilder;
use strata_tasks::TaskExecutor;
use tokio::{
runtime::{Builder as RuntimeBuilder, Handle},
sync::mpsc,
task::{self, LocalSet},
};
use crate::{
block_watcher::drive_asm_from_bitcoin,
config::{AsmRpcConfig, BitcoinConfig},
prover::{InputBuilder, ProofBackend, ProofOrchestrator},
rpc_server::run_rpc_server,
storage::create_storage,
worker_context::AsmWorkerContext,
};
/// Wires up and launches the long-running components of the ASM RPC service.
///
/// In order, this opens storage, connects to the Bitcoin node, launches the ASM worker,
/// starts the proof orchestrator when `config.orchestrator` is set, and then hands the block
/// watcher and the RPC server to `executor` as critical tasks.
///
/// `Ok(())` is returned once the tasks have been handed to `executor`; the spawned tasks are
/// not awaited here.
///
/// # Errors
///
/// Propagates failures from creating storage, connecting the Bitcoin client, launching the
/// ASM worker, opening the proof database, and constructing the proof backend.
pub(crate) async fn bootstrap(
    config: AsmRpcConfig,
    params: AsmParams,
    executor: TaskExecutor,
) -> Result<()> {
    // Step 1: open the state and MMR databases.
    let (state_db, mmr_db) = create_storage(&config.database)?;

    // Step 2: build the Bitcoin client and share it behind an `Arc`.
    let bitcoin_client = connect_bitcoin(&config.bitcoin).await.map(Arc::new)?;

    // Step 3: assemble the `AsmWorkerContext` the ASM worker runs against.
    let runtime_handle = Handle::current();
    let asm_context = AsmWorkerContext::new(
        runtime_handle.clone(),
        bitcoin_client.clone(),
        state_db.clone(),
        mmr_db.clone(),
    );

    // Step 4: launch the ASM worker on the task executor.
    let shared_params = Arc::new(params.clone());
    let asm_worker = AsmWorkerBuilder::new()
        .with_context(asm_context)
        .with_asm_params(shared_params)
        .with_asm_spec(StrataAsmSpec)
        .launch(&executor)?;

    // Step 5: pick the height the block watcher starts from. With a recorded current block
    // we use that block's height; otherwise we start one past the anchor block.
    // NOTE(review): the recorded-block case resumes at the block's own height rather than
    // the height after it -- confirm this is what the watcher expects.
    let start_height = if let Some(blk) = asm_worker.monitor().get_current().cur_block {
        blk.height()
    } else {
        params.anchor.block.height() + 1
    };
    let asm_worker = Arc::new(asm_worker);

    // Step 6: when an orchestrator is configured, create the proof channel and spawn the
    // orchestrator; otherwise neither the sender nor the proof DB exists.
    let (proof_job_tx, rpc_proof_db) = match config.orchestrator {
        None => (None, None),
        Some(orch_config) => {
            let (job_tx, job_rx) = mpsc::unbounded_channel();
            let proof_db = SledProofDb::open(&orch_config.proof_db_path)?;
            // Separate handle kept for the RPC server; `proof_db` itself is consumed below.
            let proof_db_handle = proof_db.clone();
            let backend = ProofBackend::new()?;

            let input_builder = InputBuilder::new(
                state_db.clone(),
                bitcoin_client.clone(),
                proof_db.clone(),
                params.anchor.block,
                backend.asm_predicate.clone(),
                backend.moho_predicate.clone(),
            );
            let mut orchestrator = ProofOrchestrator::new(
                proof_db,
                backend.asm_host,
                backend.moho_host,
                orch_config,
                input_builder,
                job_rx,
            );

            // The orchestrator future is not `Send` (ZkVmRemoteProver uses
            // `#[async_trait(?Send)]`), so it cannot be spawned directly onto the
            // multi-threaded runtime. Instead it gets a blocking thread that drives its own
            // current-thread runtime together with a `LocalSet`.
            executor.spawn_critical_async_with_shutdown(
                "proof_orchestrator",
                move |shutdown| async move {
                    let orchestrator_thread = task::spawn_blocking(move || {
                        let runtime = RuntimeBuilder::new_current_thread().enable_all().build()?;
                        let local_set = LocalSet::new();
                        let orchestrator_fut = async move { orchestrator.run(shutdown).await };
                        runtime.block_on(local_set.run_until(orchestrator_fut))
                    });
                    // The outer `?` surfaces a failed join; the inner result is returned as is.
                    orchestrator_thread.await?
                },
            );

            (Some(job_tx), Some(proof_db_handle))
        }
    };

    // Step 7: spawn the block watcher as a critical task, giving it its own handles.
    let watcher_worker = asm_worker.clone();
    let watcher_bitcoin_config = config.bitcoin.clone();
    let watcher_client = bitcoin_client.clone();
    executor.spawn_critical_async_with_shutdown("block_watcher", move |shutdown| {
        drive_asm_from_bitcoin(
            watcher_bitcoin_config,
            watcher_client,
            watcher_worker,
            start_height as u64,
            proof_job_tx,
            shutdown,
        )
    });

    // Step 8: spawn the RPC server as a critical task; it takes the remaining handles.
    let listen_host = config.rpc.host.clone();
    let listen_port = config.rpc.port;
    executor.spawn_critical_async_with_shutdown("rpc_server", move |shutdown| {
        run_rpc_server(
            state_db,
            asm_worker,
            bitcoin_client,
            rpc_proof_db,
            listen_host,
            listen_port,
            shutdown,
        )
    });

    Ok(())
}
/// Builds a Bitcoin RPC [`Client`] from the node settings in `config`.
///
/// The client authenticates with the configured user and password, is created without a
/// timeout, and receives the configured retry count plus the retry interval converted to
/// milliseconds.
///
/// # Errors
///
/// Propagates any error returned by [`Client::new`].
async fn connect_bitcoin(config: &BitcoinConfig) -> Result<Client> {
    let url = config.rpc_url.clone();
    let credentials = Auth::UserPass(config.rpc_user.clone(), config.rpc_password.clone());
    // No timeout is passed to the client.
    let timeout = None;
    let retry_interval_ms = config
        .retry_interval
        .map(|interval| interval.as_millis() as u64);

    Ok(Client::new(
        url,
        credentials,
        timeout,
        config.retry_count,
        retry_interval_ms,
    )?)
}