Skip to content

Commit 9ffc6cc

Browse files
committed
Update tests
1 parent 39b7d2b commit 9ffc6cc

5 files changed

Lines changed: 115 additions & 36 deletions

File tree

rust/otap-dataflow/crates/channel/src/mpmc.rs

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ mod tests {
237237
let rt = create_test_runtime();
238238
let local = tokio::task::LocalSet::new();
239239

240-
_ = local.spawn_local(async {
240+
let handle = local.spawn_local(async {
241241
let (tx, rx) = Channel::new(NonZeroUsize::new(2).unwrap());
242242

243243
// Test send and receive
@@ -253,14 +253,15 @@ mod tests {
253253
});
254254

255255
rt.block_on(local);
256+
rt.block_on(handle).expect("Test task failed");
256257
}
257258

258259
#[test]
259260
fn test_channel_capacity() {
260261
let rt = create_test_runtime();
261262
let local = tokio::task::LocalSet::new();
262263

263-
_ = local.spawn_local(async {
264+
let handle = local.spawn_local(async {
264265
let (tx, _rx) = Channel::new(NonZeroUsize::new(1).unwrap());
265266

266267
// First send should succeed
@@ -275,14 +276,15 @@ mod tests {
275276
});
276277

277278
rt.block_on(local);
279+
rt.block_on(handle).expect("Test task failed");
278280
}
279281

280282
#[test]
281283
fn test_multiple_producers() {
282284
let rt = create_test_runtime();
283285
let local = tokio::task::LocalSet::new();
284286

285-
_ = local.spawn_local(async {
287+
let handle = local.spawn_local(async {
286288
let (tx1, rx) = Channel::new(NonZeroUsize::new(4).unwrap());
287289
let tx2 = tx1.clone();
288290

@@ -298,6 +300,7 @@ mod tests {
298300
});
299301

300302
rt.block_on(local);
303+
rt.block_on(handle).expect("Test task failed");
301304
}
302305

303306
#[test]
@@ -310,20 +313,23 @@ mod tests {
310313
// Shared state to track all received values
311314
let all_received = Rc::new(RefCell::new(Vec::new()));
312315

316+
let mut handles = vec![];
317+
313318
for i in 1..=3 {
314319
let received = all_received.clone();
315320
let rx = rx.clone();
316-
_ = local.spawn_local(async move {
321+
let handle = local.spawn_local(async move {
317322
while let Ok(value) = rx.recv().await {
318323
println!("Receiver {i}: Received value {value}");
319324
received.borrow_mut().push(value);
320325
}
321326
});
327+
handles.push(handle);
322328
}
323329

324330
let msg_to_send_count = 10;
325331

326-
_ = local.spawn_local(async move {
332+
let handle = local.spawn_local(async move {
327333
// Send several values
328334
for i in 1..=msg_to_send_count {
329335
let result = tx.send_async(i).await;
@@ -333,8 +339,12 @@ mod tests {
333339
// Close the channel to let receivers finish
334340
tx.close();
335341
});
342+
handles.push(handle);
336343

337344
rt.block_on(local);
345+
for handle in handles {
346+
rt.block_on(handle).expect("Test task failed");
347+
}
338348

339349
// Verify that all values were received exactly once
340350
let all_values = all_received.borrow();
@@ -359,7 +369,7 @@ mod tests {
359369
let rt = create_test_runtime();
360370
let local = tokio::task::LocalSet::new();
361371

362-
_ = local.spawn_local(async {
372+
let handle = local.spawn_local(async {
363373
let (tx, rx) = Channel::new(NonZeroUsize::new(1).unwrap());
364374
let receive_order = Rc::new(RefCell::new(Vec::new()));
365375

@@ -400,14 +410,15 @@ mod tests {
400410
});
401411

402412
rt.block_on(local);
413+
rt.block_on(handle).expect("Test task failed");
403414
}
404415

405416
#[test]
406417
fn test_producer_fairness() {
407418
let rt = create_test_runtime();
408419
let local = tokio::task::LocalSet::new();
409420

410-
_ = local.spawn_local(async {
421+
let handle = local.spawn_local(async {
411422
let (tx, rx) = Channel::new(NonZeroUsize::new(1).unwrap());
412423
let send_order = Rc::new(RefCell::new(Vec::new()));
413424

@@ -455,14 +466,15 @@ mod tests {
455466
});
456467

457468
rt.block_on(local);
469+
rt.block_on(handle).expect("Test task failed");
458470
}
459471

460472
#[test]
461473
fn test_mixed_operations() {
462474
let rt = create_test_runtime();
463475
let local = tokio::task::LocalSet::new();
464476

465-
_ = local.spawn_local(async {
477+
let handle = local.spawn_local(async {
466478
let (tx, rx) = Channel::new(NonZeroUsize::new(2).unwrap());
467479

468480
// Mix of sync and async sends
@@ -477,14 +489,15 @@ mod tests {
477489
});
478490

479491
rt.block_on(local);
492+
rt.block_on(handle).expect("Test task failed");
480493
}
481494

482495
#[test]
483496
fn test_receiver_drop() {
484497
let rt = create_test_runtime();
485498
let local = tokio::task::LocalSet::new();
486499

487-
_ = local.spawn_local(async {
500+
let handle = local.spawn_local(async {
488501
let (tx, rx) = Channel::new(NonZeroUsize::new(2).unwrap());
489502
let result = tx.send(1);
490503
assert!(result.is_ok());
@@ -501,14 +514,15 @@ mod tests {
501514
});
502515

503516
rt.block_on(local);
517+
rt.block_on(handle).expect("Test task failed");
504518
}
505519

506520
#[test]
507521
fn test_complex_receiver_drop() {
508522
let rt = create_test_runtime();
509523
let local = tokio::task::LocalSet::new();
510524

511-
_ = local.spawn_local(async {
525+
let handle = local.spawn_local(async {
512526
let (tx, rx) = Channel::new(NonZeroUsize::new(2).unwrap());
513527

514528
// Create multiple receivers
@@ -545,14 +559,15 @@ mod tests {
545559
});
546560

547561
rt.block_on(local);
562+
rt.block_on(handle).expect("Test task failed");
548563
}
549564

550565
#[test]
551566
fn test_async_send_receive() {
552567
let rt = create_test_runtime();
553568
let local = tokio::task::LocalSet::new();
554569

555-
_ = local.spawn_local(async {
570+
let handle = local.spawn_local(async {
556571
let (tx, rx) = Channel::new(NonZeroUsize::new(1).unwrap());
557572
let received = Rc::new(RefCell::new(vec![]));
558573
let received_clone = received.clone();
@@ -578,14 +593,15 @@ mod tests {
578593
});
579594

580595
rt.block_on(local);
596+
rt.block_on(handle).expect("Test task failed");
581597
}
582598

583599
#[test]
584600
fn test_channel_closing() {
585601
let rt = create_test_runtime();
586602
let local = tokio::task::LocalSet::new();
587603

588-
_ = local.spawn_local(async {
604+
let handle = local.spawn_local(async {
589605
let (tx, rx) = Channel::new(NonZeroUsize::new(1).unwrap());
590606

591607
// Send a value
@@ -609,14 +625,15 @@ mod tests {
609625
});
610626

611627
rt.block_on(local);
628+
rt.block_on(handle).expect("Test task failed");
612629
}
613630

614631
#[test]
615632
fn test_sender_drop() {
616633
let rt = create_test_runtime();
617634
let local = tokio::task::LocalSet::new();
618635

619-
_ = local.spawn_local(async {
636+
let handle = local.spawn_local(async {
620637
let (tx, rx) = Channel::new(NonZeroUsize::new(1).unwrap());
621638

622639
let result = tx.send(1);
@@ -631,17 +648,18 @@ mod tests {
631648
});
632649

633650
rt.block_on(local);
651+
rt.block_on(handle).expect("Test task failed");
634652
}
635653

636654
#[test]
637655
fn test_error_propagation() {
638656
let rt = create_test_runtime();
639657
let local = tokio::task::LocalSet::new();
640658

641-
_ = local.spawn_local(async {
659+
let handle = local.spawn_local(async {
642660
// Test 1: SendError::Full propagation
643661
{
644-
let (tx, _) = Channel::new(NonZeroUsize::new(1).unwrap());
662+
let (tx, _rx) = Channel::new(NonZeroUsize::new(1).unwrap());
645663

646664
// Fill the channel
647665
let result = tx.send(1);
@@ -724,14 +742,15 @@ mod tests {
724742
});
725743

726744
rt.block_on(local);
745+
rt.block_on(handle).expect("Test task failed");
727746
}
728747

729748
#[test]
730749
fn test_backpressure() {
731750
let rt = create_test_runtime();
732751
let local = tokio::task::LocalSet::new();
733752

734-
_ = local.spawn_local(async {
753+
let handle = local.spawn_local(async {
735754
let (tx, rx) = Channel::new(NonZeroUsize::new(1).unwrap());
736755
let send_completed = Rc::new(RefCell::new(false));
737756
let send_completed_clone = send_completed.clone();
@@ -763,5 +782,6 @@ mod tests {
763782
});
764783

765784
rt.block_on(local);
785+
rt.block_on(handle).expect("Test task failed");
766786
}
767787
}

rust/otap-dataflow/crates/channel/src/mpsc.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -240,12 +240,13 @@ mod tests {
240240
let rt = create_test_runtime();
241241
let local = tokio::task::LocalSet::new();
242242

243-
_ = local.spawn_local(async {
243+
let handle = local.spawn_local(async {
244244
let (tx, rx) = Channel::new(2);
245245

246246
// Test send and receive
247247
let result = tx.send(1);
248248
assert!(result.is_ok());
249+
249250
let result = tx.send(2);
250251
assert!(result.is_ok());
251252
assert_eq!(rx.try_recv().unwrap(), 1);
@@ -256,14 +257,15 @@ mod tests {
256257
});
257258

258259
rt.block_on(local);
260+
rt.block_on(handle).expect("Test task failed");
259261
}
260262

261263
#[test]
262264
fn test_channel_capacity() {
263265
let rt = create_test_runtime();
264266
let local = tokio::task::LocalSet::new();
265267

266-
_ = local.spawn_local(async {
268+
let handle = local.spawn_local(async {
267269
let (tx, _rx) = Channel::new(1);
268270

269271
// First send should succeed
@@ -278,14 +280,15 @@ mod tests {
278280
});
279281

280282
rt.block_on(local);
283+
rt.block_on(handle).expect("Test task failed");
281284
}
282285

283286
#[test]
284287
fn test_multiple_producers() {
285288
let rt = create_test_runtime();
286289
let local = tokio::task::LocalSet::new();
287290

288-
_ = local.spawn_local(async {
291+
let handle = local.spawn_local(async {
289292
let (tx1, rx) = Channel::new(4);
290293
let tx2 = tx1.clone();
291294

@@ -301,14 +304,15 @@ mod tests {
301304
});
302305

303306
rt.block_on(local);
307+
rt.block_on(handle).expect("Test task failed");
304308
}
305309

306310
#[test]
307311
fn test_async_send_receive() {
308312
let rt = create_test_runtime();
309313
let local = tokio::task::LocalSet::new();
310314

311-
_ = local.spawn_local(async {
315+
let handle = local.spawn_local(async {
312316
let (tx, rx) = Channel::new(1);
313317
let received = Rc::new(RefCell::new(vec![]));
314318
let received_clone = received.clone();
@@ -334,14 +338,15 @@ mod tests {
334338
});
335339

336340
rt.block_on(local);
341+
rt.block_on(handle).expect("Test task failed");
337342
}
338343

339344
#[test]
340345
fn test_channel_closing() {
341346
let rt = create_test_runtime();
342347
let local = tokio::task::LocalSet::new();
343348

344-
_ = local.spawn_local(async {
349+
let handle = local.spawn_local(async {
345350
let (tx, rx) = Channel::new(1);
346351

347352
// Send a value
@@ -365,14 +370,15 @@ mod tests {
365370
});
366371

367372
rt.block_on(local);
373+
rt.block_on(handle).expect("Test task failed");
368374
}
369375

370376
#[test]
371377
fn test_sender_drop() {
372378
let rt = create_test_runtime();
373379
let local = tokio::task::LocalSet::new();
374380

375-
_ = local.spawn_local(async {
381+
let handle = local.spawn_local(async {
376382
let (tx, rx) = Channel::new(1);
377383

378384
let result = tx.send(1);
@@ -387,14 +393,15 @@ mod tests {
387393
});
388394

389395
rt.block_on(local);
396+
rt.block_on(handle).expect("Test task failed");
390397
}
391398

392399
#[test]
393400
fn test_backpressure() {
394401
let rt = create_test_runtime();
395402
let local = tokio::task::LocalSet::new();
396403

397-
_ = local.spawn_local(async {
404+
let handle = local.spawn_local(async {
398405
let (tx, rx) = Channel::new(1);
399406
let send_completed = Rc::new(RefCell::new(false));
400407
let send_completed_clone = send_completed.clone();
@@ -427,5 +434,6 @@ mod tests {
427434
});
428435

429436
rt.block_on(local);
437+
rt.block_on(handle).expect("Test task failed");
430438
}
431439
}

0 commit comments

Comments
 (0)