_parAffAlt does not cancel all its parallel fibers when killed #216

Open
@deniskr

Description

We have a fairly complex web app that makes heavy use of fibers running in parallel. I recently ran into the following scenario:

  1. fiber1 runs Control.Parallel.parOneOf (which is based on Effect.Aff._parAffAlt)
  2. The argument of Control.Parallel.parOneOf contains an array of 6 fibers that run in parallel
  3. Before any of the 6 child fibers complete, fiber1 is killed

Expected behavior
All 6 child fibers are killed

Actual behavior
5 of the 6 fibers are killed, one fiber continues running
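
For readers who want the shape of the scenario in code, here is a minimal sketch. It is not a reproduction: as noted under "To Reproduce", children this simple behave correctly. The `child` helper, its log messages, and the timings are hypothetical and only stand in for the real fiber sub-trees.

```purescript
module Repro where

import Prelude

import Control.Parallel (parOneOf)
import Effect (Effect)
import Effect.Aff (Aff, Milliseconds(..), bracket, delay, error, forkAff, killFiber, launchAff_, never)
import Effect.Class (liftEffect)
import Effect.Console (log)

-- Hypothetical child fiber: acquires a "resource", then blocks until it is
-- killed. The finalizer is deliberately not instantaneous.
child :: Int -> Aff Unit
child n =
  bracket
    (liftEffect (log ("acquire " <> show n)))
    ( \_ -> do
        delay (Milliseconds 5.0)
        liftEffect (log ("release " <> show n))
    )
    (\_ -> never)

main :: Effect Unit
main = launchAff_ do
  -- fiber1 races 6 children through parOneOf (built on _parAffAlt).
  fiber1 <- forkAff (parOneOf (map child [ 1, 2, 3, 4, 5, 6 ]))
  -- Give the children time to start; none of them ever completes.
  delay (Milliseconds 10.0)
  -- Kill fiber1 before any child has finished.
  killFiber (error "kill fiber1") fiber1
  liftEffect (log "fiber1 killed")
```

If every child is cancelled, each "acquire" line is matched by a "release" line. A child that survives the kill would show up as a missing "release".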

Additional context
Each of the child fibers contains quite a complex sub-tree of fibers, which use bracket and have non-trivial finalizers.

To Reproduce
The case I describe happens in a complex integration test, which is impractical to run on your side. I tried to reduce the test to something concise, but when I reduce the complexity of the child fibers to something like delay (Milliseconds 10.0), the system works as expected. So the problem happens only when the child fibers use non-trivial finalizers and brackets.

Why can I be sure that the problem lies in _parAffAlt and not in our code? Because when I replaced the library's Effect.Aff._parAffAlt function with my own implementation (see below), the system started working as expected.

Question
The unit tests in test/Test/Main.purs use very small fiber trees, which mostly contain only the delay function. Those unit tests pass without triggering this bug.
Would it be possible to extend the test cases with large fiber trees built by Test.QuickCheck?
Has anybody else encountered similar hard-to-reproduce behaviour?
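
One way the QuickCheck idea might look is sketched below. Everything in it is an assumption made for illustration: the `FiberTree` type, the `genTree` generator, the `run` interpreter and the `leakedAfterKill` check are hypothetical names, not part of the library or its test suite. The sketch counts brackets that are still open after the root fiber has been killed, and a non-zero count would point to a finalizer that did not run.

```purescript
module FiberTree where

import Prelude

import Control.Parallel (parOneOf)
import Data.Int (toNumber)
import Effect.Aff (Aff, Milliseconds(..), bracket, delay, error, forkAff, killFiber)
import Effect.Class (liftEffect)
import Effect.Ref (Ref)
import Effect.Ref as Ref
import Test.QuickCheck.Gen (Gen, chooseInt, sized, vectorOf)

-- Hypothetical description of a fiber tree.
data FiberTree
  = Leaf Int -- delay in milliseconds
  | Bracketed FiberTree -- sub-tree wrapped in bracket with a finalizer
  | Race (Array FiberTree) -- sub-trees raced with parOneOf

-- Generate a random tree whose depth shrinks with the size parameter.
genTree :: Gen FiberTree
genTree = sized go
  where
  go :: Int -> Gen FiberTree
  go size
    | size <= 1 = Leaf <$> chooseInt 1 20
    | otherwise = do
        tag <- chooseInt 0 2
        case tag of
          0 -> Leaf <$> chooseInt 1 20
          1 -> Bracketed <$> go (size / 2)
          _ -> do
            n <- chooseInt 2 4
            Race <$> vectorOf n (go (size / 2))

-- Interpret a tree as an Aff; `live` counts currently open brackets.
run :: Ref Int -> FiberTree -> Aff Unit
run live = case _ of
  Leaf ms -> delay (Milliseconds (toNumber ms))
  Bracketed t ->
    bracket
      (liftEffect (Ref.modify_ (_ + 1) live))
      (\_ -> liftEffect (Ref.modify_ (_ - 1) live))
      (\_ -> run live t)
  Race ts -> parOneOf (map (run live) ts)

-- Fork the tree, kill it shortly afterwards, and report how many brackets
-- are still open once killFiber has returned. The expected value is 0.
leakedAfterKill :: FiberTree -> Aff Int
leakedAfterKill tree = do
  live <- liftEffect (Ref.new 0)
  fiber <- forkAff (run live tree)
  delay (Milliseconds 5.0)
  killFiber (error "kill root") fiber
  liftEffect (Ref.read live)
```

A property test could then sample trees from `genTree` and assert that `leakedAfterKill` returns 0 for each of them. Whether trees of this shape are rich enough to trigger the bug is an open question.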

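My replacement for _parAffAlt:

import Prelude

import Control.Monad.Error.Class (catchError, throwError)
import Data.Either (Either(..))
import Data.Foldable (for_)
import Data.Maybe (Maybe(..))
import Data.Tuple (Tuple(..))
import Effect.Aff (Aff, attempt, error, forkAff, joinFiber, killFiber, message, supervise, suspendAff)
import Effect.Class (liftEffect)
import Effect.Ref as Ref

altPar
  :: forall a
   . Aff a
  -> Aff a
  -> Aff a
altPar a1 a2 = supervise do
  -- Each branch needs a handle on the other one so the winner can kill the loser.
  f1Ref <- liftEffect $ Ref.new Nothing
  f2Ref <- liftEffect $ Ref.new Nothing

  -- First branch: on success, kill the second branch.
  f1 <- suspendAff do
    r1 <- attempt a1

    case r1 of
      Right _ -> do
        mf2 <- liftEffect $ Ref.read f2Ref
        for_ mf2 \other -> do
          killFiber (error "CancellationException1") other

      Left _ -> pure unit

    pure r1

  -- Second branch: on success, kill the first branch.
  f2 <- suspendAff do
    r2 <- attempt a2

    case r2 of
      Right _ -> do
        mf1 <- liftEffect $ Ref.read f1Ref
        for_ mf1 \other -> do
          killFiber (error "CancellationException2") other

      Left _ -> pure unit

    pure r2

  -- Publish both handles before either suspended fiber starts running.
  liftEffect $ Ref.write (Just f1) f1Ref
  liftEffect $ Ref.write (Just f2) f2Ref

  -- Joining the suspended fibers starts them; both run concurrently.
  fr1 <- forkAff (joinFiber f1)
  fr2 <- forkAff (joinFiber f2)

  -- A killed branch surfaces as a thrown error, which is turned into a Left.
  r1 <- catchError
    (joinFiber fr1)
    (\err -> pure (Left err))
  r2 <- catchError
    (joinFiber fr2)
    (\err -> pure (Left err))

  case Tuple r1 r2 of
    Tuple (Right a) _ -> pure a
    Tuple (Left _) (Right a) -> pure a
    -- The first branch was killed by the second one: report the second branch's error.
    Tuple (Left ce) (Left err) | message ce == "CancellationException2" -> throwError err
    Tuple (Left err) (Left _) -> throwError err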
