support amazonlinux2 & many fixes #228

Merged
merged 1 commit into from Jul 16, 2025
7 changes: 6 additions & 1 deletion Package.swift
@@ -79,7 +79,12 @@ let package = Package(

// `AsyncProcess` modules and dependencies

.target(name: "CProcessSpawnSync"),
.target(
name: "CProcessSpawnSync",
cSettings: [
.define("_GNU_SOURCE")
]
),
.target(
name: "ProcessSpawnSync",
dependencies: [
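Defining `_GNU_SOURCE` at the target level exposes glibc's GNU extensions (declarations such as `pipe2`) to every C file in `CProcessSpawnSync` without editing the sources. Purely as an illustration of the same SwiftPM mechanism, and not something this PR does, the define could also be scoped to Linux with a build-setting condition:

// Hypothetical variant (not in this PR): apply the define on Linux only.
.target(
    name: "CProcessSpawnSync",
    cSettings: [
        .define("_GNU_SOURCE", .when(platforms: [.linux]))
    ]
),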
38 changes: 31 additions & 7 deletions Sources/AsyncProcess/ChunkSequence.swift
@@ -23,17 +23,41 @@ public struct IllegalStreamConsumptionError: Error {
}

public struct ChunkSequence: AsyncSequence & Sendable {
private let fileHandle: FileHandle?
private let group: EventLoopGroup
private let contentStream: FileContentStream?

public init(takingOwnershipOfFileHandle fileHandle: FileHandle?, group: EventLoopGroup) {
self.group = group
self.fileHandle = fileHandle
public init(
takingOwnershipOfFileHandle fileHandle: FileHandle,
group: EventLoopGroup
) async throws {
// This will close the fileHandle
let contentStream = try await fileHandle.fileContentStream(eventLoop: group.any())
self.init(contentStream: contentStream)
}

internal func isSameAs(_ other: ChunkSequence) -> Bool {
guard let myContentStream = self.contentStream else {
return other.contentStream == nil
}
guard let otherContentStream = other.contentStream else {
return self.contentStream == nil
}
return myContentStream.isSameAs(otherContentStream)
}

public func close() async throws {
try await self.contentStream?.close()
}

private init(contentStream: FileContentStream?) {
self.contentStream = contentStream
}

public static func makeEmptyStream() -> Self {
return Self.init(contentStream: nil)
}

public func makeAsyncIterator() -> AsyncIterator {
// This will close the file handle.
return AsyncIterator(try! self.fileHandle?.fileContentStream(eventLoop: group.any()))
return AsyncIterator(self.contentStream)
}

public typealias Element = ByteBuffer
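A minimal call-site sketch of the new API (the file descriptor and the printing loop are illustrative, not taken from this PR): the initializer is now async and throwing because it consumes the handle up front, and a sequence that is abandoned early should be closed explicitly.

import AsyncProcess
import Foundation
import NIOPosix

// Sketch of a call site under the assumptions above.
func dumpChunks(fromOwnedFD fd: CInt) async throws {
    let handle = FileHandle(fileDescriptor: fd, closeOnDealloc: false)
    let chunks = try await ChunkSequence(
        takingOwnershipOfFileHandle: handle,  // the sequence owns and closes the handle
        group: MultiThreadedEventLoopGroup.singleton
    )
    for try await chunk in chunks {
        print("read \(chunk.readableBytes) bytes")
    }
    try await chunks.close()  // tolerated after normal completion too
}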
77 changes: 59 additions & 18 deletions Sources/AsyncProcess/FileContentStream.swift
@@ -24,16 +24,25 @@ public struct _FileContentStream: AsyncSequence & Sendable {
public typealias Element = ByteBuffer
typealias Underlying = AsyncThrowingChannel<Element, Error>

public func makeAsyncIterator() -> AsyncIterator {
return AsyncIterator(underlying: self.asyncChannel.makeAsyncIterator())
}

public struct AsyncIterator: AsyncIteratorProtocol {
public final class AsyncIterator: AsyncIteratorProtocol {
public typealias Element = ByteBuffer

deinit {
// This is painful and so wrong but unfortunately, our iterators don't have a cancel signal, so the only
// thing we can do is hope for `deinit` to be invoked :(.
// AsyncIteratorProtocol also doesn't support `~Copyable` so we also have to make this a class.
self.channel?.close(promise: nil)
}

init(underlying: Underlying.AsyncIterator, channel: (any Channel)?) {
self.underlying = underlying
self.channel = channel
}

var underlying: Underlying.AsyncIterator
let channel: (any Channel)?

public mutating func next() async throws -> ByteBuffer? {
public func next() async throws -> ByteBuffer? {
return try await self.underlying.next()
}
}
@@ -47,22 +56,41 @@ public struct _FileContentStream: AsyncSequence & Sendable {
}

private let asyncChannel: AsyncThrowingChannel<ByteBuffer, Error>
private let channel: (any Channel)?

internal func isSameAs(_ other: FileContentStream) -> Bool {
return (self.asyncChannel === other.asyncChannel) && (self.channel === other.channel)
}

public func makeAsyncIterator() -> AsyncIterator {
return AsyncIterator(
underlying: self.asyncChannel.makeAsyncIterator(),
channel: self.channel
)
}

public func close() async throws {
self.asyncChannel.finish()
do {
try await self.channel?.close().get()
} catch ChannelError.alreadyClosed {
// That's okay
}
}

public static func makeReader(
fileDescriptor: CInt,
eventLoop: EventLoop = MultiThreadedEventLoopGroup.singleton.any(),
blockingPool: NIOThreadPool = .singleton
) async throws -> _FileContentStream {
return try await eventLoop.submit {
try FileContentStream(fileDescriptor: fileDescriptor, eventLoop: eventLoop, blockingPool: blockingPool)
}.get()
try await FileContentStream(fileDescriptor: fileDescriptor, eventLoop: eventLoop, blockingPool: blockingPool)
}

internal init(
fileDescriptor: CInt,
eventLoop: EventLoop,
blockingPool: NIOThreadPool? = nil
) throws {
) async throws {
var statInfo: stat = .init()
let statError = fstat(fileDescriptor, &statInfo)
if statError != 0 {
@@ -103,23 +131,36 @@
asyncChannel.finish()
}
}
self.channel = nil
case S_IFSOCK:
_ = ClientBootstrap(group: eventLoop)
self.channel = try await ClientBootstrap(group: eventLoop)
.channelInitializer { channel in
channel.pipeline.addHandler(ReadIntoAsyncChannelHandler(sink: asyncChannel))
do {
try channel.pipeline.syncOperations.addHandler(ReadIntoAsyncChannelHandler(sink: asyncChannel))
return channel.eventLoop.makeSucceededFuture(())
} catch {
return channel.eventLoop.makeFailedFuture(error)
}
}
.withConnectedSocket(dupedFD)
.get()
case S_IFIFO:
NIOPipeBootstrap(group: eventLoop)
self.channel = try await NIOPipeBootstrap(group: eventLoop)
.channelInitializer { channel in
channel.pipeline.addHandler(ReadIntoAsyncChannelHandler(sink: asyncChannel))
do {
try channel.pipeline.syncOperations.addHandler(ReadIntoAsyncChannelHandler(sink: asyncChannel))
return channel.eventLoop.makeSucceededFuture(())
} catch {
return channel.eventLoop.makeFailedFuture(error)
}
}
.takingOwnershipOfDescriptor(
input: dupedFD
)
.whenSuccess { channel in
.map { channel in
channel.close(mode: .output, promise: nil)
}
return channel
}.get()
case S_IFDIR:
throw IOError(errnoValue: EISDIR)
case S_IFBLK, S_IFCHR, S_IFLNK:
@@ -265,8 +306,8 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler {
}

extension FileHandle {
func fileContentStream(eventLoop: EventLoop) throws -> FileContentStream {
let asyncBytes = try FileContentStream(fileDescriptor: self.fileDescriptor, eventLoop: eventLoop)
func fileContentStream(eventLoop: EventLoop) async throws -> FileContentStream {
let asyncBytes = try await FileContentStream(fileDescriptor: self.fileDescriptor, eventLoop: eventLoop)
try self.close()
return asyncBytes
}
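The iterator changes from a struct to a final class purely so that `deinit` can serve as the missing cancellation hook and close the underlying channel. A standalone sketch of that pattern, with illustrative names that are not part of this code base:

// Illustrative sketch of the deinit-as-cleanup pattern: AsyncIteratorProtocol has no
// explicit cancel hook, so a class iterator releases its resource when the last
// reference to it goes away.
final class ResourceBackedIterator: AsyncIteratorProtocol {
    typealias Element = Int

    private var remaining: [Int]
    private let onDrop: () -> Void  // stands in for channel.close(promise: nil)

    init(elements: [Int], onDrop: @escaping () -> Void) {
        self.remaining = elements
        self.onDrop = onDrop
    }

    deinit {
        // Runs whether the caller finished iterating or silently dropped the iterator.
        self.onDrop()
    }

    func next() async throws -> Int? {
        return self.remaining.isEmpty ? nil : self.remaining.removeFirst()
    }
}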
27 changes: 22 additions & 5 deletions Sources/AsyncProcess/NIOAsyncPipeWriter.swift
@@ -17,6 +17,7 @@ struct NIOAsyncPipeWriter<Chunks: AsyncSequence & Sendable> where Chunks.Element
static func sinkSequenceInto(
_ chunks: Chunks,
takingOwnershipOfFD fd: CInt,
ignoreWriteErrors: Bool,
eventLoop: EventLoop
) async throws {
let channel = try await NIOPipeBootstrap(group: eventLoop)
@@ -26,11 +27,27 @@
output: fd
).get()
channel.close(mode: .input, promise: nil)
defer {
channel.close(promise: nil)
}
for try await chunk in chunks {
try await channel.writeAndFlush(chunk).get()
return try await asyncDo {
try await withTaskCancellationHandler {
for try await chunk in chunks {
do {
try await channel.writeAndFlush(chunk).get()
} catch {
if !ignoreWriteErrors {
throw error
}
break
}
}
} onCancel: {
channel.close(promise: nil)
}
} finally: { _ in
do {
try await channel.close()
} catch ChannelError.alreadyClosed {
// ok
}
}
}
}
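The write loop is now wrapped so that the pipe channel is closed on task cancellation (via the handler) as well as on normal or failing exits. A rough sketch of that shape using only `withTaskCancellationHandler` and `do`/`catch`; it approximates, but is not, the project's `asyncDo` helper:

import NIOCore

// Sketch: close the channel when the task is cancelled, and again on every other exit.
func writeAllChunks<Chunks: AsyncSequence & Sendable>(
    _ chunks: Chunks,
    into channel: any Channel
) async throws where Chunks.Element == ByteBuffer {
    try await withTaskCancellationHandler {
        do {
            for try await chunk in chunks {
                try await channel.writeAndFlush(chunk).get()
            }
        } catch {
            channel.close(promise: nil)
            throw error
        }
        do {
            try await channel.close()
        } catch ChannelError.alreadyClosed {
            // Already closed (for example by the cancellation handler); nothing left to do.
        }
    } onCancel: {
        // Cancellation handlers run synchronously, so close without awaiting.
        channel.close(promise: nil)
    }
}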
34 changes: 30 additions & 4 deletions Sources/AsyncProcess/ProcessExecutor+Convenience.swift
@@ -235,13 +235,33 @@ extension ProcessExecutor {
}

public struct ProcessExitReasonAndOutput: Sendable & Hashable {
public func hash(into hasher: inout Hasher) {
self.exitReason.hash(into: &hasher)
self.standardOutput.hash(into: &hasher)
self.standardError.hash(into: &hasher)
(self.standardInputWriteError == nil).hash(into: &hasher)
}

public static func == (
lhs: ProcessExecutor.ProcessExitReasonAndOutput,
rhs: ProcessExecutor.ProcessExitReasonAndOutput
) -> Bool {
return lhs.exitReason == rhs.exitReason && lhs.standardOutput == rhs.standardOutput
&& lhs.standardError == rhs.standardError
&& (lhs.standardInputWriteError == nil) == (rhs.standardInputWriteError == nil)
}

public var exitReason: ProcessExitReason

/// Any errors that occurred whilst writing the provided `standardInput` sequence into the child process' standard input.
public var standardInputWriteError: Optional<any Error>

public var standardOutput: ByteBuffer?
public var standardError: ByteBuffer?
}

internal enum ProcessExitInformationPiece {
case exitReason(ProcessExitReason)
case exitReason(ProcessExitExtendedInfo)
case standardOutput(ByteBuffer?)
case standardError(ByteBuffer?)
}
@@ -319,14 +339,20 @@
}

group.addTask {
return .exitReason(try await exe.run())
return .exitReason(try await exe.runWithExtendedInfo())
}

var allInfo = ProcessExitReasonAndOutput(exitReason: .exit(-1), standardOutput: nil, standardError: nil)
var allInfo = ProcessExitReasonAndOutput(
exitReason: .exit(-1),
standardInputWriteError: nil,
standardOutput: nil,
standardError: nil
)
while let next = try await group.next() {
switch next {
case .exitReason(let exitReason):
allInfo.exitReason = exitReason
allInfo.exitReason = exitReason.exitReason
allInfo.standardInputWriteError = exitReason.standardInputWriteError
case .standardOutput(let output):
allInfo.standardOutput = output
case .standardError(let output):
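Because `any Error` is neither `Equatable` nor `Hashable`, the hand-written conformances above only compare and hash whether `standardInputWriteError` is present. The same technique in a minimal standalone form (type and property names are illustrative):

// Illustrative sketch: manual Hashable/Equatable that reduces a non-Hashable
// error field to its presence, mirroring ProcessExitReasonAndOutput above.
struct ResultWithError: Hashable {
    var value: Int
    var error: (any Error)?

    static func == (lhs: ResultWithError, rhs: ResultWithError) -> Bool {
        return lhs.value == rhs.value && (lhs.error == nil) == (rhs.error == nil)
    }

    func hash(into hasher: inout Hasher) {
        hasher.combine(self.value)
        hasher.combine(self.error == nil)
    }
}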