diff --git a/.github/workflows/nightly-tests.yml b/.github/workflows/nightly-tests.yml index be52c5b84d3..636d0ea0dd9 100644 --- a/.github/workflows/nightly-tests.yml +++ b/.github/workflows/nightly-tests.yml @@ -6,6 +6,11 @@ on: # Run at 8:30 AM UTC every day - cron: '30 8 * * *' workflow_dispatch: # Allow manual triggering + inputs: + branch: + description: 'Branch to test' + required: false + default: 'unstable' concurrency: group: ${{ github.workflow }}-${{ github.ref }} @@ -47,6 +52,8 @@ jobs: fail-fast: false steps: - uses: actions/checkout@v5 + with: + ref: ${{ inputs.branch || 'unstable' }} - name: Get latest version of stable Rust uses: moonrepo/setup-rust@v1 with: @@ -57,28 +64,6 @@ jobs: run: make test-beacon-chain-${{ matrix.fork }} timeout-minutes: 60 - http-api-tests: - name: http-api-tests - needs: setup-matrix - runs-on: 'ubuntu-latest' - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - strategy: - matrix: - fork: ${{ fromJson(needs.setup-matrix.outputs.forks) }} - fail-fast: false - steps: - - uses: actions/checkout@v5 - - name: Get latest version of stable Rust - uses: moonrepo/setup-rust@v1 - with: - channel: stable - cache-target: release - bins: cargo-nextest - - name: Run http_api tests for ${{ matrix.fork }} - run: make test-http-api-${{ matrix.fork }} - timeout-minutes: 60 - op-pool-tests: name: op-pool-tests needs: setup-matrix @@ -91,6 +76,8 @@ jobs: fail-fast: false steps: - uses: actions/checkout@v5 + with: + ref: ${{ inputs.branch || 'unstable' }} - name: Get latest version of stable Rust uses: moonrepo/setup-rust@v1 with: @@ -113,6 +100,8 @@ jobs: fail-fast: false steps: - uses: actions/checkout@v5 + with: + ref: ${{ inputs.branch || 'unstable' }} - name: Get latest version of stable Rust uses: moonrepo/setup-rust@v1 with: diff --git a/validator_client/validator_services/src/sync.rs b/validator_client/validator_services/src/sync.rs index 0f456a70507..ef4e58bb17e 100644 --- a/validator_client/validator_services/src/sync.rs +++ b/validator_client/validator_services/src/sync.rs @@ -1,7 +1,6 @@ use crate::duties_service::{DutiesService, Error, SelectionProofConfig}; use bls::PublicKeyBytes; use eth2::types::SyncCommitteeSelection; -use futures::future::join_all; use futures::stream::{FuturesUnordered, StreamExt}; use logging::crit; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -608,187 +607,99 @@ pub async fn fill_in_aggregation_proofs() { - Ok(subnet_ids) => subnet_ids, - Err(e) => { - crit!( - "error" = ?e, - "Arithmetic error computing subnet IDs" - ); - continue; - } - }; - - // Construct proof for prior slot. - let proof_slot = slot - 1; - - // Calling the make_sync_selection_proof will return a full selection proof - for &subnet_id in &subnet_ids { - let duties_service = duties_service.clone(); - futures_unordered.push(async move { - let result = - make_sync_selection_proof(&duties_service, duty, proof_slot, subnet_id) - .await; - - result.map(|proof| (duty.validator_index, proof_slot, subnet_id, proof)) - }); - } - } - - while let Some(result) = futures_unordered.next().await { - let Some((validator_index, proof_slot, subnet_id, proof)) = result else { - continue; - }; - let sync_map = duties_service.sync_duties.committees.read(); - let Some(committee_duties) = sync_map.get(&sync_committee_period) else { - debug!("period" = sync_committee_period, "Missing sync duties"); - continue; - }; - - let validators = committee_duties.validators.read(); - - // Check if the validator is an aggregator - match proof.is_aggregator::() { - Ok(true) => { - if let Some(Some(duty)) = validators.get(&validator_index) { - debug!( - validator_index, - "slot" = %proof_slot, - "subcommittee_index" = *subnet_id, - // log full selection proof for debugging - "full selection proof" = ?proof, - "Validator is sync aggregator" - ); - - // Store the proof - duty.aggregation_duties - .proofs - .write() - .insert((proof_slot, subnet_id), proof); - } - } - Ok(false) => {} // Not an aggregator - Err(e) => { - warn!( - validator_index, - %slot, - "error" = ?e, - "Error determining is_aggregator" - ); - } - } - } - } else { - // For non-distributed mode debug!( period = sync_committee_period, %current_slot, %pre_compute_slot, "Calculating sync selection proofs" ); + } - let mut validator_proofs = vec![]; - for (validator_start_slot, duty) in pre_compute_duties { - // Proofs are already known at this slot for this validator. - if slot < *validator_start_slot { + for (validator_start_slot, duty) in pre_compute_duties { + if slot < *validator_start_slot { + continue; + } + let subnet_ids = match duty.subnet_ids::() { + Ok(subnet_ids) => subnet_ids, + Err(e) => { + crit!( + "error" = ?e, + "Arithmetic error computing subnet IDs" + ); continue; } + }; - let subnet_ids = match duty.subnet_ids::() { - Ok(subnet_ids) => subnet_ids, - Err(e) => { - crit!( - error = ?e, - "Arithmetic error computing subnet IDs" - ); - continue; - } - }; - - // Create futures to produce proofs. - let duties_service_ref = &duties_service; - let futures = subnet_ids.iter().map(|subnet_id| async move { - // Construct proof for prior slot. - let proof_slot = slot - 1; + // Construct proof for prior slot. + let proof_slot = slot - 1; - let proof = - make_sync_selection_proof(duties_service_ref, duty, proof_slot, *subnet_id) + // Calling the make_sync_selection_proof will return a full selection proof + for &subnet_id in &subnet_ids { + let duties_service = duties_service.clone(); + futures_unordered.push(async move { + let result = + make_sync_selection_proof(&duties_service, duty, proof_slot, subnet_id) .await; - match proof { - Some(proof) => match proof.is_aggregator::() { - Ok(true) => { - debug!( - validator_index = duty.validator_index, - slot = %proof_slot, - %subnet_id, - "Validator is sync aggregator" - ); - Some(((proof_slot, *subnet_id), proof)) - } - Ok(false) => None, - Err(e) => { - warn!( - pubkey = ?duty.pubkey, - slot = %proof_slot, - error = ?e, - "Error determining is_aggregator" - ); - None - } - }, - - None => None, - } + result.map(|proof| (duty.validator_index, proof_slot, subnet_id, proof)) }); - - // Execute all the futures in parallel, collecting any successful results. - let proofs = join_all(futures) - .await - .into_iter() - .flatten() - .collect::>(); - - validator_proofs.push((duty.validator_index, proofs)); } + } - // Add to global storage (we add regularly so the proofs can be used ASAP). + while let Some(result) = futures_unordered.next().await { + let Some((validator_index, proof_slot, subnet_id, proof)) = result else { + continue; + }; let sync_map = duties_service.sync_duties.committees.read(); let Some(committee_duties) = sync_map.get(&sync_committee_period) else { - debug!(period = sync_committee_period, "Missing sync duties"); + debug!("period" = sync_committee_period, "Missing sync duties"); continue; }; + let validators = committee_duties.validators.read(); - let num_validators_updated = validator_proofs.len(); - for (validator_index, proofs) in validator_proofs { - if let Some(Some(duty)) = validators.get(&validator_index) { - duty.aggregation_duties.proofs.write().extend(proofs); - } else { - debug!( + // Check if the validator is an aggregator + match proof.is_aggregator::() { + Ok(true) => { + if let Some(Some(duty)) = validators.get(&validator_index) { + debug!( + validator_index, + "slot" = %proof_slot, + "subcommittee_index" = *subnet_id, + // log full selection proof for debugging + "full selection proof" = ?proof, + "Validator is sync aggregator" + ); + + // Store the proof + duty.aggregation_duties + .proofs + .write() + .insert((proof_slot, subnet_id), proof); + } else { + debug!( + validator_index, + period = sync_committee_period, + "Missing sync duty to update" + ); + } + } + Ok(false) => {} // Not an aggregator + Err(e) => { + warn!( validator_index, - period = sync_committee_period, - "Missing sync duty to update" + %slot, + "error" = ?e, + "Error determining is_aggregator" ); } } - - if num_validators_updated > 0 { - debug!( - %slot, - updated_validators = num_validators_updated, - "Finished computing sync selection proofs" - ); - } } } }