Skip to content

Does AsyncChannel supports multiple producers/consumers? #136

@laclouis5

Description

@laclouis5

I was playing with AsyncChannel and tried to implement a multiple producers multiple consumers pattern but stumbled on an issue: I wasn't able to terminate the async iteration of all consumers. When there is one consumer the solution is simple, just call AsyncChannel.finish() at the end of the production to send a nil value to the only consumer which then stops.

However, when there are multiple consumers all of them should receive a nil value from the producers. To achieve this I tried to call .finish() as many time as there are consumers but only one stopped.

Is this intended behavior, i.e. an AsyncChannel should only be used with one consumer, or is this a bug? Please see the code snippet bellow to reproduce it.

System:
MacBook Air early 2015
macOS v12.3.1
Xcode v13.3
Swift v5.6

PS: I just read in the documentation that .finish() returns immediately if the channel is already finished, so this means that calling it multiple times won't work for multiple consumers.

import Foundation
import AsyncAlgorithms

func producer(channel: AsyncChannel<Int>, id: Int) async {
    for _ in 0..<3 {
        await channel.send(id)
    }
}

/// Spawns `count` producers
func produce(channel: AsyncChannel<Int>, count: Int) async {
    await withTaskGroup(of: Void.self) { group in
        for id in 0..<count {
            group.addTask {
                await producer(channel: channel, id: id)
            }
        }
    }
    
    for _ in 0..<count {
        await channel.finish()
    }
}

func consumer(channel: AsyncChannel<Int>, id: Int) async {
    for await value in channel {
        try? await Task.sleep(nanoseconds: 1_000_000_000)
        print("Consumer \(id) consumed value produce by producer \(value)")
    }
    print("Consumer \(id) terminated")  // Only one consumer is stopped
}

/// Spawns `count` consumers
func consume(channel: AsyncChannel<Int>, count: Int) async {
    await withTaskGroup(of: Void.self) { group in
        for id in 0..<count {
            group.addTask {
                await consumer(channel: channel, id: id)
            }
        }
    }
}

@main
enum App {
    static func main() async throws {
        let channel = AsyncChannel<Int>()
        
        async let p: Void = produce(channel: channel, count: 4)
        async let c: Void = consume(channel: channel, count: 4)
        
        _ = await (p, c)
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    ResolvedQuestion answered or otherwise resolved

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions