Skip to content

Commit 105de27

Browse files
tkumor3cptartur
authored andcommitted
change the way of killing cancelled tasks (#874)
<!-- Reference any GitHub issues resolved by this PR --> Closes # ## Introduced changes <!-- A brief description of the changes --> - - ## Breaking changes <!-- List of all breaking changes, if applicable --> ## Checklist <!-- Make sure all of these are complete --> - [x] Linked relevant issue - [x] Updated relevant documentation - [x] Added relevant tests - [x] Performed self-review of the code - [x] Added changes to `CHANGELOG.md` (cherry picked from commit 0947313)
1 parent f39468a commit 105de27

File tree

6 files changed

+57
-22
lines changed

6 files changed

+57
-22
lines changed

crates/forge/src/lib.rs

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use futures::StreamExt;
1212
use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};
1313
use running::blocking_run_from_test;
1414
use serde::Deserialize;
15-
use tokio::sync::mpsc::channel;
15+
use tokio::sync::mpsc::{channel, Sender};
1616

1717
use std::sync::Arc;
1818
use test_case_summary::TestCaseSummary;
@@ -366,7 +366,7 @@ pub async fn run(
366366
.await?;
367367

368368
fuzzing_happened |= was_fuzzed;
369-
summaries.push(summary.clone());
369+
summaries.push(summary);
370370
}
371371

372372
pretty_printing::print_test_summary(&summaries);
@@ -397,6 +397,17 @@ async fn run_tests_from_crate(
397397
let mut tasks = FuturesUnordered::new();
398398
let test_cases = &tests.test_cases;
399399

400+
// Initiate two channels to manage the `--exit-first` flag.
401+
// Owing to `cheatnet` fork's utilization of its own Tokio runtime for RPC requests,
402+
// test execution must occur within a `tokio::spawn_blocking`.
403+
// As `spawn_blocking` can't be prematurely cancelled (refer: https://dtantsur.github.io/rust-openstack/tokio/task/fn.spawn_blocking.html),
404+
// a channel is used to signal the task that test processing is no longer necessary.
405+
let (send, mut rec) = channel(1);
406+
407+
// The second channel serves as a hold point to ensure all tasks complete
408+
// their shutdown procedures before moving forward (more info: https://tokio.rs/tokio/topics/shutdown)
409+
let (send_shut_down, mut rec_shut_down) = channel(1);
410+
400411
for case in test_cases.iter() {
401412
let case_name = case.name.as_str();
402413

@@ -414,6 +425,8 @@ async fn run_tests_from_crate(
414425
runner_config.clone(),
415426
runner_params.clone(),
416427
cancellation_tokens.clone(),
428+
&send,
429+
&send_shut_down,
417430
));
418431
}
419432

@@ -431,6 +444,12 @@ async fn run_tests_from_crate(
431444
results.push(result);
432445
}
433446

447+
rec.close();
448+
449+
// Waiting for things to finish shutting down
450+
drop(send_shut_down);
451+
let _ = rec_shut_down.recv().await;
452+
434453
Ok((
435454
TestCrateSummary {
436455
test_case_summaries: results,
@@ -441,13 +460,16 @@ async fn run_tests_from_crate(
441460
))
442461
}
443462

463+
#[allow(clippy::too_many_arguments)]
444464
fn choose_test_strategy_and_run(
445465
args: Vec<ConcreteTypeId>,
446466
case: Arc<TestCase>,
447467
runner: Arc<SierraCasmRunner>,
448468
runner_config: Arc<RunnerConfig>,
449469
runner_params: Arc<RunnerParams>,
450470
cancellation_tokens: Arc<CancellationTokens>,
471+
send: &Sender<()>,
472+
send_shut_down: &Sender<()>,
451473
) -> JoinHandle<Result<TestCaseSummary>> {
452474
if args.is_empty() {
453475
run_single_test(
@@ -456,6 +478,8 @@ fn choose_test_strategy_and_run(
456478
runner_config,
457479
runner_params,
458480
cancellation_tokens,
481+
send.clone(),
482+
send_shut_down.clone(),
459483
)
460484
} else {
461485
run_with_fuzzing(
@@ -465,6 +489,7 @@ fn choose_test_strategy_and_run(
465489
runner_config,
466490
runner_params,
467491
cancellation_tokens,
492+
send_shut_down.clone(),
468493
)
469494
}
470495
}
@@ -475,6 +500,8 @@ fn run_single_test(
475500
runner_config: Arc<RunnerConfig>,
476501
runner_params: Arc<RunnerParams>,
477502
cancellation_tokens: Arc<CancellationTokens>,
503+
send: Sender<()>,
504+
send_shut_down: Sender<()>,
478505
) -> JoinHandle<Result<TestCaseSummary>> {
479506
let exit_first = runner_config.exit_first;
480507
tokio::task::spawn(async move {
@@ -489,8 +516,8 @@ fn run_single_test(
489516
// one of a test returns Err
490517
Ok(TestCaseSummary::Interrupted { })
491518
},
492-
result = blocking_run_from_test(vec![], case.clone(),runner, runner_config.clone(), runner_params.clone() , None) => {
493-
match result {
519+
result = blocking_run_from_test(vec![], case.clone(),runner, runner_config.clone(), runner_params.clone(), send.clone(), send_shut_down.clone() ) => {
520+
match result? {
494521
Ok(result) => {
495522
if exit_first {
496523
if let TestCaseSummary::Failed { .. } = &result {
@@ -516,10 +543,11 @@ fn run_with_fuzzing(
516543
runner_config: Arc<RunnerConfig>,
517544
runner_params: Arc<RunnerParams>,
518545
cancellation_tokens: Arc<CancellationTokens>,
546+
send_shut_down: Sender<()>,
519547
) -> JoinHandle<Result<TestCaseSummary>> {
520548
tokio::task::spawn(async move {
521549
let token = CancellationToken::new();
522-
550+
let (send, mut rec) = channel(1);
523551
let args = args
524552
.iter()
525553
.map(|arg| {
@@ -540,9 +568,6 @@ fn run_with_fuzzing(
540568
let mut fuzzer = RandomFuzzer::create(fuzzer_seed, fuzzer_runs, &args)?;
541569

542570
let mut tasks = FuturesUnordered::new();
543-
// Pattern in order to waiting for things to finish shutting down
544-
// https://tokio.rs/tokio/topics/shutdown
545-
let (send, mut recv) = channel(1);
546571

547572
for _ in 1..=fuzzer_runs {
548573
let args = fuzzer.next_args();
@@ -556,14 +581,12 @@ fn run_with_fuzzing(
556581
cancellation_tokens.clone(),
557582
token.clone(),
558583
send.clone(),
584+
send_shut_down.clone(),
559585
));
560586
}
561587

562588
let mut results = vec![];
563589

564-
// Graceful Shutdown Pattern
565-
drop(send);
566-
567590
while let Some(task) = tasks.next().await {
568591
let result = task??;
569592
results.push(result.clone());
@@ -576,7 +599,7 @@ fn run_with_fuzzing(
576599
}
577600
}
578601

579-
let _ = recv.recv().await;
602+
rec.close();
580603

581604
let runs = u32::try_from(
582605
results
@@ -627,7 +650,8 @@ fn run_fuzzing_subtest(
627650
runner_params: Arc<RunnerParams>,
628651
cancellation_tokens: Arc<CancellationTokens>,
629652
cancellation_fuzzing_token: CancellationToken,
630-
send: tokio::sync::mpsc::Sender<()>,
653+
send: Sender<()>,
654+
send_shut_down: Sender<()>,
631655
) -> JoinHandle<Result<TestCaseSummary>> {
632656
let c = case.clone();
633657
task::spawn(async move {
@@ -654,9 +678,10 @@ fn run_fuzzing_subtest(
654678
runner,
655679
runner_config.clone(),
656680
runner_params.clone(),
657-
Some(send),
681+
send.clone(),
682+
send_shut_down.clone()
658683
) => {
659-
match result {
684+
match result? {
660685
Ok(result) => {
661686
if let TestCaseSummary::Failed { .. } = &result {
662687
if runner_config.exit_first {

crates/forge/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ fn main_execution() -> Result<bool> {
123123
.max_blocking_threads(cores_approx)
124124
.enable_all()
125125
.build()?;
126+
126127
let all_failed_tests = rt.block_on({
127128
rt.spawn(async move {
128129
let mut all_failed_tests = vec![];

crates/forge/src/running.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use starknet_api::patricia_key;
3535
use starknet_api::transaction::Calldata;
3636
use test_collector::{ForkConfig, TestCase};
3737
use tokio::sync::mpsc::Sender;
38+
use tokio::task::JoinHandle;
3839

3940
use crate::scarb::ForkTarget;
4041
use crate::test_case_summary::TestCaseSummary;
@@ -70,25 +71,31 @@ fn build_hints_dict<'b>(
7071
(hints_dict, string_to_hint)
7172
}
7273

73-
pub(crate) async fn blocking_run_from_test(
74+
pub(crate) fn blocking_run_from_test(
7475
args: Vec<Felt252>,
7576
case: Arc<TestCase>,
7677
runner: Arc<SierraCasmRunner>,
7778
runner_config: Arc<RunnerConfig>,
7879
runner_params: Arc<RunnerParams>,
79-
sender: Option<Sender<()>>,
80-
) -> Result<TestCaseSummary> {
80+
send: Sender<()>,
81+
send_shut_down: Sender<()>,
82+
) -> JoinHandle<Result<TestCaseSummary>> {
8183
tokio::task::spawn_blocking(move || {
84+
// Due to the inability of spawn_blocking to be abruptly cancelled,
85+
// a channel is used to receive information indicating
86+
// that the execution of the task is no longer necessary.
87+
if send.is_closed() {
88+
return Err(anyhow::anyhow!("stop spawn_blocking"));
89+
}
8290
run_test_case(
8391
args,
8492
&case,
8593
&runner,
8694
&runner_config,
8795
&runner_params,
88-
&sender,
96+
&send_shut_down,
8997
)
9098
})
91-
.await?
9299
}
93100

94101
fn build_context() -> EntryPointExecutionContext {
@@ -146,7 +153,7 @@ pub(crate) fn run_test_case(
146153
runner: &SierraCasmRunner,
147154
runner_config: &Arc<RunnerConfig>,
148155
runner_params: &Arc<RunnerParams>,
149-
_sender: &Option<Sender<()>>,
156+
_send_shut_down: &Sender<()>,
150157
) -> Result<TestCaseSummary> {
151158
let available_gas = if let Some(available_gas) = &case.available_gas {
152159
Some(*available_gas)

crates/forge/src/test_crate_summary.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::test_case_summary::TestCaseSummary;
22
use crate::{RunnerStatus, TestCrateType};
33

44
/// Summary of the test run in the file
5-
#[derive(Debug, PartialEq, Clone)]
5+
#[derive(Debug, PartialEq)]
66
pub struct TestCrateSummary {
77
/// Summaries of each test case in the file
88
pub test_case_summaries: Vec<TestCaseSummary>,

crates/forge/tests/integration/common/running_tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use tokio::runtime::Runtime;
1212

1313
pub fn run_test_case(test: &TestCase) -> Vec<TestCrateSummary> {
1414
let rt = Runtime::new().expect("Could not instantiate Runtime");
15+
1516
rt.block_on(run(
1617
&test.path().unwrap(),
1718
&String::from("src"),

crates/forge/tests/integration/setup_fork.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ fn fork_aliased_decorator() {
9494
).as_str());
9595

9696
let rt = Runtime::new().expect("Could not instantiate Runtime");
97+
9798
let result = rt
9899
.block_on(run(
99100
&test.path().unwrap(),

0 commit comments

Comments
 (0)