Skip to content

Moves back to using a List of AVars for the listeners #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions src/Control/Monad/Aff/Bus.purs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{-
Copyright 2016 SlamData, Inc.
Copyright 2018 SlamData, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,13 +30,18 @@ module Control.Monad.Aff.Bus
) where

import Prelude
import Control.Monad.Aff (Aff, Error)
import Control.Monad.Aff.AVar (AVar, AVAR, makeEmptyVar, takeVar, tryPutVar, readVar, killVar)

import Control.Monad.Aff (Aff, attempt, forkAff)
import Control.Monad.Aff.AVar (AVAR, AVar, killVar, makeEmptyVar, makeVar, putVar, takeVar)
import Control.Monad.Eff.Exception as Exn
import Data.Foldable (foldl, sequence_, traverse_)
import Data.List (List, (:))
import Data.Monoid (mempty)
import Data.Tuple (Tuple(..))

data Cap

newtype Bus (r ∷ # Type) a = Bus (AVar a)
data Bus (r ∷ # Type) a = Bus (AVar a) (AVar (List (AVar a)))

type BusR = BusR' ()

Expand All @@ -50,20 +55,38 @@ type BusRW = Bus (read ∷ Cap, write ∷ Cap)

-- | Creates a new bidirectional Bus which can be read from and written to.
make ∷ ∀ eff a. Aff (avar ∷ AVAR | eff) (BusRW a)
make = Bus <$> makeEmptyVar
make = do
cell ← makeEmptyVar
consumers ← makeVar mempty
let
loop = do
attempt (takeVar cell) >>= traverse_ \res → do
vars ← takeVar consumers
putVar mempty consumers
sequence_ (foldl (\xs a → putVar res a : xs) mempty vars)
loop
_ ← forkAff loop
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might need to be an unsafe fork (liftEff <<< launchAff) unless you want to let users to control the lifecycle of the loop with supervised.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing that line to _ ← liftEff (launchAff (forkAff loop)) explodes the tests with:

/home/creek/work/purescript-aff-bus/output/Control.Monad.Aff/foreign.js:513
                throw util.fromLeft(step);
                ^

TypeError: bhead is not a function
    at run (/home/creek/work/purescript-aff-bus/output/Control.Monad.Aff/foreign.js:274:20)
    at Object.run (/home/creek/work/purescript-aff-bus/output/Control.Monad.Aff/foreign.js:623:13)
    at __do (/home/creek/work/purescript-aff-bus/output/Control.Monad.Aff/index.js:91:11)
    at runSync (/home/creek/work/purescript-aff-bus/output/Control.Monad.Aff/foreign.js:90:20)
    at run (/home/creek/work/purescript-aff-bus/output/Control.Monad.Aff/foreign.js:319:22)
    at /home/creek/work/purescript-aff-bus/output/Control.Monad.Aff/foreign.js:620:15
    at drain (/home/creek/work/purescript-aff-bus/output/Control.Monad.Aff/foreign.js:120:9)
    at Object.enqueue (/home/creek/work/purescript-aff-bus/output/Control.Monad.Aff/foreign.js:141:11)
    at Object.run (/home/creek/work/purescript-aff-bus/output/Control.Monad.Aff/foreign.js:619:23)
    at __do (/home/creek/work/purescript-aff-bus/output/Control.Monad.Aff/index.js:91:11)
* ERROR: Subcommand terminated with exit code 1

Looks like it's maybe a bug in Aff?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pure $ Bus cell consumers

-- | Blocks until a new value is pushed to the Bus, returning the value.
read ∷ ∀ eff a r. BusR' r a → Aff (avar ∷ AVAR | eff) a
read (Bus avar) = readVar avar
read (Bus _ consumers) = do
res' ← makeEmptyVar
cs ← takeVar consumers
putVar (res' : cs) consumers
takeVar res'

-- | Pushes a new value to the Bus, yieldig immediately.
write ∷ ∀ eff a r. a → BusW' r a → Aff (avar ∷ AVAR | eff) Unit
write a (Bus avar) = tryPutVar a avar *> void (takeVar avar)
write a (Bus cell _) = putVar a cell

-- | Splits a bidirectional Bus into separate read and write Buses.
split ∷ ∀ a. BusRW a → Tuple (BusR a) (BusW a)
split (Bus avar) = Tuple (Bus avar) (Bus avar)
split (Bus a b) = Tuple (Bus a b) (Bus a b)

-- | Kills the Bus and propagates the exception to all consumers.
kill ∷ ∀ eff a r. Error → BusW' r a → Aff (avar ∷ AVAR | eff) Unit
kill err (Bus avar) = killVar err avar
kill ∷ ∀ eff a r. Exn.Error → BusW' r a → Aff (avar ∷ AVAR | eff) Unit
kill err (Bus cell consumers) = do
killVar err cell
vars ← takeVar consumers
traverse_ (killVar err) vars
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This used to kill consumers as well, but doing so propagated an error all the way up now, so we dropped that part. It'll be empty anyway, so I think it's still fine.

7 changes: 4 additions & 3 deletions test/Main.purs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
{-
Copyright 2016 SlamData, Inc.
Copyright 2018 SlamData, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -17,7 +17,8 @@ limitations under the License.
module Test.Main where

import Prelude
import Control.Monad.Aff (Aff, forkAff, launchAff, joinFiber, attempt)

import Control.Monad.Aff (Aff, attempt, forkAff, joinFiber, launchAff)
import Control.Monad.Aff.AVar (AVAR)
import Control.Monad.Aff.Bus as Bus
import Control.Monad.Aff.Console (log)
Expand Down