Skip to content

Commit dd1ce4f

Browse files
sapinbZk2u
andauthored
Feat/setup progress log (#172)
* add progess in rpc status * add logs on handle_receive_garbling_table failure cases * fix: create new txn after receiving tables in handle_receive_garbling_table * feat: add RetryEvalState wrapper to auto-retry evaluator reads on FDB txn timeout * refactor: rename _with_retry helpers, use RetryEvalState in begin_table_commitment * refactor: drop RetryEvalState --------- Co-authored-by: azz <azz@alpenlabs.io>
1 parent 6191378 commit dd1ce4f

2 files changed

Lines changed: 118 additions & 22 deletions

File tree

crates/job/executors/src/evaluator.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,11 +249,15 @@ pub(crate) async fn handle_receive_garbling_table<SP: StorageProvider, TS: Table
249249

250250
let eval_state = match ctx.storage.evaluator_state(peer_id).await {
251251
Ok(state) => state,
252-
Err(_) => return HandlerOutcome::Retry,
252+
Err(e) => {
253+
warn!(%peer_id, ?e, "failed to load evaluator state for receive_garbling_table");
254+
return HandlerOutcome::Retry;
255+
}
253256
};
254257

255258
// Resolve the commitment → circuit index from the evaluator root state.
256259
let Some(root_state) = eval_state.get_root_state().await.ok().flatten() else {
260+
warn!(%peer_id, "root state not available for receive_garbling_table");
257261
return HandlerOutcome::Retry;
258262
};
259263
let (eval_indices, eval_commitments) = match &root_state.step {
@@ -262,14 +266,18 @@ pub(crate) async fn handle_receive_garbling_table<SP: StorageProvider, TS: Table
262266
eval_commitments,
263267
..
264268
} => (*eval_indices, eval_commitments.clone()),
265-
_ => return HandlerOutcome::Retry,
269+
_ => {
270+
warn!(%peer_id, step = ?root_state.step, "unexpected step in receive_garbling_table");
271+
return HandlerOutcome::Retry;
272+
}
266273
};
267274

268275
let Some(pos) = eval_commitments
269276
.iter()
270277
.position(|c| *c == expected_commitment)
271278
else {
272279
// Commitment not found — stale action or state mismatch.
280+
warn!(%peer_id, "commitment not found in eval_commitments for receive_garbling_table");
273281
return HandlerOutcome::Retry;
274282
};
275283
let index = eval_indices[pos];
@@ -286,6 +294,7 @@ pub(crate) async fn handle_receive_garbling_table<SP: StorageProvider, TS: Table
286294
.await;
287295

288296
let Ok(expectation) = expectation else {
297+
warn!(%peer_id, "bulk receiver registration failed for receive_garbling_table");
289298
return HandlerOutcome::Retry;
290299
};
291300

@@ -298,6 +307,7 @@ pub(crate) async fn handle_receive_garbling_table<SP: StorageProvider, TS: Table
298307

299308
// Wait for the garbler to open the stream.
300309
let Ok(mut stream) = expectation.recv().await else {
310+
warn!(%peer_id, "bulk stream receive failed for receive_garbling_table");
301311
return HandlerOutcome::Retry;
302312
};
303313

@@ -312,6 +322,7 @@ pub(crate) async fn handle_receive_garbling_table<SP: StorageProvider, TS: Table
312322
};
313323
let writer = ctx.table_store.create(&table_id).await;
314324
let Ok(mut writer) = writer else {
325+
warn!(%peer_id, "table writer creation failed for receive_garbling_table");
315326
return HandlerOutcome::Retry;
316327
};
317328

@@ -344,6 +355,7 @@ pub(crate) async fn handle_receive_garbling_table<SP: StorageProvider, TS: Table
344355
if !ct_part.is_empty() {
345356
ct_hasher.update(ct_part);
346357
if writer.write_ciphertext(ct_part).await.is_err() {
358+
error!(%peer_id, "ciphertext write failed (translation overflow) for receive_garbling_table");
347359
let _ = ctx.table_store.delete(&table_id).await;
348360
return HandlerOutcome::Retry;
349361
}
@@ -352,6 +364,7 @@ pub(crate) async fn handle_receive_garbling_table<SP: StorageProvider, TS: Table
352364
// All translation received — remaining data is ciphertext.
353365
ct_hasher.update(&chunk);
354366
if writer.write_ciphertext(&chunk).await.is_err() {
367+
error!(%peer_id, "ciphertext write failed for receive_garbling_table");
355368
let _ = ctx.table_store.delete(&table_id).await;
356369
return HandlerOutcome::Retry;
357370
}
@@ -360,24 +373,29 @@ pub(crate) async fn handle_receive_garbling_table<SP: StorageProvider, TS: Table
360373

361374
// Verify we received enough translation data.
362375
if translation_remaining > 0 {
376+
error!(%peer_id, translation_remaining, "incomplete translation data for receive_garbling_table");
363377
let _ = ctx.table_store.delete(&table_id).await;
364378
return HandlerOutcome::Retry;
365379
}
366380

367381
let translate_hash = translate_hasher.finalize();
368382
let ct_hash = ct_hasher.finalize();
383+
369384
// Load metadata from evaluator state. These were stored when the garbler's
370385
// CommitMsgHeader (aes keys, public S) and ChallengeResponseMsgHeader
371386
// (output label ciphertexts) were processed by the STF.
372387
let Some(aes_key) = eval_state.get_aes128_key(index).await.ok().flatten() else {
388+
warn!(%peer_id, ?index, "aes128_key not available for receive_garbling_table");
373389
let _ = ctx.table_store.delete(&table_id).await;
374390
return HandlerOutcome::Retry;
375391
};
376392
let Some(public_s) = eval_state.get_public_s(index).await.ok().flatten() else {
393+
warn!(%peer_id, ?index, "public_s not available for receive_garbling_table");
377394
let _ = ctx.table_store.delete(&table_id).await;
378395
return HandlerOutcome::Retry;
379396
};
380397
let Some(output_label_ct) = eval_state.get_output_label_ct(index).await.ok().flatten() else {
398+
warn!(%peer_id, ?index, "output_label_ct not available for receive_garbling_table");
381399
let _ = ctx.table_store.delete(&table_id).await;
382400
return HandlerOutcome::Retry;
383401
};
@@ -387,6 +405,7 @@ pub(crate) async fn handle_receive_garbling_table<SP: StorageProvider, TS: Table
387405
.ok()
388406
.flatten()
389407
else {
408+
warn!(%peer_id, ?index, "constant_zero_label not available for receive_garbling_table");
390409
let _ = ctx.table_store.delete(&table_id).await;
391410
return HandlerOutcome::Retry;
392411
};
@@ -396,13 +415,15 @@ pub(crate) async fn handle_receive_garbling_table<SP: StorageProvider, TS: Table
396415
.ok()
397416
.flatten()
398417
else {
418+
warn!(%peer_id, ?index, "constant_one_label not available for receive_garbling_table");
399419
let _ = ctx.table_store.delete(&table_id).await;
400420
return HandlerOutcome::Retry;
401421
};
402422
// Verify the received data matches the expected commitment.
403423
let params_hash = hash_garbling_params(&aes_key, &public_s, &constant_one, &constant_zero);
404424
let computed = compute_commitment(&ct_hash, &translate_hash, &output_label_ct, &params_hash);
405425
if computed != expected_commitment {
426+
error!(%peer_id, ?index, "commitment mismatch in receive_garbling_table");
406427
let _ = ctx.table_store.delete(&table_id).await;
407428
return HandlerOutcome::Retry;
408429
}
@@ -413,6 +434,7 @@ pub(crate) async fn handle_receive_garbling_table<SP: StorageProvider, TS: Table
413434
};
414435

415436
if writer.finish(&translation_buf, metadata).await.is_err() {
437+
error!(%peer_id, ?index, "table writer finish failed for receive_garbling_table");
416438
let _ = ctx.table_store.delete(&table_id).await;
417439
return HandlerOutcome::Retry;
418440
}

crates/rpc/service/src/types.rs

Lines changed: 94 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
66
use bitcoin::{XOnlyPublicKey, secp256k1::schnorr::Signature as SchnorrSignature};
77
use mosaic_cac_types::{
8-
DepositId, DepositInputs, SetupInputs, Sighashes, WithdrawalInputs,
8+
DepositId, DepositInputs, HeapArray, SetupInputs, Sighashes, WithdrawalInputs,
99
state_machine::{
1010
Role,
1111
evaluator::{self},
@@ -129,20 +129,62 @@ pub struct EvaluatorWithdrawalData {
129129

130130
// --- From impls: garbler/evaluator state -> service domain types ---
131131

132+
/// Format progress of a `HeapArray<bool, N>` as `"done/total"`.
133+
fn arr_progress<const N: usize>(arr: &HeapArray<bool, N>) -> String {
134+
format!("{}/{}", arr.count_ones(), arr.len())
135+
}
136+
137+
/// Format progress of a single `bool` flag as `"0/1"` or `"1/1"`.
138+
fn bool_progress(b: bool) -> &'static str {
139+
if b { "1/1" } else { "0/1" }
140+
}
141+
132142
impl From<&garbler::Step> for TablesetStatus {
133143
fn from(step: &garbler::Step) -> Self {
134144
use garbler::Step::*;
135145
match step {
136-
Uninit
137-
| GeneratingPolynomialCommitments { .. }
138-
| GeneratingShares { .. }
139-
| GeneratingTableCommitments { .. }
140-
| SendingCommit { .. }
141-
| WaitingForChallenge
142-
| SendingChallengeResponse { .. }
143-
| TransferringGarblingTables { .. } => TablesetStatus::Incomplete {
146+
Uninit | WaitingForChallenge => TablesetStatus::Incomplete {
144147
details: step.step_name().into(),
145148
},
149+
GeneratingPolynomialCommitments { inputs, output } => TablesetStatus::Incomplete {
150+
details: format!(
151+
"{} (inputs: {}, output: {})",
152+
step.step_name(),
153+
arr_progress(inputs),
154+
bool_progress(*output),
155+
),
156+
},
157+
GeneratingShares { generated } => TablesetStatus::Incomplete {
158+
details: format!("{} ({})", step.step_name(), arr_progress(generated)),
159+
},
160+
GeneratingTableCommitments { generated, .. } => TablesetStatus::Incomplete {
161+
details: format!("{} ({})", step.step_name(), arr_progress(generated)),
162+
},
163+
SendingCommit {
164+
header_acked,
165+
chunk_acked,
166+
} => TablesetStatus::Incomplete {
167+
details: format!(
168+
"{} (header: {}, chunks: {})",
169+
step.step_name(),
170+
bool_progress(*header_acked),
171+
arr_progress(chunk_acked),
172+
),
173+
},
174+
SendingChallengeResponse {
175+
header_acked,
176+
chunk_acked,
177+
} => TablesetStatus::Incomplete {
178+
details: format!(
179+
"{} (header: {}, chunks: {})",
180+
step.step_name(),
181+
bool_progress(*header_acked),
182+
arr_progress(chunk_acked),
183+
),
184+
},
185+
TransferringGarblingTables { transferred, .. } => TablesetStatus::Incomplete {
186+
details: format!("{} ({})", step.step_name(), arr_progress(transferred)),
187+
},
146188
SetupComplete => TablesetStatus::SetupComplete,
147189
CompletingAdaptors { deposit_id } => TablesetStatus::Contest {
148190
deposit_id: *deposit_id,
@@ -162,14 +204,34 @@ impl From<&evaluator::Step> for TablesetStatus {
162204
fn from(step: &evaluator::Step) -> Self {
163205
use evaluator::Step::*;
164206
match step {
165-
Uninit
166-
| WaitingForCommit { .. }
167-
| WaitingForChallengeResponse { .. }
168-
| VerifyingOpenedInputShares
169-
| VerifyingTableCommitments { .. }
170-
| ReceivingGarblingTables { .. } => TablesetStatus::Incomplete {
207+
Uninit | VerifyingOpenedInputShares => TablesetStatus::Incomplete {
171208
details: step.step_name().into(),
172209
},
210+
WaitingForCommit { header, chunks } => TablesetStatus::Incomplete {
211+
details: format!(
212+
"{} (header: {}, chunks: {})",
213+
step.step_name(),
214+
bool_progress(*header),
215+
arr_progress(chunks),
216+
),
217+
},
218+
WaitingForChallengeResponse {
219+
header,
220+
remaining_chunks,
221+
} => TablesetStatus::Incomplete {
222+
details: format!(
223+
"{} (header: {}, chunks: {})",
224+
step.step_name(),
225+
bool_progress(*header),
226+
arr_progress(remaining_chunks),
227+
),
228+
},
229+
VerifyingTableCommitments { verified, .. } => TablesetStatus::Incomplete {
230+
details: format!("{} ({})", step.step_name(), arr_progress(verified)),
231+
},
232+
ReceivingGarblingTables { received, .. } => TablesetStatus::Incomplete {
233+
details: format!("{} ({})", step.step_name(), arr_progress(received)),
234+
},
173235
SetupComplete => TablesetStatus::SetupComplete,
174236
EvaluatingTables { deposit_id, .. } => TablesetStatus::Contest {
175237
deposit_id: *deposit_id,
@@ -191,8 +253,10 @@ impl From<&evaluator::Step> for TablesetStatus {
191253
impl From<garbler::DepositState> for DepositStatus {
192254
fn from(deposit: garbler::DepositState) -> Self {
193255
match deposit.step {
194-
garbler::DepositStep::WaitingForAdaptors { .. }
195-
| garbler::DepositStep::VerifyingAdaptors => DepositStatus::Incomplete {
256+
garbler::DepositStep::WaitingForAdaptors { ref chunks } => DepositStatus::Incomplete {
257+
details: format!("{} ({})", deposit.step.step_name(), arr_progress(chunks),),
258+
},
259+
garbler::DepositStep::VerifyingAdaptors => DepositStatus::Incomplete {
196260
details: deposit.step.step_name().into(),
197261
},
198262
garbler::DepositStep::DepositReady => DepositStatus::Ready,
@@ -205,9 +269,19 @@ impl From<garbler::DepositState> for DepositStatus {
205269
impl From<evaluator::DepositState> for DepositStatus {
206270
fn from(deposit: evaluator::DepositState) -> Self {
207271
match deposit.step {
208-
evaluator::DepositStep::GeneratingAdaptors { .. }
209-
| evaluator::DepositStep::SendingAdaptors { .. } => DepositStatus::Incomplete {
210-
details: deposit.step.step_name().into(),
272+
evaluator::DepositStep::GeneratingAdaptors {
273+
deposit: ref dep,
274+
ref withdrawal_chunks,
275+
} => DepositStatus::Incomplete {
276+
details: format!(
277+
"{} (deposit: {}, withdrawal: {})",
278+
deposit.step.step_name(),
279+
bool_progress(*dep),
280+
arr_progress(withdrawal_chunks),
281+
),
282+
},
283+
evaluator::DepositStep::SendingAdaptors { ref acked } => DepositStatus::Incomplete {
284+
details: format!("{} ({})", deposit.step.step_name(), arr_progress(acked),),
211285
},
212286
evaluator::DepositStep::DepositReady => DepositStatus::Ready,
213287
evaluator::DepositStep::WithdrawnUndisputed => DepositStatus::UncontestedWithdrawal,

0 commit comments

Comments
 (0)