From a7423a017efe8ea1f0180623a89acdd5635af8c8 Mon Sep 17 00:00:00 2001 From: Irakli Safareli Date: Sat, 21 Sep 2019 16:53:53 +0400 Subject: [PATCH] fix kill after write issue --- bower.json | 1 + src/Effect/Aff/Bus.purs | 65 +++++++++++++++++++++++++++-------------- test/Main.purs | 55 +++++++++++++++++----------------- 3 files changed, 73 insertions(+), 48 deletions(-) diff --git a/bower.json b/bower.json index 0d458ac..61759e2 100644 --- a/bower.json +++ b/bower.json @@ -28,6 +28,7 @@ }, "devDependencies": { "purescript-console": "^4.0.0", + "purescript-assert": "^4.1.0", "purescript-refs": "^4.0.0" } } diff --git a/src/Effect/Aff/Bus.purs b/src/Effect/Aff/Bus.purs index 8298129..22fa310 100644 --- a/src/Effect/Aff/Bus.purs +++ b/src/Effect/Aff/Bus.purs @@ -32,20 +32,22 @@ module Effect.Aff.Bus import Prelude -import Effect.Aff (Aff, attempt, launchAff_) -import Effect.Aff.AVar (AVar) -import Effect.Aff.AVar as AVar -import Effect.AVar as EffAvar -import Effect.Class (class MonadEffect, liftEffect) -import Effect.Exception as Exn +import Control.Lazy (fix) import Control.Monad.Rec.Class (forever) +import Data.Either (Either(..)) import Data.Foldable (foldl, sequence_, traverse_) import Data.List (List(..), (:)) import Data.Tuple (Tuple(..)) +import Effect.AVar as EffAVar +import Effect.Aff (Aff, Error, launchAff_, try) +import Effect.Aff.AVar (AVar) +import Effect.Aff.AVar as AVar +import Effect.Class (class MonadEffect, liftEffect) +import Effect.Exception as Exn data Cap -data Bus (r ∷ # Type) a = Bus (AVar a) (AVar (List (AVar a))) +data Bus (r ∷ # Type) a = Bus (AVar (Either Error a)) (AVar (List (AVar a))) type BusR = BusR' () @@ -60,13 +62,26 @@ type BusRW = Bus (read ∷ Cap, write ∷ Cap) -- | Creates a new bidirectional Bus which can be read from and written to. make ∷ ∀ m a. MonadEffect m ⇒ m (BusRW a) make = liftEffect do - cell ← EffAvar.empty - consumers ← EffAvar.new mempty - launchAff_ $ attempt $ forever do - res ← AVar.take cell - vars ← AVar.take consumers - AVar.put Nil consumers - sequence_ (foldl (\xs a → AVar.put res a : xs) mempty vars) + cell ← EffAVar.empty + consumers ← EffAVar.new mempty + launchAff_ $ fix \loop -> do + -- we `read` from `cell` instead of `take`, so that if error is written, + -- `cell` can be killed, such that if there was any other `put` operations + -- blocked, that will resolve with the error. + resE ← AVar.read cell + case resE of + Left err -> do + vars ← AVar.take consumers + liftEffect do + traverse_ (EffAVar.kill err) vars + EffAVar.kill err consumers + EffAVar.kill err cell + Right res -> do + void $ AVar.take cell + vars ← AVar.take consumers + AVar.put Nil consumers + sequence_ (foldl (\xs a → AVar.put res a : xs) mempty vars) + loop pure $ Bus cell consumers -- | Blocks until a new value is pushed to the Bus, returning the value. @@ -79,20 +94,26 @@ read (Bus _ consumers) = do -- | Pushes a new value to the Bus, yieldig immediately. write ∷ ∀ a r. a → BusW' r a → Aff Unit -write a (Bus cell _) = AVar.put a cell +write a (Bus cell _) = AVar.put (Right a) cell -- | Splits a bidirectional Bus into separate read and write Buses. split ∷ ∀ a. BusRW a → Tuple (BusR a) (BusW a) split (Bus a b) = Tuple (Bus a b) (Bus a b) -- | Kills the Bus and propagates the exception to all pending and future consumers. +-- | `kill` is idempotent and blocks until killing process is fully finishes, i.e. +-- | `kill err bus *> isKilled bus` will result with `true`. kill ∷ ∀ a r. Exn.Error → BusW' r a → Aff Unit -kill err (Bus cell consumers) = unlessM (liftEffect $ EffAvar.isKilled <$> EffAvar.status cell) do - AVar.kill err cell - vars ← AVar.take consumers - traverse_ (AVar.kill err) vars - AVar.kill err consumers +kill err bus@(Bus cell consumers) = do + unlessM (isKilled bus) do + -- If there are multiple parallel processes executing `kill` at the same time, + -- then without this try all of processes which are blocked bu put will be killed + -- as part of handling first `put`. so we have this try to guaranty that kill is idempotent. + void $ try $ AVar.put (Left err) cell + -- Here we block until read from `cell` result's with the `error`, + -- i.e. kill process was finished successfully. + void $ try $ forever $ AVar.read cell -- | Synchronously checks whether a Bus has been killed. -isKilled ∷ ∀ m a r. MonadEffect m ⇒ BusR' r a → m Boolean -isKilled (Bus cell _) = liftEffect $ EffAvar.isKilled <$> EffAvar.status cell +isKilled ∷ ∀ m a r. MonadEffect m ⇒ Bus r a → m Boolean +isKilled (Bus cell _) = liftEffect $ EffAVar.isKilled <$> EffAVar.status cell diff --git a/test/Main.purs b/test/Main.purs index 70715fc..49b9801 100644 --- a/test/Main.purs +++ b/test/Main.purs @@ -18,29 +18,28 @@ module Test.Main where import Prelude +import Control.Monad.Error.Class (throwError) +import Control.Parallel (parSequence_) +import Data.Bifunctor (lmap) +import Data.Either (Either(..), either) +import Effect (Effect) import Effect.Aff (Aff, Milliseconds(..), attempt, delay, forkAff, joinFiber, runAff_) import Effect.Aff.Bus as Bus -import Effect (Effect) import Effect.Class (liftEffect) import Effect.Console (log) import Effect.Exception (error, throwException) import Effect.Ref as Ref -import Control.Monad.Error.Class (throwError) -import Data.Either (Either(..), either) +import Test.Assert (assertEqual', assertTrue') -test_readWrite ∷ Bus.BusRW Int -> Aff Boolean +test_readWrite ∷ Bus.BusRW Int -> Aff Unit test_readWrite bus = do - ref ← liftEffect $ Ref.new 0 + ref ← liftEffect $ Ref.new [] let proc = do res ← attempt (Bus.read bus) - case res of - Left e → do - void $ liftEffect $ Ref.modify (_ + 100) ref - Right n → do - void $ liftEffect $ Ref.modify (_ + n) ref - proc + void $ liftEffect $ Ref.modify (_ <> [res]) ref + either (const $ pure unit) (const proc) res f1 ← forkAff proc f2 ← forkAff proc @@ -49,22 +48,29 @@ test_readWrite bus = do Bus.write 2 bus Bus.write 3 bus - -- without delay kill of bus interpats pending interactions with avar - -- so we need to wait for some time to be sure that all actions are finished - delay $ Milliseconds 10.0 let err = error "Done" - Bus.kill err bus - attempt (Bus.read bus) >>= case _ of - Left err' | show err' == show err -> pure unit - oop -> throwError $ error "read from killed bus should resolve with same error which was used to kill" - unlessM (Bus.isKilled bus) $ throwError $ error "isKilled must return true as bus was killed" + -- killing in parallel must be safe + parSequence_ + [ Bus.kill err bus + , Bus.kill err bus + , Bus.kill err bus + ] + isKilled <- Bus.isKilled bus + liftEffect $ assertTrue' "`isKilled` immediately after `kill` results `true`" isKilled + -- kill is idempotent + Bus.kill err bus + + readRes <- attempt (Bus.read bus) + liftEffect $ assertEqual' "`read` from killed bus should resolve with same error which was used to `kill`" + {actual: lmap show readRes, expected: Left $ show err} joinFiber f1 joinFiber f2 res <- liftEffect $ Ref.read ref - pure $ res == 212 + liftEffect $ assertEqual' "`res` should be as expected" + {actual: lmap show <$> res, expected: [Right 1, Right 1, Right 2, Right 2, Right 3, Right 3, Left $ show err, Left $ show err]} main ∷ Effect Unit @@ -83,9 +89,6 @@ main = do where isOk isFinishedRef = case _ of Left err -> throwException err - Right res -> - if res - then do - log "ok" - Ref.write true isFinishedRef - else throwException $ error "failed" + Right res -> do + log "ok" + Ref.write true isFinishedRef