-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathservice.rs
More file actions
263 lines (226 loc) · 9.31 KB
/
service.rs
File metadata and controls
263 lines (226 loc) · 9.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
//! Service framework integration for ASM.
use std::{marker, thread::sleep, time::Duration};
use bitcoin::hashes::Hash;
use serde::{Deserialize, Serialize};
use strata_asm_common::AsmSpec;
use strata_btc_types::BlockHashExt;
use strata_identifiers::{Buf32, L1BlockCommitment, L1BlockId};
use strata_service::{Response, Service, SyncService};
use tracing::*;
use crate::{
AsmState, AsmWorkerServiceState, WorkerError, WorkerResult, message::AsmWorkerMessage,
traits::WorkerContext,
};
/// ASM service implementation using the service framework.
#[derive(Debug)]
pub struct AsmWorkerService<W, S> {
_phantom: marker::PhantomData<(W, S)>,
}
impl<W, S> Service for AsmWorkerService<W, S>
where
W: WorkerContext + Send + Sync + 'static,
S: AsmSpec + Send + Sync + 'static,
{
type State = AsmWorkerServiceState<W, S>;
type Msg = AsmWorkerMessage;
type Status = AsmWorkerStatus;
fn get_status(state: &Self::State) -> Self::Status {
AsmWorkerStatus {
is_initialized: state.initialized,
cur_block: state.blkid,
cur_state: state.anchor.clone(),
}
}
}
impl<W, S> SyncService for AsmWorkerService<W, S>
where
W: WorkerContext + Send + Sync + 'static,
S: AsmSpec + Send + Sync + 'static,
{
fn on_launch(state: &mut AsmWorkerServiceState<W, S>) -> anyhow::Result<()> {
Ok(state.load_latest_or_create_genesis()?)
}
// TODO(STR-1928): add tests.
fn process_input(
state: &mut AsmWorkerServiceState<W, S>,
input: AsmWorkerMessage,
) -> anyhow::Result<Response> {
match input {
AsmWorkerMessage::SubmitBlock(incoming_block, completion) => {
let result = process_block(state, &incoming_block);
let should_exit = result.is_err();
completion.send_blocking(result);
if should_exit {
return Ok(Response::ShouldExit);
}
}
}
Ok(Response::Continue)
}
}
/// Processes an L1 block through the ASM state transition.
fn process_block<W, S>(
state: &mut AsmWorkerServiceState<W, S>,
incoming_block: &L1BlockCommitment,
) -> crate::WorkerResult<()>
where
W: WorkerContext + Send + Sync + 'static,
S: AsmSpec + Send + Sync + 'static,
{
let ctx = &state.context;
// Handle pre-genesis: if the block is before genesis we don't care about it.
let genesis_height = state.asm_params.anchor.block.height();
let height = incoming_block.height();
if height < genesis_height {
warn!(height, "ignoring unexpected L1 block before genesis");
return Ok(());
}
// Traverse back the chain of l1 blocks until we find an l1 block which has AnchorState.
// Remember all the blocks along the way and pass it (in the reverse order) to process.
let pivot_span = debug_span!("asm.pivot_lookup",
target_height = height,
target_block = %incoming_block.blkid()
);
let pivot_span_guard = pivot_span.enter();
let mut skipped_blocks = vec![];
let mut pivot_block = *incoming_block;
let mut pivot_anchor = ctx.get_anchor_state(&pivot_block);
while pivot_anchor.is_err() && pivot_block.height() >= genesis_height {
let block = get_l1_block_with_retry(ctx, pivot_block.blkid())?;
let parent_height = pivot_block.height() - 1;
let parent_block_id =
L1BlockCommitment::new(parent_height, block.header.prev_blockhash.to_l1_block_id());
// Push the unprocessed block.
skipped_blocks.push((block, pivot_block));
// Update the loop state.
pivot_anchor = ctx.get_anchor_state(&parent_block_id);
pivot_block = parent_block_id;
}
// We reached the height before genesis (while traversing), but didn't find genesis state.
if pivot_block.height() < genesis_height {
warn!("ASM hasn't found pivot anchor state at genesis.");
return Err(crate::WorkerError::MissingGenesisState);
}
// Found pivot anchor state - our starting point.
info!(%pivot_block,
skipped_blocks = skipped_blocks.len(),
"ASM found pivot anchor state"
);
// Drop pivot span guard before next phase
drop(pivot_span_guard);
// Special handling for genesis block - its anchor state was created during init
// but its manifest wasn't (because Bitcoin block wasn't available yet).
// We only store the manifest to L1 (for data consumers) but do NOT append it
// to the external MMR, since the internal compact MMR in AnchorState starts
// empty with offset = genesis_height + 1. Appending genesis here would shift
// all external MMR indices by 1 relative to the internal accumulator.
// Idempotency: skip if the genesis manifest already exists in the L1 database.
if pivot_block.height() == genesis_height && !ctx.has_l1_manifest(pivot_block.blkid())? {
let genesis_span = info_span!("asm.genesis_manifest",
pivot_height = pivot_block.height(),
pivot_block = %pivot_block.blkid()
);
let _genesis_guard = genesis_span.enter();
// Fetch the genesis block (should work now since L1 reader processed it)
let genesis_block = ctx.get_l1_block(pivot_block.blkid())?;
// Compute wtxids_root and create manifest
let wtxids_root: Buf32 = genesis_block
.witness_root()
.map(|root| root.as_raw_hash().to_byte_array())
.unwrap_or_else(|| {
genesis_block
.header
.merkle_root
.as_raw_hash()
.to_byte_array()
})
.into();
let genesis_manifest = strata_asm_common::AsmManifest::new(
pivot_block.height(),
*pivot_block.blkid(),
wtxids_root.into(),
vec![], // TODO: this is not supposed to be empty right?
);
ctx.store_l1_manifest(genesis_manifest)?;
info!(%pivot_block, "Created genesis manifest");
} // genesis_span drops here
state.update_anchor_state(pivot_anchor.unwrap(), pivot_block);
// Process the whole chain of unprocessed blocks, starting from older blocks till
// incoming_block.
for (block, block_id) in skipped_blocks.iter().rev() {
let transition_span = debug_span!("asm.block_transition",
height = block_id.height(),
block_id = %block_id.blkid()
);
let _transition_guard = transition_span.enter();
info!(%block_id, "ASM transition attempt");
let (asm_stf_out, aux_data) = state.transition(block)?;
let storage_span = debug_span!("asm.manifest_storage");
let _storage_guard = storage_span.enter();
// Extract manifest and compute its hash
let manifest = asm_stf_out.manifest.clone();
let manifest_hash = manifest.compute_hash();
// Store manifest to L1 database (for chaintsn and other consumers)
state.context.store_l1_manifest(manifest)?;
// Append manifest hash to MMR database
let leaf_index = state.context.append_manifest_to_mmr(manifest_hash.into())?;
// Store auxiliary data for prover consumption
state.context.store_aux_data(block_id, &aux_data)?;
let new_state = AsmState::from_output(asm_stf_out);
// Store and update anchor.
state.context.store_anchor_state(block_id, &new_state)?;
state.update_anchor_state(new_state, *block_id);
info!(%block_id, %height, leaf_index, "ASM transition complete, manifest and state stored");
} // transition_span drops here
Ok(())
}
/// Fetches an L1 block, retrying on transient [`WorkerError::MissingL1Block`] errors.
///
/// The L1 reader may notify the ASM worker before the block data is fully
/// available (e.g. canonical-chain DB write hasn't propagated yet, or the
/// Bitcoin RPC times out under load). This bridges the gap with exponential
/// backoff: 200 ms base, 1.5x growth, 2 s cap, 10 retries (~10 s total).
fn get_l1_block_with_retry<W: WorkerContext>(
ctx: &W,
blockid: &L1BlockId,
) -> WorkerResult<bitcoin::Block> {
const MAX_RETRIES: u32 = 10;
const BASE_DELAY_MS: u64 = 200;
const MAX_DELAY_MS: u64 = 2000;
let mut delay_ms = BASE_DELAY_MS;
for attempt in 0..=MAX_RETRIES {
match ctx.get_l1_block(blockid) {
Ok(block) => return Ok(block),
Err(WorkerError::MissingL1Block(id)) if attempt < MAX_RETRIES => {
warn!(
attempt,
?id,
delay_ms,
"L1 block not yet available, retrying"
);
sleep(Duration::from_millis(delay_ms));
delay_ms = (delay_ms * 3 / 2).min(MAX_DELAY_MS);
}
Err(e) => return Err(e),
}
}
unreachable!()
}
/// Status information for the ASM worker service.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AsmWorkerStatus {
pub is_initialized: bool,
pub cur_block: Option<L1BlockCommitment>,
pub cur_state: Option<AsmState>,
}
impl AsmWorkerStatus {
/// Get the logs from the current ASM state.
///
/// Returns an empty slice if the state is not initialized.
pub fn logs(&self) -> &[strata_asm_common::AsmLogEntry] {
self.cur_state
.as_ref()
.map(|s| s.logs().as_slice())
.unwrap_or(&[])
}
}