Skip to content

[Concurrency]: call AsyncStream's onCancel at most once #83190

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions stdlib/public/Concurrency/AsyncStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ public struct AsyncStream<Element> {
) {
let storage: _AsyncStreamCriticalStorage<Optional<() async -> Element?>>
= .create(produce)

let cancelStorage: _AsyncStreamCriticalStorage<(() -> Void)?>?
= if let onCancel { .create(onCancel) } else { nil }
Comment on lines +342 to +343
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not sure introducing another lock is the ideal approach, but it seems at least serviceable. it wasn't immediately obvious to me if _AsyncStreamCriticalStorage supports a fully generic payload, or if it's supposed to contain only a closure (or closure-sized generic parameter?) per this comment. also i tried changing to use Mutex directly, but could not get the tests to successfully build, so unsure if that approach would be an option.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd try to make a _Storage struct and make it have onCancel as well as the other state in there; so we don't have to have separate locks like this


context = _Context {
return await withTaskCancellationHandler {
guard let result = await storage.value?() else {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think there might also be a 'philosophical' question here about what the appropriate behavior is if we know that the cancel handler has been invoked during a suspension of the produce closure. should we return a non-nil value in such a case, or wait for a subsequent call to next() to do so? i suppose clients could achieve whatever behavior they prefer under the current implementation, but could not 'return the last value produced during a concurrent cancellation' if we changed things, so maybe it's better to leave it as-is?

Expand All @@ -347,6 +351,7 @@ public struct AsyncStream<Element> {
return result
} onCancel: {
storage.value = nil
let onCancel = cancelStorage?.access { $0.take() }
onCancel?()
}
}
Expand Down
6 changes: 6 additions & 0 deletions stdlib/public/Concurrency/AsyncStreamBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,12 @@ final class _AsyncStreamCriticalStorage<Contents>: @unchecked Sendable {
}
}

func access<Result>(_ body: (inout Contents) -> Result) -> Result {
lock()
defer { unlock() }
return body(&_value)
}

static func create(_ initial: Contents) -> _AsyncStreamCriticalStorage {
let minimumCapacity = _lockWordCount()
let storage = unsafe Builtin.allocWithTailElems_1(
Expand Down
51 changes: 51 additions & 0 deletions test/Concurrency/Runtime/async_stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,57 @@ class NotSendable {}
_ = await consumer2.value
}

// MARK: - Unfolding

tests.test("unfolding stream calls onCancel at most once") { @MainActor in
nonisolated(unsafe) var cancelCallbackCount = 0

let (innerControlStream, _) = AsyncStream<String>.makeStream()
let (outerControlStream, outerContinuation) = AsyncStream<String>.makeStream()

let task = Task { @MainActor in
let stream = AsyncStream<Int> { @MainActor in
outerContinuation.yield("started")
do {
var iter = innerControlStream.makeAsyncIterator()
let next = await iter.next()
assert(next == nil) // should only return from Task cancellation
}
return 42
} onCancel: {
cancelCallbackCount += 1
}

for await value in stream {
_ = value
}
}

// wait for unfolding closure to start
do {
var iter = outerControlStream.makeAsyncIterator()
let next = await iter.next()
assert(next == "started")
}

// ensure iterator is suspended
await MainActor.run {}

// cancel task
task.cancel()

// cancel callback should be invoked
expectEqual(cancelCallbackCount, 1)

// ensure task completes
_ = await task.value

// check that the cancel callback wasn't invoked again
expectEqual(cancelCallbackCount, 1)
}

// MARK: -

await runAllTestsAsync()
}
}
Expand Down