diff --git a/Cargo.toml b/Cargo.toml index 91f290d..e7ba149 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,6 @@ Timeouts and intervals for futures. [dependencies] futures-core-preview = "0.3.0-alpha.19" -futures-util-preview = "0.3.0-alpha.19" pin-utils = "0.1.0-alpha.4" [dev-dependencies] diff --git a/src/atomic_waker.rs b/src/atomic_waker.rs new file mode 100644 index 0000000..c06f6df --- /dev/null +++ b/src/atomic_waker.rs @@ -0,0 +1,225 @@ +use core::cell::UnsafeCell; +use core::fmt; +use core::sync::atomic::AtomicUsize; +use core::sync::atomic::Ordering::{AcqRel, Acquire, Release}; +use core::task::Waker; + +/// A synchronization primitive for task wakeup. +/// +/// Sometimes the task interested in a given event will change over time. +/// An `AtomicWaker` can coordinate concurrent notifications with the consumer +/// potentially "updating" the underlying task to wake up. This is useful in +/// scenarios where a computation completes in another thread and wants to +/// notify the consumer, but the consumer is in the process of being migrated to +/// a new logical task. +/// +/// Consumers should call `register` before checking the result of a computation +/// and producers should call `wake` after producing the computation (this +/// differs from the usual `thread::park` pattern). It is also permitted for +/// `wake` to be called **before** `register`. This results in a no-op. +/// +/// A single `AtomicWaker` may be reused for any number of calls to `register` or +/// `wake`. +/// +/// `AtomicWaker` does not provide any memory ordering guarantees, as such the +/// user should use caution and use other synchronization primitives to guard +/// the result of the underlying computation. +pub struct AtomicWaker { + state: AtomicUsize, + waker: UnsafeCell>, +} + +/// Idle state +const WAITING: usize = 0; + +/// A new waker value is being registered with the `AtomicWaker` cell. +const REGISTERING: usize = 0b01; + +/// The waker currently registered with the `AtomicWaker` cell is being woken. +const WAKING: usize = 0b10; + +impl AtomicWaker { + /// Create an `AtomicWaker`. + pub fn new() -> AtomicWaker { + // Make sure that task is Sync + trait AssertSync: Sync {} + impl AssertSync for Waker {} + + AtomicWaker { + state: AtomicUsize::new(WAITING), + waker: UnsafeCell::new(None), + } + } + + /// Registers the waker to be notified on calls to `wake`. + /// + /// The new task will take place of any previous tasks that were registered + /// by previous calls to `register`. Any calls to `wake` that happen after + /// a call to `register` (as defined by the memory ordering rules), will + /// notify the `register` caller's task and deregister the waker from future + /// notifications. Because of this, callers should ensure `register` gets + /// invoked with a new `Waker` **each** time they require a wakeup. + /// + /// It is safe to call `register` with multiple other threads concurrently + /// calling `wake`. This will result in the `register` caller's current + /// task being notified once. + /// + /// This function is safe to call concurrently, but this is generally a bad + /// idea. Concurrent calls to `register` will attempt to register different + /// tasks to be notified. One of the callers will win and have its task set, + /// but there is no guarantee as to which caller will succeed. + /// + /// # Examples + /// + /// Here is how `register` is used when implementing a flag. + /// + /// ``` + /// use futures::future::Future; + /// use futures::task::{Context, Poll, AtomicWaker}; + /// use std::sync::atomic::AtomicBool; + /// use std::sync::atomic::Ordering::SeqCst; + /// use std::pin::Pin; + /// + /// struct Flag { + /// waker: AtomicWaker, + /// set: AtomicBool, + /// } + /// + /// impl Future for Flag { + /// type Output = (); + /// + /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + /// // Register **before** checking `set` to avoid a race condition + /// // that would result in lost notifications. + /// self.waker.register(cx.waker()); + /// + /// if self.set.load(SeqCst) { + /// Poll::Ready(()) + /// } else { + /// Poll::Pending + /// } + /// } + /// } + /// ``` + pub fn register(&self, waker: &Waker) { + match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) { + WAITING => { + unsafe { + // Locked acquired, update the waker cell + *self.waker.get() = Some(waker.clone()); + + // Release the lock. If the state transitioned to include + // the `WAKING` bit, this means that a wake has been + // called concurrently, so we have to remove the waker and + // wake it.` + // + // Start by assuming that the state is `REGISTERING` as this + // is what we jut set it to. + let res = self + .state + .compare_exchange(REGISTERING, WAITING, AcqRel, Acquire); + + match res { + Ok(_) => {} + Err(actual) => { + // This branch can only be reached if a + // concurrent thread called `wake`. In this + // case, `actual` **must** be `REGISTERING | + // `WAKING`. + debug_assert_eq!(actual, REGISTERING | WAKING); + + // Take the waker to wake once the atomic operation has + // completed. + let waker = (*self.waker.get()).take().unwrap(); + + // Just swap, because no one could change state while state == `REGISTERING` | `WAKING`. + self.state.swap(WAITING, AcqRel); + + // The atomic swap was complete, now + // wake the task and return. + waker.wake(); + } + } + } + } + WAKING => { + // Currently in the process of waking the task, i.e., + // `wake` is currently being called on the old task handle. + // So, we call wake on the new waker + waker.wake_by_ref(); + } + state => { + // In this case, a concurrent thread is holding the + // "registering" lock. This probably indicates a bug in the + // caller's code as racing to call `register` doesn't make much + // sense. + // + // We just want to maintain memory safety. It is ok to drop the + // call to `register`. + debug_assert!(state == REGISTERING || state == REGISTERING | WAKING); + } + } + } + + /// Calls `wake` on the last `Waker` passed to `register`. + /// + /// If `register` has not been called yet, then this does nothing. + pub fn wake(&self) { + if let Some(waker) = self.take() { + waker.wake(); + } + } + + /// Returns the last `Waker` passed to `register`, so that the user can wake it. + /// + /// + /// Sometimes, just waking the AtomicWaker is not fine grained enough. This allows the user + /// to take the waker and then wake it separately, rather than performing both steps in one + /// atomic action. + /// + /// If a waker has not been registered, this returns `None`. + pub fn take(&self) -> Option { + // AcqRel ordering is used in order to acquire the value of the `task` + // cell as well as to establish a `release` ordering with whatever + // memory the `AtomicWaker` is associated with. + match self.state.fetch_or(WAKING, AcqRel) { + WAITING => { + // The waking lock has been acquired. + let waker = unsafe { (*self.waker.get()).take() }; + + // Release the lock + self.state.fetch_and(!WAKING, Release); + + waker + } + state => { + // There is a concurrent thread currently updating the + // associated task. + // + // Nothing more to do as the `WAKING` bit has been set. It + // doesn't matter if there are concurrent registering threads or + // not. + // + debug_assert!( + state == REGISTERING || state == REGISTERING | WAKING || state == WAKING + ); + None + } + } + } +} + +impl Default for AtomicWaker { + fn default() -> Self { + AtomicWaker::new() + } +} + +impl fmt::Debug for AtomicWaker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "AtomicWaker") + } +} + +unsafe impl Send for AtomicWaker {} +unsafe impl Sync for AtomicWaker {} diff --git a/src/delay.rs b/src/delay.rs index 48a438d..07b139a 100644 --- a/src/delay.rs +++ b/src/delay.rs @@ -12,9 +12,8 @@ use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; -use futures_util::task::AtomicWaker; - use crate::arc_list::Node; +use crate::AtomicWaker; use crate::{ScheduledTimer, TimerHandle}; /// A future representing the notification that an elapsed duration has diff --git a/src/lib.rs b/src/lib.rs index 1931d1b..bb05727 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,7 @@ #![warn(missing_debug_implementations)] mod arc_list; +mod atomic_waker; mod delay; mod global; mod heap; @@ -25,6 +26,7 @@ mod heap_timer; mod timer; use arc_list::{ArcList, Node}; +use atomic_waker::AtomicWaker; use heap::{Heap, Slot}; use heap_timer::HeapTimer; use timer::{ScheduledTimer, Timer, TimerHandle}; diff --git a/src/timer.rs b/src/timer.rs index 4186303..170f789 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -8,9 +8,9 @@ use std::task::{Context, Poll}; use std::time::Instant; use futures_core::future::Future; -use futures_util::task::AtomicWaker; -use super::{global, ArcList, Heap, HeapTimer, Node, Slot}; +use crate::AtomicWaker; +use crate::{global, ArcList, Heap, HeapTimer, Node, Slot}; /// A "timer heap" used to power separately owned instances of `Delay`. ///