-
Notifications
You must be signed in to change notification settings - Fork 170
Description
I was playing with AsyncChannel
and tried to implement a multiple producers multiple consumers pattern but stumbled on an issue: I wasn't able to terminate the async iteration of all consumers. When there is one consumer the solution is simple, just call AsyncChannel.finish()
at the end of the production to send a nil
value to the only consumer which then stops.
However, when there are multiple consumers all of them should receive a nil
value from the producers. To achieve this I tried to call .finish()
as many time as there are consumers but only one stopped.
Is this intended behavior, i.e. an AsyncChannel
should only be used with one consumer, or is this a bug? Please see the code snippet bellow to reproduce it.
System:
MacBook Air early 2015
macOS v12.3.1
Xcode v13.3
Swift v5.6
PS: I just read in the documentation that .finish()
returns immediately if the channel is already finished, so this means that calling it multiple times won't work for multiple consumers.
import Foundation
import AsyncAlgorithms
func producer(channel: AsyncChannel<Int>, id: Int) async {
for _ in 0..<3 {
await channel.send(id)
}
}
/// Spawns `count` producers
func produce(channel: AsyncChannel<Int>, count: Int) async {
await withTaskGroup(of: Void.self) { group in
for id in 0..<count {
group.addTask {
await producer(channel: channel, id: id)
}
}
}
for _ in 0..<count {
await channel.finish()
}
}
func consumer(channel: AsyncChannel<Int>, id: Int) async {
for await value in channel {
try? await Task.sleep(nanoseconds: 1_000_000_000)
print("Consumer \(id) consumed value produce by producer \(value)")
}
print("Consumer \(id) terminated") // Only one consumer is stopped
}
/// Spawns `count` consumers
func consume(channel: AsyncChannel<Int>, count: Int) async {
await withTaskGroup(of: Void.self) { group in
for id in 0..<count {
group.addTask {
await consumer(channel: channel, id: id)
}
}
}
}
@main
enum App {
static func main() async throws {
let channel = AsyncChannel<Int>()
async let p: Void = produce(channel: channel, count: 4)
async let c: Void = consume(channel: channel, count: 4)
_ = await (p, c)
}
}