From 0e4c1d836dca3eb3bb12a5770dbc7326180b1805 Mon Sep 17 00:00:00 2001 From: Thibault Wittemberg Date: Tue, 14 Jun 2022 21:25:23 +0200 Subject: [PATCH 1/4] asyncThrowingChannel: make fail set a terminal state --- .../AsyncThrowingChannel.swift | 116 ++++++++++++------ Tests/AsyncAlgorithmsTests/TestChannel.swift | 20 ++- 2 files changed, 95 insertions(+), 41 deletions(-) diff --git a/Sources/AsyncAlgorithms/AsyncThrowingChannel.swift b/Sources/AsyncAlgorithms/AsyncThrowingChannel.swift index 58c9e8c9..5ce68961 100644 --- a/Sources/AsyncAlgorithms/AsyncThrowingChannel.swift +++ b/Sources/AsyncAlgorithms/AsyncThrowingChannel.swift @@ -11,7 +11,13 @@ /// An error-throwing channel for sending elements from on task to another with back pressure. /// -/// The `AsyncThrowingChannel` class is intended to be used as a communication types between tasks, particularly when one task produces values and another task consumes those values. The back pressure applied by `send(_:)`, `fail(_:)` and `finish()` via suspension/resume ensures that the production of values does not exceed the consumption of values from iteration. Each of these methods suspends after enqueuing the event and is resumed when the next call to `next()` on the `Iterator` is made. +/// The `AsyncThrowingChannel` class is intended to be used as a communication types between tasks, +/// particularly when one task produces values and another task consumes those values. The back +/// pressure applied by `send(_:)` via suspension/resume ensures that the production of values does +/// not exceed the consumption of values from iteration. This method suspends after enqueuing the event +/// and is resumed when the next call to `next()` on the `Iterator` is made, or when `finish()`/`fail(_:)` is called +/// from another Task. As `finish()` and `fail(_:)` induce a terminal state, there is no need for a back pressure management. +/// Those functions do not suspend and will finish all the pending iterations. public final class AsyncThrowingChannel: AsyncSequence, Sendable { /// The iterator for an `AsyncThrowingChannel` instance. public struct Iterator: AsyncIteratorProtocol, Sendable { @@ -78,12 +84,23 @@ public final class AsyncThrowingChannel: Asyn return lhs.generation == rhs.generation } } + + enum Termination { + case finished + case failed(Error) + } enum Emission { case idle case pending([UnsafeContinuation?, Never>]) case awaiting(Set) - + case terminated(Termination) + + var isTerminated: Bool { + guard case .terminated = self else { return false } + return true + } + mutating func cancel(_ generation: Int) -> UnsafeContinuation? { switch self { case .awaiting(var awaiting): @@ -106,9 +123,8 @@ public final class AsyncThrowingChannel: Asyn struct State { var emission: Emission = .idle var generation = 0 - var terminal = false } - + let state = ManagedCriticalState(State()) public init(_ elementType: Element.Type = Element.self) { } @@ -129,12 +145,9 @@ public final class AsyncThrowingChannel: Asyn func next(_ generation: Int) async throws -> Element? { return try await withUnsafeThrowingContinuation { continuation in var cancelled = false - var terminal = false + var potentialTermination: Termination? + state.withCriticalRegion { state -> UnsafeResumption?, Never>? in - if state.terminal { - terminal = true - return nil - } switch state.emission { case .idle: state.emission = .awaiting([Awaiting(generation: generation, continuation: continuation)]) @@ -158,53 +171,78 @@ public final class AsyncThrowingChannel: Asyn state.emission = .awaiting(nexts) } return nil + case .terminated(let termination): + potentialTermination = termination + state.emission = .terminated(.finished) + return nil } }?.resume() - if cancelled || terminal { + + if cancelled { continuation.resume(returning: nil) + return + } + + switch potentialTermination { + case .none: + return + case .failed(let error): + continuation.resume(throwing: error) + return + case .finished: + continuation.resume(returning: nil) + return } } } - - func finishAll() { + + func terminateAll(error: Failure? = nil) { let (sends, nexts) = state.withCriticalRegion { state -> ([UnsafeContinuation?, Never>], Set) in - if state.terminal { - return ([], []) + + let nextState: Emission + if let error = error { + nextState = .terminated(.failed(error)) + } else { + nextState = .terminated(.finished) } - state.terminal = true + switch state.emission { case .idle: + state.emission = nextState return ([], []) case .pending(let nexts): - state.emission = .idle + state.emission = nextState return (nexts, []) case .awaiting(let nexts): - state.emission = .idle + state.emission = nextState return ([], nexts) + case .terminated: + return ([], []) } } + for send in sends { send.resume(returning: nil) } - for next in nexts { - next.continuation?.resume(returning: nil) + + if let error = error { + for next in nexts { + next.continuation?.resume(throwing: error) + } + } else { + for next in nexts { + next.continuation?.resume(returning: nil) + } } + } - func _send(_ result: Result) async { + func _send(_ element: Element) async { await withTaskCancellationHandler { - finishAll() + terminateAll() } operation: { let continuation: UnsafeContinuation? = await withUnsafeContinuation { continuation in state.withCriticalRegion { state -> UnsafeResumption?, Never>? in - if state.terminal { - return UnsafeResumption(continuation: continuation, success: nil) - } - - if case .failure = result { - state.terminal = true - } - switch state.emission { case .idle: state.emission = .pending([continuation]) @@ -221,28 +259,32 @@ public final class AsyncThrowingChannel: Asyn state.emission = .awaiting(nexts) } return UnsafeResumption(continuation: continuation, success: next) + case .terminated: + return UnsafeResumption(continuation: continuation, success: nil) } }?.resume() } - continuation?.resume(with: result.map { $0 as Element? }) + continuation?.resume(returning: element) } } - /// Send an element to an awaiting iteration. This function will resume when the next call to `next()` is made. + /// Send an element to an awaiting iteration. This function will resume when the next call to `next()` is made + /// or when a call to `finish()`/`fail(_:)` is made from another Task. /// If the channel is already finished then this returns immediately public func send(_ element: Element) async { - await _send(.success(element)) + await _send(element) } - /// Send an error to an awaiting iteration. This function will resume when the next call to `next()` is made. - /// If the channel is already finished then this returns immediately - public func fail(_ error: Error) async where Failure == Error { - await _send(.failure(error)) + /// Send an error to all awaiting iterations. + /// All subsequent calls to `next(_:)` will resume immediately. + public func fail(_ error: Error) where Failure == Error { + terminateAll(error: error) } /// Send a finish to all awaiting iterations. + /// All subsequent calls to `next(_:)` will resume immediately. public func finish() { - finishAll() + terminateAll() } public func makeAsyncIterator() -> Iterator { diff --git a/Tests/AsyncAlgorithmsTests/TestChannel.swift b/Tests/AsyncAlgorithmsTests/TestChannel.swift index 891ec434..7d73467f 100644 --- a/Tests/AsyncAlgorithmsTests/TestChannel.swift +++ b/Tests/AsyncAlgorithmsTests/TestChannel.swift @@ -63,11 +63,12 @@ final class TestChannel: XCTestCase { XCTAssertEqual(collected, expected) } - func test_asyncThrowingChannel_throws_when_fail_is_called() async { + func test_asyncThrowingChannel_throws_and_discards_additional_sent_values_when_fail_is_called() async { + let sendImmediatelyResumes = expectation(description: "Send immediately resumes after fail") + let channel = AsyncThrowingChannel() - Task { - await channel.fail(Failure()) - } + channel.fail(Failure()) + var iterator = channel.makeAsyncIterator() do { let _ = try await iterator.next() @@ -75,6 +76,17 @@ final class TestChannel: XCTestCase { } catch { XCTAssertEqual(error as? Failure, Failure()) } + + do { + let pastFailure = try await iterator.next() + XCTAssertNil(pastFailure) + } catch { + XCTFail("The AsyncThrowingChannel should not fail when failure has already been fired") + } + + await channel.send("send") + sendImmediatelyResumes.fulfill() + wait(for: [sendImmediatelyResumes], timeout: 1.0) } func test_asyncChannel_ends_alls_iterators_and_discards_additional_sent_values_when_finish_is_called() async { From 6646601250ceafd989bcc1d8d80990975b8c0407 Mon Sep 17 00:00:00 2001 From: Thibault Wittemberg Date: Tue, 14 Jun 2022 21:26:53 +0200 Subject: [PATCH 2/4] asyncChannel: align naming on asyncThrowingChannel --- Sources/AsyncAlgorithms/AsyncChannel.swift | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/Sources/AsyncAlgorithms/AsyncChannel.swift b/Sources/AsyncAlgorithms/AsyncChannel.swift index 0f30a6f6..facdaadf 100644 --- a/Sources/AsyncAlgorithms/AsyncChannel.swift +++ b/Sources/AsyncAlgorithms/AsyncChannel.swift @@ -13,10 +13,12 @@ /// /// The `AsyncChannel` class is intended to be used as a communication type between tasks, /// particularly when one task produces values and another task consumes those values. The back -/// pressure applied by `send(_:)` and `finish()` via the suspension/resume ensures that -/// the production of values does not exceed the consumption of values from iteration. Each of these -/// methods suspends after enqueuing the event and is resumed when the next call to `next()` -/// on the `Iterator` is made. +/// pressure applied by `send(_:)` via the suspension/resume ensures that +/// the production of values does not exceed the consumption of values from iteration. This method +/// suspends after enqueuing the event and is resumed when the next call to `next()` +/// on the `Iterator` is made, or when `finish()` is called from another Task. +/// As `finish()` induces a terminal state, there is no need for a back pressure management. +/// This function does not suspend and will finish all the pending iterations. public final class AsyncChannel: AsyncSequence, Sendable { /// The iterator for a `AsyncChannel` instance. public struct Iterator: AsyncIteratorProtocol, Sendable { @@ -168,7 +170,7 @@ public final class AsyncChannel: AsyncSequence, Sendable { } } - func finishAll() { + func terminateAll() { let (sends, nexts) = state.withCriticalRegion { state -> ([UnsafeContinuation?, Never>], Set) in if state.terminal { return ([], []) @@ -195,7 +197,7 @@ public final class AsyncChannel: AsyncSequence, Sendable { func _send(_ element: Element) async { await withTaskCancellationHandler { - finishAll() + terminateAll() } operation: { let continuation: UnsafeContinuation? = await withUnsafeContinuation { continuation in state.withCriticalRegion { state -> UnsafeResumption?, Never>? in @@ -225,15 +227,17 @@ public final class AsyncChannel: AsyncSequence, Sendable { } } - /// Send an element to an awaiting iteration. This function will resume when the next call to `next()` is made. + /// Send an element to an awaiting iteration. This function will resume when the next call to `next()` is made + /// or when a call to `finish()` is made from another Task. /// If the channel is already finished then this returns immediately public func send(_ element: Element) async { await _send(element) } /// Send a finish to all awaiting iterations. + /// All subsequent calls to `next(_:)` will resume immediately. public func finish() { - finishAll() + terminateAll() } /// Create an `Iterator` for iteration of an `AsyncChannel` From 6a61bab01c4d910c86600d4a4a314a0107ac377b Mon Sep 17 00:00:00 2001 From: Thibault Wittemberg Date: Tue, 14 Jun 2022 21:37:39 +0200 Subject: [PATCH 3/4] asyncChannel: remove a test base on Task.sleep --- Tests/AsyncAlgorithmsTests/TestChannel.swift | 57 -------------------- 1 file changed, 57 deletions(-) diff --git a/Tests/AsyncAlgorithmsTests/TestChannel.swift b/Tests/AsyncAlgorithmsTests/TestChannel.swift index 7d73467f..66fd1e1d 100644 --- a/Tests/AsyncAlgorithmsTests/TestChannel.swift +++ b/Tests/AsyncAlgorithmsTests/TestChannel.swift @@ -144,63 +144,6 @@ final class TestChannel: XCTestCase { wait(for: [additionalSend], timeout: 1.0) } - func test_asyncChannel_ends_alls_iterators_and_discards_additional_sent_values_when_finish_is_called2() async throws { - let channel = AsyncChannel() - let complete = ManagedCriticalState(false) - let finished = expectation(description: "finished") - - let valueFromConsumer1 = ManagedCriticalState(nil) - let valueFromConsumer2 = ManagedCriticalState(nil) - - let received = expectation(description: "received") - received.expectedFulfillmentCount = 2 - - let pastEnd = expectation(description: "pastEnd") - pastEnd.expectedFulfillmentCount = 2 - - Task(priority: .high) { - var iterator = channel.makeAsyncIterator() - let ending = await iterator.next() - valueFromConsumer1.withCriticalRegion { $0 = ending } - received.fulfill() - let item = await iterator.next() - XCTAssertNil(item) - pastEnd.fulfill() - } - - Task(priority: .high) { - var iterator = channel.makeAsyncIterator() - let ending = await iterator.next() - valueFromConsumer2.withCriticalRegion { $0 = ending } - received.fulfill() - let item = await iterator.next() - XCTAssertNil(item) - pastEnd.fulfill() - } - - try await Task.sleep(nanoseconds: 1_000_000_000) - - Task(priority: .low) { - channel.finish() - complete.withCriticalRegion { $0 = true } - finished.fulfill() - } - - wait(for: [finished, received], timeout: 1.0) - - XCTAssertTrue(complete.withCriticalRegion { $0 }) - XCTAssertEqual(valueFromConsumer1.withCriticalRegion { $0 }, nil) - XCTAssertEqual(valueFromConsumer2.withCriticalRegion { $0 }, nil) - - wait(for: [pastEnd], timeout: 1.0) - let additionalSend = expectation(description: "additional send") - Task { - await channel.send("test") - additionalSend.fulfill() - } - wait(for: [additionalSend], timeout: 1.0) - } - func test_asyncThrowingChannel_ends_alls_iterators_and_discards_additional_sent_values_when_finish_is_called() async { let channel = AsyncThrowingChannel() let complete = ManagedCriticalState(false) From 4831b0ffe2026a11799a3614ce898d10d2d332de Mon Sep 17 00:00:00 2001 From: Thibault Wittemberg Date: Tue, 14 Jun 2022 21:43:32 +0200 Subject: [PATCH 4/4] guides: update Channel with non async fail(_:) --- .../AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Channel.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Channel.md b/Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Channel.md index 974903b5..5121c769 100644 --- a/Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Channel.md +++ b/Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Channel.md @@ -44,14 +44,14 @@ public final class AsyncThrowingChannel: Asyn public init(element elementType: Element.Type = Element.self, failure failureType: Failure.Type = Failure.self) public func send(_ element: Element) async - public func fail(_ error: Error) async where Failure == Error + public func fail(_ error: Error) where Failure == Error public func finish() public func makeAsyncIterator() -> Iterator } ``` -Channels are intended to be used as communication types between tasks. Particularly when one task produces values and another task consumes said values. On the one hand, the back pressure applied by `send(_:)` and `fail(_:)` via the suspension/resume ensure that the production of values does not exceed the consumption of values from iteration. Each of these methods suspend after enqueuing the event and are resumed when the next call to `next()` on the `Iterator` is made. On the other hand, the call to `finish()` immediately resumes all the pending operations for every producers and consumers. Thus, every suspended `send(_:)` operations instantly resume, so as every suspended `next()` operations by producing a nil value, indicating the termination of the iterations. Further calls to `send(_:)` will immediately resume. +Channels are intended to be used as communication types between tasks. Particularly when one task produces values and another task consumes said values. On the one hand, the back pressure applied by `send(_:)` via the suspension/resume ensures that the production of values does not exceed the consumption of values from iteration. This method suspends after enqueuing the event and is resumed when the next call to `next()` on the `Iterator` is made. On the other hand, the call to `finish()` or `fail(_:)` immediately resumes all the pending operations for every producers and consumers. Thus, every suspended `send(_:)` operations instantly resume, so as every suspended `next()` operations by producing a nil value, or by throwing an error, indicating the termination of the iterations. Further calls to `send(_:)` will immediately resume. ```swift let channel = AsyncChannel()