From 04dd840ba3ddc189f72ebb6db85f2d68814ac8e7 Mon Sep 17 00:00:00 2001 From: Doug Gregor Date: Wed, 7 Jul 2021 17:04:52 -0700 Subject: [PATCH 01/10] [SE-0304] Replace the `async` operations in `(Throwing)TaskGroup` with `addTask` --- .../SourceCompatibilityShims.swift | 60 +++++++++++++++---- stdlib/public/Concurrency/TaskGroup.swift | 8 +-- test/Concurrency/async_task_groups.swift | 20 +++---- 3 files changed, 62 insertions(+), 26 deletions(-) diff --git a/stdlib/public/Concurrency/SourceCompatibilityShims.swift b/stdlib/public/Concurrency/SourceCompatibilityShims.swift index f9e38df8a4c1a..1a8faf21c5420 100644 --- a/stdlib/public/Concurrency/SourceCompatibilityShims.swift +++ b/stdlib/public/Concurrency/SourceCompatibilityShims.swift @@ -190,65 +190,101 @@ extension Task where Failure == Never { @available(SwiftStdlib 5.5, *) extension TaskGroup { - @available(*, deprecated, renamed: "async(priority:operation:)") + @available(*, deprecated, renamed: "addTask(priority:operation:)") @_alwaysEmitIntoClient public mutating func add( priority: TaskPriority? = nil, operation: __owned @Sendable @escaping () async -> ChildTaskResult ) async -> Bool { - return self.asyncUnlessCancelled(priority: priority) { + return self.addTaskUnlessCancelled(priority: priority) { await operation() } } - @available(*, deprecated, renamed: "async(priority:operation:)") + @available(*, deprecated, renamed: "addTask(priority:operation:)") @_alwaysEmitIntoClient public mutating func spawn( priority: TaskPriority? = nil, operation: __owned @Sendable @escaping () async -> ChildTaskResult ) { - async(priority: priority, operation: operation) + addTask(priority: priority, operation: operation) } - @available(*, deprecated, renamed: "asyncUnlessCancelled(priority:operation:)") + @available(*, deprecated, renamed: "addTaskUnlessCancelled(priority:operation:)") @_alwaysEmitIntoClient public mutating func spawnUnlessCancelled( priority: TaskPriority? = nil, operation: __owned @Sendable @escaping () async -> ChildTaskResult ) -> Bool { - asyncUnlessCancelled(priority: priority, operation: operation) + addTaskUnlessCancelled(priority: priority, operation: operation) + } + + @available(*, deprecated, renamed: "addTask(priority:operation:)") + @_alwaysEmitIntoClient + public mutating func async( + priority: TaskPriority? = nil, + operation: __owned @Sendable @escaping () async -> ChildTaskResult + ) { + addTask(priority: priority, operation: operation) + } + + @available(*, deprecated, renamed: "addTaskUnlessCancelled(priority:operation:)") + @_alwaysEmitIntoClient + public mutating func asyncUnlessCancelled( + priority: TaskPriority? = nil, + operation: __owned @Sendable @escaping () async -> ChildTaskResult + ) -> Bool { + addTaskUnlessCancelled(priority: priority, operation: operation) } } @available(SwiftStdlib 5.5, *) extension ThrowingTaskGroup { - @available(*, deprecated, renamed: "async(priority:operation:)") + @available(*, deprecated, renamed: "addTask(priority:operation:)") @_alwaysEmitIntoClient public mutating func add( priority: TaskPriority? = nil, operation: __owned @Sendable @escaping () async throws -> ChildTaskResult ) async -> Bool { - return self.asyncUnlessCancelled(priority: priority) { + return self.addTaskUnlessCancelled(priority: priority) { try await operation() } } - @available(*, deprecated, renamed: "async(priority:operation:)") + @available(*, deprecated, renamed: "addTask(priority:operation:)") @_alwaysEmitIntoClient public mutating func spawn( priority: TaskPriority? = nil, operation: __owned @Sendable @escaping () async throws -> ChildTaskResult ) { - async(priority: priority, operation: operation) + addTask(priority: priority, operation: operation) } - @available(*, deprecated, renamed: "asyncUnlessCancelled(priority:operation:)") + @available(*, deprecated, renamed: "addTaskUnlessCancelled(priority:operation:)") @_alwaysEmitIntoClient public mutating func spawnUnlessCancelled( priority: TaskPriority? = nil, operation: __owned @Sendable @escaping () async throws -> ChildTaskResult ) -> Bool { - asyncUnlessCancelled(priority: priority, operation: operation) + addTaskUnlessCancelled(priority: priority, operation: operation) + } + + @available(*, deprecated, renamed: "addTask(priority:operation:)") + @_alwaysEmitIntoClient + public mutating func async( + priority: TaskPriority? = nil, + operation: __owned @Sendable @escaping () async throws -> ChildTaskResult + ) { + addTask(priority: priority, operation: operation) + } + + @available(*, deprecated, renamed: "addTaskUnlessCancelled(priority:operation:)") + @_alwaysEmitIntoClient + public mutating func asyncUnlessCancelled( + priority: TaskPriority? = nil, + operation: __owned @Sendable @escaping () async throws -> ChildTaskResult + ) -> Bool { + addTaskUnlessCancelled(priority: priority, operation: operation) } } diff --git a/stdlib/public/Concurrency/TaskGroup.swift b/stdlib/public/Concurrency/TaskGroup.swift index fec0c0ec4a00f..0c961b2bb089c 100644 --- a/stdlib/public/Concurrency/TaskGroup.swift +++ b/stdlib/public/Concurrency/TaskGroup.swift @@ -208,7 +208,7 @@ public struct TaskGroup { /// - `true` if the operation was added to the group successfully, /// `false` otherwise (e.g. because the group `isCancelled`) @_alwaysEmitIntoClient - public mutating func async( + public mutating func addTask( priority: TaskPriority? = nil, operation: __owned @Sendable @escaping () async -> ChildTaskResult ) { @@ -242,7 +242,7 @@ public struct TaskGroup { /// - `true` if the operation was added to the group successfully, /// `false` otherwise (e.g. because the group `isCancelled`) @_alwaysEmitIntoClient - public mutating func asyncUnlessCancelled( + public mutating func addTaskUnlessCancelled( priority: TaskPriority? = nil, operation: __owned @Sendable @escaping () async -> ChildTaskResult ) -> Bool { @@ -440,7 +440,7 @@ public struct ThrowingTaskGroup { /// - `true` if the operation was added to the group successfully, /// `false` otherwise (e.g. because the group `isCancelled`) @_alwaysEmitIntoClient - public mutating func async( + public mutating func addTask( priority: TaskPriority? = nil, operation: __owned @Sendable @escaping () async throws -> ChildTaskResult ) { @@ -474,7 +474,7 @@ public struct ThrowingTaskGroup { /// - `true` if the operation was added to the group successfully, /// `false` otherwise (e.g. because the group `isCancelled`) @_alwaysEmitIntoClient - public mutating func asyncUnlessCancelled( + public mutating func addTaskUnlessCancelled( priority: TaskPriority? = nil, operation: __owned @Sendable @escaping () async throws -> ChildTaskResult ) -> Bool { diff --git a/test/Concurrency/async_task_groups.swift b/test/Concurrency/async_task_groups.swift index c963a6e3b74f1..49760bb104b79 100644 --- a/test/Concurrency/async_task_groups.swift +++ b/test/Concurrency/async_task_groups.swift @@ -22,11 +22,11 @@ func asyncThrowsOnCancel() async throws -> Int { @available(SwiftStdlib 5.5, *) func test_taskGroup_add() async throws -> Int { try await withThrowingTaskGroup(of: Int.self) { group in - group.async { + group.addTask { await asyncFunc() } - group.async { + group.addTask { await asyncFunc() } @@ -51,9 +51,9 @@ func boom() async throws -> Int { throw Boom() } func first_allMustSucceed() async throws { let first: Int = try await withThrowingTaskGroup(of: Int.self) { group in - group.async { await work() } - group.async { await work() } - group.async { try await boom() } + group.addTask { await work() } + group.addTask { await work() } + group.addTask { try await boom() } if let first = try await group.next() { return first @@ -72,9 +72,9 @@ func first_ignoreFailures() async throws { @Sendable func boom() async throws -> Int { throw Boom() } let first: Int = try await withThrowingTaskGroup(of: Int.self) { group in - group.async { await work() } - group.async { await work() } - group.async { + group.addTask { await work() } + group.addTask { await work() } + group.addTask { do { return try await boom() } catch { @@ -121,7 +121,7 @@ func test_taskGroup_quorum_thenCancel() async { func gatherQuorum(followers: [Follower]) async -> Bool { try! await withThrowingTaskGroup(of: Vote.self) { group in for follower in followers { - group.async { try await follower.vote() } + group.addTask { try await follower.vote() } } defer { @@ -192,7 +192,7 @@ extension Collection where Self: Sendable, Element: Sendable, Self.Index: Sendab var submitted = 0 func submitNext() async throws { - group.async { [submitted,i] in + group.addTask { [submitted,i] in let value = try await transform(self[i]) return SendableTuple2(submitted, value) } From f692faea6aaf8010b9f4c292683cf6cfba32527c Mon Sep 17 00:00:00 2001 From: Doug Gregor Date: Wed, 7 Jul 2021 17:48:57 -0700 Subject: [PATCH 02/10] [SE-0304] Rename `TaskPriority.default` to `medium` --- stdlib/public/Concurrency/Task.swift | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/stdlib/public/Concurrency/Task.swift b/stdlib/public/Concurrency/Task.swift index 19bf29727cc82..b128c1a4b8e49 100644 --- a/stdlib/public/Concurrency/Task.swift +++ b/stdlib/public/Concurrency/Task.swift @@ -202,11 +202,20 @@ public struct TaskPriority: RawRepresentable, Sendable { } public static let high: TaskPriority = .init(rawValue: 0x19) - public static let userInitiated: TaskPriority = high - public static let `default`: TaskPriority = .init(rawValue: 0x15) + + @_alwaysEmitIntoClient + public static var medium: TaskPriority { + .init(rawValue: 0x15) + } + public static let low: TaskPriority = .init(rawValue: 0x11) + + public static let userInitiated: TaskPriority = high public static let utility: TaskPriority = low public static let background: TaskPriority = .init(rawValue: 0x09) + + @available(*, deprecated, renamed: "medium") + public static let `default`: TaskPriority = .init(rawValue: 0x15) } @available(SwiftStdlib 5.5, *) From b7e2136e4a730ad0aabb621ddbcb0d8231def597 Mon Sep 17 00:00:00 2001 From: Doug Gregor Date: Wed, 7 Jul 2021 20:30:57 -0700 Subject: [PATCH 03/10] [SE-0304] Rename `Task.yield()` to `Task.suspend()` --- stdlib/public/Concurrency/Task.swift | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/stdlib/public/Concurrency/Task.swift b/stdlib/public/Concurrency/Task.swift index b128c1a4b8e49..a7a2d234418dd 100644 --- a/stdlib/public/Concurrency/Task.swift +++ b/stdlib/public/Concurrency/Task.swift @@ -645,6 +645,8 @@ extension Task where Success == Never, Failure == Never { /// This is not a perfect cure for starvation; /// if the task is the highest-priority task in the system, it might go /// immediately back to executing. + /// + @available(*, deprecated, renamed: "suspend()") public static func yield() async { let currentTask = Builtin.getCurrentAsyncTask() let priority = getJobFlags(currentTask).priority ?? Task.currentPriority._downgradeUserInteractive @@ -654,6 +656,11 @@ extension Task where Success == Never, Failure == Never { _enqueueJobGlobal(job) } } + + @_alwaysEmitIntoClient + public static func suspend() async { + await yield() + } } // ==== UnsafeCurrentTask ------------------------------------------------------ From 20c8bd12e81ca63b49e0e192448dd608978f3f32 Mon Sep 17 00:00:00 2001 From: Doug Gregor Date: Wed, 7 Jul 2021 20:49:33 -0700 Subject: [PATCH 04/10] [SE-0304] Add `(Throwing)?TaskGroup.waitForAll()` Do this as a staged change to the ABI, introducing an underscored `@usableFromInline` implementation to the ABI that we can rely on later, and an `@_alwaysEmitIntoClient` version we can inline now. --- stdlib/public/Concurrency/TaskGroup.swift | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/stdlib/public/Concurrency/TaskGroup.swift b/stdlib/public/Concurrency/TaskGroup.swift index 0c961b2bb089c..a762d49b92215 100644 --- a/stdlib/public/Concurrency/TaskGroup.swift +++ b/stdlib/public/Concurrency/TaskGroup.swift @@ -333,7 +333,14 @@ public struct TaskGroup { internal mutating func awaitAllRemainingTasks() async { while let _ = await next() {} } - + + /// Wait for all remaining tasks in the task group to complete before + /// returning. + @_alwaysEmitIntoClient + public mutating func waitForAll() async { + await awaitAllRemainingTasks() + } + /// Query whether the group has any remaining tasks. /// /// Task groups are always empty upon entry to the `withTaskGroup` body, and @@ -424,6 +431,17 @@ public struct ThrowingTaskGroup { } } + public mutating func _waitForAll() async throws { + while let _ = try await next() { } + } + + /// Wait for all remaining tasks in the task group to complete before + /// returning. + @_alwaysEmitIntoClient + public mutating func waitForAll() async throws { + while let _ = try await next() { } + } + /// Spawn, unconditionally, a child task in the group. /// /// ### Error handling From 59d1e61ac4678aa81710c9d62ff1b302ff699b3c Mon Sep 17 00:00:00 2001 From: Doug Gregor Date: Wed, 7 Jul 2021 21:57:49 -0700 Subject: [PATCH 05/10] [SE-0304] Implement cancellable `Task.sleep(nanoseconds:)`. --- stdlib/public/Concurrency/CMakeLists.txt | 1 + stdlib/public/Concurrency/Task.swift | 19 --- .../public/Concurrency/TaskCancellation.swift | 2 +- stdlib/public/Concurrency/TaskGroup.swift | 3 +- stdlib/public/Concurrency/TaskSleep.swift | 144 ++++++++++++++++++ .../Runtime/async_task_sleep_cancel.swift | 110 +++++++++++++ 6 files changed, 258 insertions(+), 21 deletions(-) create mode 100644 stdlib/public/Concurrency/TaskSleep.swift create mode 100644 test/Concurrency/Runtime/async_task_sleep_cancel.swift diff --git a/stdlib/public/Concurrency/CMakeLists.txt b/stdlib/public/Concurrency/CMakeLists.txt index be4986cd011bb..10ae71a77ca24 100644 --- a/stdlib/public/Concurrency/CMakeLists.txt +++ b/stdlib/public/Concurrency/CMakeLists.txt @@ -70,6 +70,7 @@ add_swift_target_library(swift_Concurrency ${SWIFT_STDLIB_LIBRARY_BUILD_TYPES} I TaskGroup.swift TaskLocal.cpp TaskLocal.swift + TaskSleep.swift ThreadSanitizer.cpp Mutex.cpp AsyncStreamBuffer.swift diff --git a/stdlib/public/Concurrency/Task.swift b/stdlib/public/Concurrency/Task.swift index a7a2d234418dd..95761b4310ec7 100644 --- a/stdlib/public/Concurrency/Task.swift +++ b/stdlib/public/Concurrency/Task.swift @@ -615,25 +615,6 @@ extension Task where Failure == Error { } } -// ==== Async Sleep ------------------------------------------------------------ - -@available(SwiftStdlib 5.5, *) -extension Task where Success == Never, Failure == Never { - /// Suspends the current task for _at least_ the given duration - /// in nanoseconds. - /// - /// This function does _not_ block the underlying thread. - public static func sleep(_ duration: UInt64) async { - let currentTask = Builtin.getCurrentAsyncTask() - let priority = getJobFlags(currentTask).priority ?? Task.currentPriority._downgradeUserInteractive - - return await Builtin.withUnsafeContinuation { (continuation: Builtin.RawUnsafeContinuation) -> Void in - let job = _taskCreateNullaryContinuationJob(priority: Int(priority.rawValue), continuation: continuation) - _enqueueJobGlobalWithDelay(duration, job) - } - } -} - // ==== Voluntary Suspension ----------------------------------------------------- @available(SwiftStdlib 5.5, *) diff --git a/stdlib/public/Concurrency/TaskCancellation.swift b/stdlib/public/Concurrency/TaskCancellation.swift index 6807819f02bbe..1be21c3e98664 100644 --- a/stdlib/public/Concurrency/TaskCancellation.swift +++ b/stdlib/public/Concurrency/TaskCancellation.swift @@ -112,7 +112,7 @@ public struct CancellationError: Error { @available(SwiftStdlib 5.5, *) @_silgen_name("swift_task_addCancellationHandler") -func _taskAddCancellationHandler(handler: @Sendable () -> Void) -> UnsafeRawPointer /*CancellationNotificationStatusRecord*/ +func _taskAddCancellationHandler(handler: () -> Void) -> UnsafeRawPointer /*CancellationNotificationStatusRecord*/ @available(SwiftStdlib 5.5, *) @_silgen_name("swift_task_removeCancellationHandler") diff --git a/stdlib/public/Concurrency/TaskGroup.swift b/stdlib/public/Concurrency/TaskGroup.swift index a762d49b92215..6ac35dcd31e41 100644 --- a/stdlib/public/Concurrency/TaskGroup.swift +++ b/stdlib/public/Concurrency/TaskGroup.swift @@ -431,7 +431,8 @@ public struct ThrowingTaskGroup { } } - public mutating func _waitForAll() async throws { + @usableFromInline + internal mutating func _waitForAll() async throws { while let _ = try await next() { } } diff --git a/stdlib/public/Concurrency/TaskSleep.swift b/stdlib/public/Concurrency/TaskSleep.swift new file mode 100644 index 0000000000000..88ac654761269 --- /dev/null +++ b/stdlib/public/Concurrency/TaskSleep.swift @@ -0,0 +1,144 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift.org open source project +// +// Copyright (c) 2020 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// +import Swift +@_implementationOnly import _SwiftConcurrencyShims + +@available(SwiftStdlib 5.5, *) +extension Task where Success == Never, Failure == Never { + /// Suspends the current task for _at least_ the given duration + /// in nanoseconds. + /// + /// This function does _not_ block the underlying thread. + public static func sleep(_ duration: UInt64) async { + let currentTask = Builtin.getCurrentAsyncTask() + let priority = getJobFlags(currentTask).priority ?? Task.currentPriority._downgradeUserInteractive + + return await Builtin.withUnsafeContinuation { (continuation: Builtin.RawUnsafeContinuation) -> Void in + let job = _taskCreateNullaryContinuationJob(priority: Int(priority.rawValue), continuation: continuation) + _enqueueJobGlobalWithDelay(duration, job) + } + } + + /// The type of continuation used in the implementation of + /// sleep(nanoseconds:). + private typealias SleepContinuation = UnsafeContinuation<(), Error> + + /// Called when the sleep(nanoseconds:) operation woke up without being + /// cancelled. + private static func onSleepWake( + _ wordPtr: UnsafeMutablePointer, + _ continuation: UnsafeContinuation<(), Error> + ) { + // Indicate that we've finished by putting a "1" into the flag word. + let (_, won) = Builtin.cmpxchg_seqcst_seqcst_Word( + wordPtr._rawValue, + UInt(0)._builtinWordValue, + UInt(1)._builtinWordValue) + + if Bool(_builtinBooleanLiteral: won) { + // The sleep finished, invoke the continuation. + continuation.resume() + } else { + // The task was cancelled first, which means the continuation was + // called by the cancellation handler. We need to deallocate up the flag + // word, because it was left over for this task to complete. + wordPtr.deallocate() + } + } + + /// Called when the sleep(nanoseconds:) operation has been cancelled before + /// the sleep completed. + private static func onSleepCancel( + _ wordPtr: UnsafeMutablePointer, + _ continuation: UnsafeContinuation<(), Error> + ) { + // Indicate that we've finished by putting a "2" into the flag word. + let (_, won) = Builtin.cmpxchg_seqcst_seqcst_Word( + wordPtr._rawValue, + UInt(0)._builtinWordValue, + UInt(2)._builtinWordValue) + + if Bool(_builtinBooleanLiteral: won) { + // We recorded the task cancellation before the sleep finished, so + // invoke the continuation with a the cancellation error. + continuation.resume(throwing: _Concurrency.CancellationError()) + } + } + + /// Suspends the current task for _at least_ the given duration + /// in nanoseconds, unless the task is cancelled. If the task is cancelled, + /// throws \c CancellationError. + /// + /// This function does _not_ block the underlying thread. + public static func sleep(nanoseconds duration: UInt64) async throws { + // If the task was already cancelled, go ahead and throw now. + try checkCancellation() + + // Allocate storage for the flag word and continuation. + let wordPtr = UnsafeMutablePointer.allocate(capacity: 2) + + // Initialize the flag word to 0, which means the continuation has not + // executed. + Builtin.atomicstore_seqcst_Word( + wordPtr._rawValue, UInt(0)._builtinWordValue) + + // A pointer to the storage continuation. Also initialize it to zero, to + // indicate that there is no continuation. + let continuationPtr = wordPtr + 1 + Builtin.atomicstore_seqcst_Word( + continuationPtr._rawValue, UInt(0)._builtinWordValue) + + do { + // Install a cancellation handler to resume the continuation by + // throwing CancellationError. + try await withTaskCancellationHandler { + let _: () = try await withUnsafeThrowingContinuation { continuation in + // Stash the continuation so the cancellation handler can see it. + Builtin.atomicstore_seqcst_Word( + continuationPtr._rawValue, + unsafeBitCast(continuation, to: Builtin.Word.self)) + + // Create a task that resumes the continuation normally if it + // finishes first. Enqueue it directly with the delay, so it fires + // when we're done sleeping. + let sleepTaskFlags = taskCreateFlags( + priority: nil, isChildTask: false, copyTaskLocals: false, + inheritContext: false, enqueueJob: false, + addPendingGroupTaskUnconditionally: false) + let (sleepTask, _) = Builtin.createAsyncTask(sleepTaskFlags) { + onSleepWake(wordPtr, continuation) + } + _enqueueJobGlobalWithDelay( + duration, Builtin.convertTaskToJob(sleepTask)) + } + } onCancel: { + let continuationWord = continuationPtr.pointee + if UInt(continuationWord) != 0 { + // Try to cancel, which will resume the continuation by throwing a + // CancellationError if the continuation hasn't already been resumed. + continuationPtr.withMemoryRebound( + to: SleepContinuation.self, capacity: 1) { + onSleepCancel(wordPtr, $0.pointee) + } + } + } + + // We got here without being cancelled, so deallocate the storage for + // the flag word and continuation. + wordPtr.deallocate() + } catch { + // The task was cancelled; propagate the error. The "on wake" task is + // responsible for deallocating the flag word. + throw error + } + } +} diff --git a/test/Concurrency/Runtime/async_task_sleep_cancel.swift b/test/Concurrency/Runtime/async_task_sleep_cancel.swift new file mode 100644 index 0000000000000..b321a083a1ee1 --- /dev/null +++ b/test/Concurrency/Runtime/async_task_sleep_cancel.swift @@ -0,0 +1,110 @@ +// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency %import-libdispatch -parse-as-library) | %FileCheck %s --dump-input always +// REQUIRES: executable_test +// REQUIRES: concurrency +// REQUIRES: libdispatch + +// rdar://76038845 +// UNSUPPORTED: use_os_stdlib +// UNSUPPORTED: back_deployment_runtime + +import _Concurrency +// FIXME: should not depend on Dispatch +import Dispatch + +@available(SwiftStdlib 5.5, *) +@main struct Main { + static let pause = 500_000_000 // 500ms + + static func main() async { + // CHECK: Starting! + print("Starting!") + await testSleepFinished() + await testSleepCancelledBeforeStarted() + await testSleepCancelled() + } + + static func testSleepFinished() async { + // CHECK-NEXT: Testing sleep that completes + print("Testing sleep that completes") + let start = DispatchTime.now() + + // try! will fail if the task got cancelled (which shouldn't happen). + try! await Task.sleep(nanoseconds: UInt64(pause)) + + let stop = DispatchTime.now() + + // assert that at least the specified time passed since calling `sleep` + assert(stop >= (start + .nanoseconds(pause))) + + // CHECK-NEXT: Wakey wakey! + print("Wakey wakey!") + } + + static func testSleepCancelledBeforeStarted() async { + // CHECK-NEXT: Testing sleep that gets cancelled before it starts + print("Testing sleep that gets cancelled before it starts") + let start = DispatchTime.now() + + let sleepyTask = Task { + try await Task.sleep(nanoseconds: UInt64(pause)) + } + + do { + sleepyTask.cancel() + try await sleepyTask.value + + fatalError("sleep(nanoseconds:) should have thrown CancellationError") + } catch is CancellationError { + // CHECK-NEXT: Caught the cancellation error + print("Caught the cancellation error") + + let stop = DispatchTime.now() + + // assert that we stopped early. + assert(stop < (start + .nanoseconds(pause))) + } catch { + fatalError("sleep(nanoseconds:) threw some other error: \(error)") + } + + // CHECK-NEXT: Cancelled! + print("Cancelled!") + } + + static func testSleepCancelled() async { + // CHECK-NEXT: Testing sleep that gets cancelled before it completes + print("Testing sleep that gets cancelled before it completes") + let start = DispatchTime.now() + + let sleepyTask = Task { + try await Task.sleep(nanoseconds: UInt64(pause)) + } + + do { + let waiterTask = Task { + try await sleepyTask.value + } + + let cancellerTask = Task { + await Task.sleep(UInt64(pause / 2)) + sleepyTask.cancel() + } + + try await waiterTask.value + + fatalError("sleep(nanoseconds:) should have thrown CancellationError") + } catch is CancellationError { + // CHECK-NEXT: Caught the cancellation error + print("Caught the cancellation error") + + let stop = DispatchTime.now() + + // assert that we stopped early. + assert(stop < (start + .nanoseconds(pause))) + } catch { + fatalError("sleep(nanoseconds:) threw some other error: \(error)") + } + + // CHECK-NEXT: Cancelled! + print("Cancelled!") + } +} From 7ba63cf53cc1c735fe44041371b8acfcd7be93be Mon Sep 17 00:00:00 2001 From: Doug Gregor Date: Thu, 8 Jul 2021 13:21:56 -0700 Subject: [PATCH 06/10] [SE-0304] Clarify documentation of Task.sleep(nanoseconds:) slightly --- stdlib/public/Concurrency/TaskSleep.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stdlib/public/Concurrency/TaskSleep.swift b/stdlib/public/Concurrency/TaskSleep.swift index 88ac654761269..39a20cf7fe446 100644 --- a/stdlib/public/Concurrency/TaskSleep.swift +++ b/stdlib/public/Concurrency/TaskSleep.swift @@ -76,7 +76,7 @@ extension Task where Success == Never, Failure == Never { /// Suspends the current task for _at least_ the given duration /// in nanoseconds, unless the task is cancelled. If the task is cancelled, - /// throws \c CancellationError. + /// throws \c CancellationError without waiting for the duration. /// /// This function does _not_ block the underlying thread. public static func sleep(nanoseconds duration: UInt64) async throws { From 4a0723f68ce9e5a6d88c17e82724b52b0a013c6e Mon Sep 17 00:00:00 2001 From: Doug Gregor Date: Thu, 8 Jul 2021 13:22:33 -0700 Subject: [PATCH 07/10] [SE-0304] Add `UnsafeCurrentTask.cancel()`. --- stdlib/public/Concurrency/Task.swift | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/stdlib/public/Concurrency/Task.swift b/stdlib/public/Concurrency/Task.swift index 95761b4310ec7..12aaa7655aa9e 100644 --- a/stdlib/public/Concurrency/Task.swift +++ b/stdlib/public/Concurrency/Task.swift @@ -712,6 +712,11 @@ public struct UnsafeCurrentTask { public var priority: TaskPriority { getJobFlags(_task).priority ?? .unspecified } + + /// Cancel the current task. + public func cancel() { + _taskCancel(_task) + } } @available(SwiftStdlib 5.5, *) From a596a03d3870e449a13d4e2e2ffedc6df5157a9a Mon Sep 17 00:00:00 2001 From: Doug Gregor Date: Thu, 8 Jul 2021 13:22:47 -0700 Subject: [PATCH 08/10] Add a test for `Task.sleep(nanoseconds: 0)`. --- .../Runtime/async_task_sleep_cancel.swift | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/test/Concurrency/Runtime/async_task_sleep_cancel.swift b/test/Concurrency/Runtime/async_task_sleep_cancel.swift index b321a083a1ee1..a8dbdea9daddb 100644 --- a/test/Concurrency/Runtime/async_task_sleep_cancel.swift +++ b/test/Concurrency/Runtime/async_task_sleep_cancel.swift @@ -19,6 +19,7 @@ import Dispatch // CHECK: Starting! print("Starting!") await testSleepFinished() + await testSleepMomentary() await testSleepCancelledBeforeStarted() await testSleepCancelled() } @@ -40,6 +41,17 @@ import Dispatch print("Wakey wakey!") } + static func testSleepMomentary() async { + // CHECK-NEXT: Testing sleep that completes instantly + print("Testing sleep that completes instantly") + + // try! will fail if the task got cancelled (which shouldn't happen). + try! await Task.sleep(nanoseconds: 0) + + // CHECK-NEXT: Wakey wakey! + print("Wakey wakey!") + } + static func testSleepCancelledBeforeStarted() async { // CHECK-NEXT: Testing sleep that gets cancelled before it starts print("Testing sleep that gets cancelled before it starts") From 1a024e96c8405d5def56b906bec4302a932d31c4 Mon Sep 17 00:00:00 2001 From: Doug Gregor Date: Thu, 8 Jul 2021 13:23:42 -0700 Subject: [PATCH 09/10] Remove `UnownedJob.run()`. It never did anything anyway --- stdlib/public/Concurrency/PartialAsyncTask.swift | 2 -- 1 file changed, 2 deletions(-) diff --git a/stdlib/public/Concurrency/PartialAsyncTask.swift b/stdlib/public/Concurrency/PartialAsyncTask.swift index c8fdd11221889..79398caf62cbb 100644 --- a/stdlib/public/Concurrency/PartialAsyncTask.swift +++ b/stdlib/public/Concurrency/PartialAsyncTask.swift @@ -18,8 +18,6 @@ import Swift @frozen public struct UnownedJob { private var context: Builtin.Job - - public func run() { } } @available(SwiftStdlib 5.5, *) From 121e34fa04c18c90fc292b2f6a3383263556ebf4 Mon Sep 17 00:00:00 2001 From: Doug Gregor Date: Thu, 8 Jul 2021 17:26:55 -0700 Subject: [PATCH 10/10] Reimplement `Task.sleep(nanoseconds:)` without the raciness. The prior implementation of `Task.sleep()` effectively had two different atomic words to capture the state, which could lead to cases where cancelling before a sleep operation started would fail to throw `CancellationError`. Reimplement the logic for the cancellable sleep with a more traditional lock-free approach by packing all of the state information into a single word, where we always load, figure out what to do, then compare-and-swap. --- stdlib/public/Concurrency/TaskSleep.swift | 286 +++++++++++++++++----- 1 file changed, 218 insertions(+), 68 deletions(-) diff --git a/stdlib/public/Concurrency/TaskSleep.swift b/stdlib/public/Concurrency/TaskSleep.swift index 39a20cf7fe446..62ab2bad374a2 100644 --- a/stdlib/public/Concurrency/TaskSleep.swift +++ b/stdlib/public/Concurrency/TaskSleep.swift @@ -32,45 +32,164 @@ extension Task where Success == Never, Failure == Never { /// sleep(nanoseconds:). private typealias SleepContinuation = UnsafeContinuation<(), Error> + /// Describes the state of a sleep() operation. + private enum SleepState { + /// The sleep continuation has not yet begun. + case notStarted + + // The sleep continuation has been created and is available here. + case activeContinuation(SleepContinuation) + + /// The sleep has finished. + case finished + + /// The sleep was cancelled. + case cancelled + + /// The sleep was cancelled before it even got started. + case cancelledBeforeStarted + + /// Decode sleep state from the word of storage. + init(word: Builtin.Word) { + switch UInt(word) & 0x03 { + case 0: + let continuationBits = UInt(word) & ~0x03 + if continuationBits == 0 { + self = .notStarted + } else { + let continuation = unsafeBitCast( + continuationBits, to: SleepContinuation.self) + self = .activeContinuation(continuation) + } + + case 1: + self = .finished + + case 2: + self = .cancelled + + case 3: + self = .cancelledBeforeStarted + + default: + fatalError("Bitmask failure") + } + } + + /// Decode sleep state by loading from the given pointer + init(loading wordPtr: UnsafeMutablePointer) { + self.init(word: Builtin.atomicload_seqcst_Word(wordPtr._rawValue)) + } + + /// Encode sleep state into a word of storage. + var word: UInt { + switch self { + case .notStarted: + return 0 + + case .activeContinuation(let continuation): + let continuationBits = unsafeBitCast(continuation, to: UInt.self) + return continuationBits + + case .finished: + return 1 + + case .cancelled: + return 2 + + case .cancelledBeforeStarted: + return 3 + } + } + } + /// Called when the sleep(nanoseconds:) operation woke up without being /// cancelled. private static func onSleepWake( - _ wordPtr: UnsafeMutablePointer, - _ continuation: UnsafeContinuation<(), Error> + _ wordPtr: UnsafeMutablePointer ) { - // Indicate that we've finished by putting a "1" into the flag word. - let (_, won) = Builtin.cmpxchg_seqcst_seqcst_Word( - wordPtr._rawValue, - UInt(0)._builtinWordValue, - UInt(1)._builtinWordValue) - - if Bool(_builtinBooleanLiteral: won) { - // The sleep finished, invoke the continuation. - continuation.resume() - } else { - // The task was cancelled first, which means the continuation was - // called by the cancellation handler. We need to deallocate up the flag - // word, because it was left over for this task to complete. - wordPtr.deallocate() + while true { + let state = SleepState(loading: wordPtr) + switch state { + case .notStarted: + fatalError("Cannot wake before we even started") + + case .activeContinuation(let continuation): + // We have an active continuation, so try to transition to the + // "finished" state. + let (_, won) = Builtin.cmpxchg_seqcst_seqcst_Word( + wordPtr._rawValue, + state.word._builtinWordValue, + SleepState.finished.word._builtinWordValue) + if Bool(_builtinBooleanLiteral: won) { + // The sleep finished, so invoke the continuation: we're done. + continuation.resume() + return + } + + // Try again! + continue + + case .finished: + fatalError("Already finished normally, can't do that again") + + case .cancelled: + // The task was cancelled, which means the continuation was + // called by the cancellation handler. We need to deallocate the flag + // word, because it was left over for this task to complete. + wordPtr.deallocate() + return + + case .cancelledBeforeStarted: + // Nothing to do; + return + } } } /// Called when the sleep(nanoseconds:) operation has been cancelled before /// the sleep completed. private static func onSleepCancel( - _ wordPtr: UnsafeMutablePointer, - _ continuation: UnsafeContinuation<(), Error> + _ wordPtr: UnsafeMutablePointer ) { - // Indicate that we've finished by putting a "2" into the flag word. - let (_, won) = Builtin.cmpxchg_seqcst_seqcst_Word( - wordPtr._rawValue, - UInt(0)._builtinWordValue, - UInt(2)._builtinWordValue) - - if Bool(_builtinBooleanLiteral: won) { - // We recorded the task cancellation before the sleep finished, so - // invoke the continuation with a the cancellation error. - continuation.resume(throwing: _Concurrency.CancellationError()) + while true { + let state = SleepState(loading: wordPtr) + switch state { + case .notStarted: + // We haven't started yet, so try to transition to the cancelled-before + // started state. + let (_, won) = Builtin.cmpxchg_seqcst_seqcst_Word( + wordPtr._rawValue, + state.word._builtinWordValue, + SleepState.cancelledBeforeStarted.word._builtinWordValue) + if Bool(_builtinBooleanLiteral: won) { + return + } + + // Try again! + continue + + case .activeContinuation(let continuation): + // We have an active continuation, so try to transition to the + // "cancelled" state. + let (_, won) = Builtin.cmpxchg_seqcst_seqcst_Word( + wordPtr._rawValue, + state.word._builtinWordValue, + SleepState.cancelled.word._builtinWordValue) + if Bool(_builtinBooleanLiteral: won) { + // We recorded the task cancellation before the sleep finished, so + // invoke the continuation with the cancellation error. + continuation.resume(throwing: _Concurrency.CancellationError()) + return + } + + // Try again! + continue + + case .finished, .cancelled, .cancelledBeforeStarted: + // The operation already finished, so there is nothing more to do. + return + } } } @@ -80,64 +199,95 @@ extension Task where Success == Never, Failure == Never { /// /// This function does _not_ block the underlying thread. public static func sleep(nanoseconds duration: UInt64) async throws { - // If the task was already cancelled, go ahead and throw now. - try checkCancellation() - - // Allocate storage for the flag word and continuation. - let wordPtr = UnsafeMutablePointer.allocate(capacity: 2) + // Allocate storage for the storage word. + let wordPtr = UnsafeMutablePointer.allocate(capacity: 1) - // Initialize the flag word to 0, which means the continuation has not - // executed. + // Initialize the flag word to "not started", which means the continuation + // has neither been created nor completed. Builtin.atomicstore_seqcst_Word( - wordPtr._rawValue, UInt(0)._builtinWordValue) - - // A pointer to the storage continuation. Also initialize it to zero, to - // indicate that there is no continuation. - let continuationPtr = wordPtr + 1 - Builtin.atomicstore_seqcst_Word( - continuationPtr._rawValue, UInt(0)._builtinWordValue) + wordPtr._rawValue, SleepState.notStarted.word._builtinWordValue) do { // Install a cancellation handler to resume the continuation by // throwing CancellationError. try await withTaskCancellationHandler { let _: () = try await withUnsafeThrowingContinuation { continuation in - // Stash the continuation so the cancellation handler can see it. - Builtin.atomicstore_seqcst_Word( - continuationPtr._rawValue, - unsafeBitCast(continuation, to: Builtin.Word.self)) - - // Create a task that resumes the continuation normally if it - // finishes first. Enqueue it directly with the delay, so it fires - // when we're done sleeping. - let sleepTaskFlags = taskCreateFlags( - priority: nil, isChildTask: false, copyTaskLocals: false, - inheritContext: false, enqueueJob: false, - addPendingGroupTaskUnconditionally: false) - let (sleepTask, _) = Builtin.createAsyncTask(sleepTaskFlags) { - onSleepWake(wordPtr, continuation) + while true { + let state = SleepState(loading: wordPtr) + switch state { + case .notStarted: + // The word that describes the active continuation state. + let continuationWord = + SleepState.activeContinuation(continuation).word + + // Try to swap in the continuation word. + let (_, won) = Builtin.cmpxchg_seqcst_seqcst_Word( + wordPtr._rawValue, + state.word._builtinWordValue, + continuationWord._builtinWordValue) + if !Bool(_builtinBooleanLiteral: won) { + // Keep trying! + continue + } + + // Create a task that resumes the continuation normally if it + // finishes first. Enqueue it directly with the delay, so it fires + // when we're done sleeping. + let sleepTaskFlags = taskCreateFlags( + priority: nil, isChildTask: false, copyTaskLocals: false, + inheritContext: false, enqueueJob: false, + addPendingGroupTaskUnconditionally: false) + let (sleepTask, _) = Builtin.createAsyncTask(sleepTaskFlags) { + onSleepWake(wordPtr) + } + _enqueueJobGlobalWithDelay( + duration, Builtin.convertTaskToJob(sleepTask)) + return + + case .activeContinuation, .finished: + fatalError("Impossible to have multiple active continuations") + + case .cancelled: + fatalError("Impossible to have cancelled before we began") + + case .cancelledBeforeStarted: + // Finish the continuation normally. We'll throw later, after + // we clean up. + continuation.resume() + return } - _enqueueJobGlobalWithDelay( - duration, Builtin.convertTaskToJob(sleepTask)) } - } onCancel: { - let continuationWord = continuationPtr.pointee - if UInt(continuationWord) != 0 { - // Try to cancel, which will resume the continuation by throwing a - // CancellationError if the continuation hasn't already been resumed. - continuationPtr.withMemoryRebound( - to: SleepContinuation.self, capacity: 1) { - onSleepCancel(wordPtr, $0.pointee) - } } + } onCancel: { + onSleepCancel(wordPtr) + } + + // Determine whether we got cancelled before we even started. + let cancelledBeforeStarted: Bool + switch SleepState(loading: wordPtr) { + case .notStarted, .activeContinuation, .cancelled: + fatalError("Invalid state for non-cancelled sleep task") + + case .cancelledBeforeStarted: + cancelledBeforeStarted = true + + case .finished: + cancelledBeforeStarted = false } // We got here without being cancelled, so deallocate the storage for // the flag word and continuation. wordPtr.deallocate() + + // If we got cancelled before we even started, through the cancellation + // error now. + if cancelledBeforeStarted { + throw _Concurrency.CancellationError() + } } catch { // The task was cancelled; propagate the error. The "on wake" task is - // responsible for deallocating the flag word. + // responsible for deallocating the flag word and continuation, if it's + // still running. throw error } }