Skip to content

fix kill after write issue #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bower.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
},
"devDependencies": {
"purescript-console": "^4.0.0",
"purescript-assert": "^4.1.0",
"purescript-refs": "^4.0.0"
}
}
65 changes: 43 additions & 22 deletions src/Effect/Aff/Bus.purs
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,22 @@ module Effect.Aff.Bus

import Prelude

import Effect.Aff (Aff, attempt, launchAff_)
import Effect.Aff.AVar (AVar)
import Effect.Aff.AVar as AVar
import Effect.AVar as EffAvar
import Effect.Class (class MonadEffect, liftEffect)
import Effect.Exception as Exn
import Control.Lazy (fix)
import Control.Monad.Rec.Class (forever)
import Data.Either (Either(..))
import Data.Foldable (foldl, sequence_, traverse_)
import Data.List (List(..), (:))
import Data.Tuple (Tuple(..))
import Effect.AVar as EffAVar
import Effect.Aff (Aff, Error, launchAff_, try)
import Effect.Aff.AVar (AVar)
import Effect.Aff.AVar as AVar
import Effect.Class (class MonadEffect, liftEffect)
import Effect.Exception as Exn

data Cap

data Bus (r ∷ # Type) a = Bus (AVar a) (AVar (List (AVar a)))
data Bus (r ∷ # Type) a = Bus (AVar (Either Error a)) (AVar (List (AVar a)))

type BusR = BusR' ()

Expand All @@ -60,13 +62,26 @@ type BusRW = Bus (read ∷ Cap, write ∷ Cap)
-- | Creates a new bidirectional Bus which can be read from and written to.
make ∷ ∀ m a. MonadEffect m ⇒ m (BusRW a)
make = liftEffect do
cell ← EffAvar.empty
consumers ← EffAvar.new mempty
launchAff_ $ attempt $ forever do
res ← AVar.take cell
vars ← AVar.take consumers
AVar.put Nil consumers
sequence_ (foldl (\xs a → AVar.put res a : xs) mempty vars)
cell ← EffAVar.empty
consumers ← EffAVar.new mempty
launchAff_ $ fix \loop -> do
-- we `read` from `cell` instead of `take`, so that if error is written,
-- `cell` can be killed, such that if there was any other `put` operations
-- blocked, that will resolve with the error.
resE ← AVar.read cell
case resE of
Left err -> do
vars ← AVar.take consumers
liftEffect do
traverse_ (EffAVar.kill err) vars
EffAVar.kill err consumers
EffAVar.kill err cell
Right res -> do
void $ AVar.take cell
vars ← AVar.take consumers
AVar.put Nil consumers
sequence_ (foldl (\xs a → AVar.put res a : xs) mempty vars)
loop
pure $ Bus cell consumers

-- | Blocks until a new value is pushed to the Bus, returning the value.
Expand All @@ -79,20 +94,26 @@ read (Bus _ consumers) = do

-- | Pushes a new value to the Bus, yieldig immediately.
write ∷ ∀ a r. a → BusW' r a → Aff Unit
write a (Bus cell _) = AVar.put a cell
write a (Bus cell _) = AVar.put (Right a) cell

-- | Splits a bidirectional Bus into separate read and write Buses.
split ∷ ∀ a. BusRW a → Tuple (BusR a) (BusW a)
split (Bus a b) = Tuple (Bus a b) (Bus a b)

-- | Kills the Bus and propagates the exception to all pending and future consumers.
-- | `kill` is idempotent and blocks until killing process is fully finishes, i.e.
-- | `kill err bus *> isKilled bus` will result with `true`.
kill ∷ ∀ a r. Exn.Error → BusW' r a → Aff Unit
kill err (Bus cell consumers) = unlessM (liftEffect $ EffAvar.isKilled <$> EffAvar.status cell) do
AVar.kill err cell
vars ← AVar.take consumers
traverse_ (AVar.kill err) vars
AVar.kill err consumers
kill err bus@(Bus cell consumers) = do
unlessM (isKilled bus) do
-- If there are multiple parallel processes executing `kill` at the same time,
-- then without this try all of processes which are blocked bu put will be killed
-- as part of handling first `put`. so we have this try to guaranty that kill is idempotent.
void $ try $ AVar.put (Left err) cell
-- Here we block until read from `cell` result's with the `error`,
-- i.e. kill process was finished successfully.
void $ try $ forever $ AVar.read cell

-- | Synchronously checks whether a Bus has been killed.
isKilled ∷ ∀ m a r. MonadEffect m ⇒ BusR' r a → m Boolean
isKilled (Bus cell _) = liftEffect $ EffAvar.isKilled <$> EffAvar.status cell
isKilled ∷ ∀ m a r. MonadEffect m ⇒ Bus r a → m Boolean
isKilled (Bus cell _) = liftEffect $ EffAVar.isKilled <$> EffAVar.status cell
55 changes: 29 additions & 26 deletions test/Main.purs
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,28 @@ module Test.Main where

import Prelude

import Control.Monad.Error.Class (throwError)
import Control.Parallel (parSequence_)
import Data.Bifunctor (lmap)
import Data.Either (Either(..), either)
import Effect (Effect)
import Effect.Aff (Aff, Milliseconds(..), attempt, delay, forkAff, joinFiber, runAff_)
import Effect.Aff.Bus as Bus
import Effect (Effect)
import Effect.Class (liftEffect)
import Effect.Console (log)
import Effect.Exception (error, throwException)
import Effect.Ref as Ref
import Control.Monad.Error.Class (throwError)
import Data.Either (Either(..), either)
import Test.Assert (assertEqual', assertTrue')

test_readWrite ∷ Bus.BusRW Int -> Aff Boolean
test_readWrite ∷ Bus.BusRW Int -> Aff Unit
test_readWrite bus = do
ref ← liftEffect $ Ref.new 0
ref ← liftEffect $ Ref.new []

let
proc = do
res ← attempt (Bus.read bus)
case res of
Left e → do
void $ liftEffect $ Ref.modify (_ + 100) ref
Right n → do
void $ liftEffect $ Ref.modify (_ + n) ref
proc
void $ liftEffect $ Ref.modify (_ <> [res]) ref
either (const $ pure unit) (const proc) res

f1 ← forkAff proc
f2 ← forkAff proc
Expand All @@ -49,22 +48,29 @@ test_readWrite bus = do
Bus.write 2 bus
Bus.write 3 bus

-- without delay kill of bus interpats pending interactions with avar
-- so we need to wait for some time to be sure that all actions are finished
delay $ Milliseconds 10.0
let err = error "Done"
Bus.kill err bus
attempt (Bus.read bus) >>= case _ of
Left err' | show err' == show err -> pure unit
oop -> throwError $ error "read from killed bus should resolve with same error which was used to kill"
unlessM (Bus.isKilled bus) $ throwError $ error "isKilled must return true as bus was killed"
-- killing in parallel must be safe
parSequence_
[ Bus.kill err bus
, Bus.kill err bus
, Bus.kill err bus
]
isKilled <- Bus.isKilled bus
liftEffect $ assertTrue' "`isKilled` immediately after `kill` results `true`" isKilled

-- kill is idempotent
Bus.kill err bus

readRes <- attempt (Bus.read bus)
liftEffect $ assertEqual' "`read` from killed bus should resolve with same error which was used to `kill`"
{actual: lmap show readRes, expected: Left $ show err}

joinFiber f1
joinFiber f2

res <- liftEffect $ Ref.read ref
pure $ res == 212
liftEffect $ assertEqual' "`res` should be as expected"
{actual: lmap show <$> res, expected: [Right 1, Right 1, Right 2, Right 2, Right 3, Right 3, Left $ show err, Left $ show err]}


main ∷ Effect Unit
Expand All @@ -83,9 +89,6 @@ main = do
where
isOk isFinishedRef = case _ of
Left err -> throwException err
Right res ->
if res
then do
log "ok"
Ref.write true isFinishedRef
else throwException $ error "failed"
Right res -> do
log "ok"
Ref.write true isFinishedRef