From ad240943198418ea99cd4ce8cb99caa9bd5d2af4 Mon Sep 17 00:00:00 2001 From: Johannes Weiss Date: Tue, 15 Jul 2025 12:35:37 +0100 Subject: [PATCH] support amazonlinux2 & many fixes --- Package.swift | 7 +- Sources/AsyncProcess/ChunkSequence.swift | 38 +- Sources/AsyncProcess/FileContentStream.swift | 77 +- Sources/AsyncProcess/NIOAsyncPipeWriter.swift | 27 +- .../ProcessExecutor+Convenience.swift | 34 +- Sources/AsyncProcess/ProcessExecutor.swift | 363 +++++++--- Sources/AsyncProcess/ProcessExit.swift | 8 + .../StructuredConcurrencyHelpers.swift | 95 +++ Sources/CProcessSpawnSync/internal-helpers.h | 83 ++- Sources/CProcessSpawnSync/spawner.c | 4 +- Sources/ProcessSpawnSync/ProcessSpawner.swift | 124 +++- .../AsyncProcessTests/IntegrationTests.swift | 668 +++++++++++++++++- 12 files changed, 1367 insertions(+), 161 deletions(-) create mode 100644 Sources/AsyncProcess/StructuredConcurrencyHelpers.swift diff --git a/Package.swift b/Package.swift index 78f88e9..de5966f 100644 --- a/Package.swift +++ b/Package.swift @@ -79,7 +79,12 @@ let package = Package( // `AsyncProcess` modules and dependencies - .target(name: "CProcessSpawnSync"), + .target( + name: "CProcessSpawnSync", + cSettings: [ + .define("_GNU_SOURCE") + ] + ), .target( name: "ProcessSpawnSync", dependencies: [ diff --git a/Sources/AsyncProcess/ChunkSequence.swift b/Sources/AsyncProcess/ChunkSequence.swift index f36d0d1..eadf140 100644 --- a/Sources/AsyncProcess/ChunkSequence.swift +++ b/Sources/AsyncProcess/ChunkSequence.swift @@ -23,17 +23,41 @@ public struct IllegalStreamConsumptionError: Error { } public struct ChunkSequence: AsyncSequence & Sendable { - private let fileHandle: FileHandle? - private let group: EventLoopGroup + private let contentStream: FileContentStream? - public init(takingOwnershipOfFileHandle fileHandle: FileHandle?, group: EventLoopGroup) { - self.group = group - self.fileHandle = fileHandle + public init( + takingOwnershipOfFileHandle fileHandle: FileHandle, + group: EventLoopGroup + ) async throws { + // This will close the fileHandle + let contentStream = try await fileHandle.fileContentStream(eventLoop: group.any()) + self.init(contentStream: contentStream) + } + + internal func isSameAs(_ other: ChunkSequence) -> Bool { + guard let myContentStream = self.contentStream else { + return other.contentStream == nil + } + guard let otherContentStream = other.contentStream else { + return self.contentStream == nil + } + return myContentStream.isSameAs(otherContentStream) + } + + public func close() async throws { + try await self.contentStream?.close() + } + + private init(contentStream: FileContentStream?) { + self.contentStream = contentStream + } + + public static func makeEmptyStream() -> Self { + return Self.init(contentStream: nil) } public func makeAsyncIterator() -> AsyncIterator { - // This will close the file handle. - return AsyncIterator(try! self.fileHandle?.fileContentStream(eventLoop: group.any())) + return AsyncIterator(self.contentStream) } public typealias Element = ByteBuffer diff --git a/Sources/AsyncProcess/FileContentStream.swift b/Sources/AsyncProcess/FileContentStream.swift index 50e1cf4..e8d7bc3 100644 --- a/Sources/AsyncProcess/FileContentStream.swift +++ b/Sources/AsyncProcess/FileContentStream.swift @@ -24,16 +24,25 @@ public struct _FileContentStream: AsyncSequence & Sendable { public typealias Element = ByteBuffer typealias Underlying = AsyncThrowingChannel - public func makeAsyncIterator() -> AsyncIterator { - return AsyncIterator(underlying: self.asyncChannel.makeAsyncIterator()) - } - - public struct AsyncIterator: AsyncIteratorProtocol { + public final class AsyncIterator: AsyncIteratorProtocol { public typealias Element = ByteBuffer + deinit { + // This is painful and so wrong but unfortunately, our iterators don't have a cancel signal, so the only + // thing we can do is hope for `deinit` to be invoked :(. + // AsyncIteratorProtocol also doesn't support `~Copyable` so we also have to make this a class. + self.channel?.close(promise: nil) + } + + init(underlying: Underlying.AsyncIterator, channel: (any Channel)?) { + self.underlying = underlying + self.channel = channel + } + var underlying: Underlying.AsyncIterator + let channel: (any Channel)? - public mutating func next() async throws -> ByteBuffer? { + public func next() async throws -> ByteBuffer? { return try await self.underlying.next() } } @@ -47,22 +56,41 @@ public struct _FileContentStream: AsyncSequence & Sendable { } private let asyncChannel: AsyncThrowingChannel + private let channel: (any Channel)? + + internal func isSameAs(_ other: FileContentStream) -> Bool { + return (self.asyncChannel === other.asyncChannel) && (self.channel === other.channel) + } + + public func makeAsyncIterator() -> AsyncIterator { + return AsyncIterator( + underlying: self.asyncChannel.makeAsyncIterator(), + channel: self.channel + ) + } + + public func close() async throws { + self.asyncChannel.finish() + do { + try await self.channel?.close().get() + } catch ChannelError.alreadyClosed { + // That's okay + } + } public static func makeReader( fileDescriptor: CInt, eventLoop: EventLoop = MultiThreadedEventLoopGroup.singleton.any(), blockingPool: NIOThreadPool = .singleton ) async throws -> _FileContentStream { - return try await eventLoop.submit { - try FileContentStream(fileDescriptor: fileDescriptor, eventLoop: eventLoop, blockingPool: blockingPool) - }.get() + try await FileContentStream(fileDescriptor: fileDescriptor, eventLoop: eventLoop, blockingPool: blockingPool) } internal init( fileDescriptor: CInt, eventLoop: EventLoop, blockingPool: NIOThreadPool? = nil - ) throws { + ) async throws { var statInfo: stat = .init() let statError = fstat(fileDescriptor, &statInfo) if statError != 0 { @@ -103,23 +131,36 @@ public struct _FileContentStream: AsyncSequence & Sendable { asyncChannel.finish() } } + self.channel = nil case S_IFSOCK: - _ = ClientBootstrap(group: eventLoop) + self.channel = try await ClientBootstrap(group: eventLoop) .channelInitializer { channel in - channel.pipeline.addHandler(ReadIntoAsyncChannelHandler(sink: asyncChannel)) + do { + try channel.pipeline.syncOperations.addHandler(ReadIntoAsyncChannelHandler(sink: asyncChannel)) + return channel.eventLoop.makeSucceededFuture(()) + } catch { + return channel.eventLoop.makeFailedFuture(error) + } } .withConnectedSocket(dupedFD) + .get() case S_IFIFO: - NIOPipeBootstrap(group: eventLoop) + self.channel = try await NIOPipeBootstrap(group: eventLoop) .channelInitializer { channel in - channel.pipeline.addHandler(ReadIntoAsyncChannelHandler(sink: asyncChannel)) + do { + try channel.pipeline.syncOperations.addHandler(ReadIntoAsyncChannelHandler(sink: asyncChannel)) + return channel.eventLoop.makeSucceededFuture(()) + } catch { + return channel.eventLoop.makeFailedFuture(error) + } } .takingOwnershipOfDescriptor( input: dupedFD ) - .whenSuccess { channel in + .map { channel in channel.close(mode: .output, promise: nil) - } + return channel + }.get() case S_IFDIR: throw IOError(errnoValue: EISDIR) case S_IFBLK, S_IFCHR, S_IFLNK: @@ -265,8 +306,8 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler { } extension FileHandle { - func fileContentStream(eventLoop: EventLoop) throws -> FileContentStream { - let asyncBytes = try FileContentStream(fileDescriptor: self.fileDescriptor, eventLoop: eventLoop) + func fileContentStream(eventLoop: EventLoop) async throws -> FileContentStream { + let asyncBytes = try await FileContentStream(fileDescriptor: self.fileDescriptor, eventLoop: eventLoop) try self.close() return asyncBytes } diff --git a/Sources/AsyncProcess/NIOAsyncPipeWriter.swift b/Sources/AsyncProcess/NIOAsyncPipeWriter.swift index cbf4120..c36d173 100644 --- a/Sources/AsyncProcess/NIOAsyncPipeWriter.swift +++ b/Sources/AsyncProcess/NIOAsyncPipeWriter.swift @@ -17,6 +17,7 @@ struct NIOAsyncPipeWriter where Chunks.Element static func sinkSequenceInto( _ chunks: Chunks, takingOwnershipOfFD fd: CInt, + ignoreWriteErrors: Bool, eventLoop: EventLoop ) async throws { let channel = try await NIOPipeBootstrap(group: eventLoop) @@ -26,11 +27,27 @@ struct NIOAsyncPipeWriter where Chunks.Element output: fd ).get() channel.close(mode: .input, promise: nil) - defer { - channel.close(promise: nil) - } - for try await chunk in chunks { - try await channel.writeAndFlush(chunk).get() + return try await asyncDo { + try await withTaskCancellationHandler { + for try await chunk in chunks { + do { + try await channel.writeAndFlush(chunk).get() + } catch { + if !ignoreWriteErrors { + throw error + } + break + } + } + } onCancel: { + channel.close(promise: nil) + } + } finally: { _ in + do { + try await channel.close() + } catch ChannelError.alreadyClosed { + // ok + } } } } diff --git a/Sources/AsyncProcess/ProcessExecutor+Convenience.swift b/Sources/AsyncProcess/ProcessExecutor+Convenience.swift index f91e6f9..ea09a1f 100644 --- a/Sources/AsyncProcess/ProcessExecutor+Convenience.swift +++ b/Sources/AsyncProcess/ProcessExecutor+Convenience.swift @@ -235,13 +235,33 @@ extension ProcessExecutor { } public struct ProcessExitReasonAndOutput: Sendable & Hashable { + public func hash(into hasher: inout Hasher) { + self.exitReason.hash(into: &hasher) + self.standardOutput.hash(into: &hasher) + self.standardError.hash(into: &hasher) + (self.standardInputWriteError == nil).hash(into: &hasher) + } + + public static func == ( + lhs: ProcessExecutor.ProcessExitReasonAndOutput, + rhs: ProcessExecutor.ProcessExitReasonAndOutput + ) -> Bool { + return lhs.exitReason == rhs.exitReason && lhs.standardOutput == rhs.standardOutput + && lhs.standardError == rhs.standardError + && (lhs.standardInputWriteError == nil) == (rhs.standardInputWriteError == nil) + } + public var exitReason: ProcessExitReason + + /// Any errors that occurred whilst writing the provided `standardInput` sequence into the child process' standard input. + public var standardInputWriteError: Optional + public var standardOutput: ByteBuffer? public var standardError: ByteBuffer? } internal enum ProcessExitInformationPiece { - case exitReason(ProcessExitReason) + case exitReason(ProcessExitExtendedInfo) case standardOutput(ByteBuffer?) case standardError(ByteBuffer?) } @@ -319,14 +339,20 @@ extension ProcessExecutor { } group.addTask { - return .exitReason(try await exe.run()) + return .exitReason(try await exe.runWithExtendedInfo()) } - var allInfo = ProcessExitReasonAndOutput(exitReason: .exit(-1), standardOutput: nil, standardError: nil) + var allInfo = ProcessExitReasonAndOutput( + exitReason: .exit(-1), + standardInputWriteError: nil, + standardOutput: nil, + standardError: nil + ) while let next = try await group.next() { switch next { case .exitReason(let exitReason): - allInfo.exitReason = exitReason + allInfo.exitReason = exitReason.exitReason + allInfo.standardInputWriteError = exitReason.standardInputWriteError case .standardOutput(let output): allInfo.standardOutput = output case .standardError(let output): diff --git a/Sources/AsyncProcess/ProcessExecutor.swift b/Sources/AsyncProcess/ProcessExecutor.swift index 73079a9..b826108 100644 --- a/Sources/AsyncProcess/ProcessExecutor.swift +++ b/Sources/AsyncProcess/ProcessExecutor.swift @@ -12,7 +12,6 @@ import AsyncAlgorithms import Atomics -import Foundation import Logging import NIO import ProcessSpawnSync @@ -31,6 +30,13 @@ import ProcessSpawnSync typealias Process = PSProcess #endif +#if os(iOS) || os(tvOS) || os(watchOS) + // Process & fork/exec unavailable + #error("Process and fork() unavailable") +#else + import Foundation +#endif + public struct ProcessOutputStream: Sendable & Hashable & CustomStringConvertible { internal enum Backing { case standardOutput @@ -154,6 +160,28 @@ private struct AnyAsyncSequence: AsyncSequence & Sendable where Element } } +internal enum ChildFileState: Sendable { + case inherit + case devNull + case ownedHandle(FileHandle) + case unownedHandle(FileHandle) + + var handleIfOwned: FileHandle? { + switch self { + case .inherit, .devNull, .unownedHandle: + return nil + case .ownedHandle(let handle): + return handle + } + } +} + +enum Streaming { + case toBeStreamed(FileHandle, EventLoopPromise) + case preparing(EventLoopFuture) + case streaming(ChunkSequence) +} + /// Execute a sub-process. /// /// - warning: Currently, the default for `standardOutput` & `standardError` is ``ProcessOutput.stream`` which means @@ -166,16 +194,14 @@ public final actor ProcessExecutor { private let arguments: [String] private let environment: [String: String] private let standardInput: AnyAsyncSequence - private let standardInputPipe: Pipe? - private let standardOutputWriteHandle: FileHandle? - private let standardErrorWriteHandle: FileHandle? - private let _standardOutput: ChunkSequence - private let _standardError: ChunkSequence + private let standardInputPipe: ChildFileState + private let standardOutputWriteHandle: ChildFileState + private let standardErrorWriteHandle: ChildFileState + private var _standardOutput: Streaming + private var _standardError: Streaming private let processIsRunningApproximation = ManagedAtomic(RunningStateApproximation.neverStarted.rawValue) private let processOutputConsumptionApproximation = ManagedAtomic(UInt8(0)) private let processPid = ManagedAtomic(pid_t(0)) - private let ownsStandardOutputWriteHandle: Bool - private let ownsStandardErrorWriteHandle: Bool private let teardownSequence: TeardownSequence private let spawnOptions: SpawnOptions @@ -203,12 +229,34 @@ public final actor ProcessExecutor { /// it will be silently ignored (and no new session will be created). public var createNewSession: Bool + /// If an `AsyncSequence` to write is provided to `standardInput`, should we ignore all write errors? + /// + /// The default is `false` and write errors to the child process's standard input are thrown like process spawn errors. If set to `true`, these errors + /// are silently ignored. This option can be useful if we need to capture the child process' output even if writing into its standard input fails + public var ignoreStdinStreamWriteErrors: Bool + + /// If an error is hit whilst writing into the child process's standard input, should we cancel the process (making it terminate) + /// + /// Default is `true`. + public var cancelProcessOnStandardInputWriteFailure: Bool + + /// Should we cancel the standard input writing when the process has exited? + /// + /// Default is `true`. + /// + /// - warning: Disabling this is rather dangerous if the child process had interited its standard input into another process. If that is the case, we will + /// not return from `run(WithExtendedInfo)` until we streamed our full standard input (or it failed). + public var cancelStandardInputWritingWhenProcessExits: Bool + /// Safe & sensible default options. public static var `default`: SpawnOptions { return SpawnOptions( closeOtherFileDescriptors: true, changedWorkingDirectory: nil, - createNewSession: false + createNewSession: false, + ignoreStdinStreamWriteErrors: false, + cancelProcessOnStandardInputWriteFailure: true, + cancelStandardInputWritingWhenProcessExits: true ) } } @@ -249,28 +297,97 @@ public final actor ProcessExecutor { } } - public var standardOutput: ChunkSequence { + enum StreamingKickOff: Sendable { + case make(FileHandle, EventLoopPromise) + case wait(EventLoopFuture) + case take(ChunkSequence) + } + + private static func kickOffStreaming( + stream: inout Streaming + ) -> StreamingKickOff { + switch stream { + case .toBeStreamed(let fileHandle, let promise): + stream = .preparing(promise.futureResult) + return .make(fileHandle, promise) + case .preparing(let future): + return .wait(future) + case .streaming(let chunkSequence): + return .take(chunkSequence) + } + } + + private static func streamingSetupDone( + stream: inout Streaming, + _ chunkSequence: ChunkSequence + ) { + switch stream { + case .toBeStreamed, .streaming: + fatalError("impossible state: \(stream)") + case .preparing: + stream = .streaming(chunkSequence) + } + } + + private func assureSingleStreamConsumption(streamBit: OutputConsumptionState, name: String) { let afterValue = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( - with: OutputConsumptionState.stdoutConsumed.rawValue, + with: streamBit.rawValue, ordering: .relaxed ) precondition( - OutputConsumptionState(rawValue: afterValue).contains([.stdoutConsumed]), - "Double-consumption of stdandardOutput" + OutputConsumptionState(rawValue: afterValue).contains([streamBit]), + "Double-consumption of \(name)" ) - return self._standardOutput + } + + @discardableResult + private func setupStandardOutput() async throws -> ChunkSequence { + switch Self.kickOffStreaming(stream: &self._standardOutput) { + case .make(let fileHandle, let promise): + let chunkSequence = try! await ChunkSequence( + takingOwnershipOfFileHandle: fileHandle, + group: self.group.any() + ) + Self.streamingSetupDone(stream: &self._standardOutput, chunkSequence) + promise.succeed(chunkSequence) + return chunkSequence + case .wait(let chunkSequence): + return try await chunkSequence.get() + case .take(let chunkSequence): + return chunkSequence + } + } + + @discardableResult + private func setupStandardError() async throws -> ChunkSequence { + switch Self.kickOffStreaming(stream: &self._standardError) { + case .make(let fileHandle, let promise): + let chunkSequence = try! await ChunkSequence( + takingOwnershipOfFileHandle: fileHandle, + group: self.group.any() + ) + Self.streamingSetupDone(stream: &self._standardError, chunkSequence) + promise.succeed(chunkSequence) + return chunkSequence + case .wait(let chunkSequence): + return try await chunkSequence.get() + case .take(let chunkSequence): + return chunkSequence + } + } + + public var standardOutput: ChunkSequence { + get async { + self.assureSingleStreamConsumption(streamBit: .stdoutConsumed, name: #function) + return try! await self.setupStandardOutput() + } } public var standardError: ChunkSequence { - let afterValue = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( - with: OutputConsumptionState.stderrConsumed.rawValue, - ordering: .relaxed - ) - precondition( - OutputConsumptionState(rawValue: afterValue).contains([.stderrConsumed]), - "Double-consumption of stdandardEror" - ) - return self._standardError + get async { + self.assureSingleStreamConsumption(streamBit: .stderrConsumed, name: #function) + return try! await self.setupStandardError() + } } private enum RunningStateApproximation: Int { @@ -319,7 +436,12 @@ public final actor ProcessExecutor { self.teardownSequence = teardownSequence self.spawnOptions = spawnOptions - self.standardInputPipe = StandardInput.self == EOFSequence.self ? nil : Pipe() + self.standardInputPipe = StandardInput.self == EOFSequence.self ? .devNull : .ownedHandle(Pipe()) + + let standardOutputWriteHandle: ChildFileState + let standardErrorWriteHandle: ChildFileState + let _standardOutput: Streaming + let _standardError: Streaming switch standardOutput.backing { case .discard: @@ -327,38 +449,33 @@ public final actor ProcessExecutor { with: OutputConsumptionState.stdoutNotStreamed.rawValue, ordering: .relaxed ) - self.ownsStandardOutputWriteHandle = true - self.standardOutputWriteHandle = FileHandle(forWritingAtPath: "/dev/null") - self._standardOutput = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) + standardOutputWriteHandle = .devNull + _standardOutput = .streaming(ChunkSequence.makeEmptyStream()) case .fileDescriptorOwned(let fd): _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( with: OutputConsumptionState.stdoutNotStreamed.rawValue, ordering: .relaxed ) - self.ownsStandardOutputWriteHandle = true - self.standardOutputWriteHandle = FileHandle(fileDescriptor: fd.rawValue) - self._standardOutput = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) + standardOutputWriteHandle = .ownedHandle(FileHandle(fileDescriptor: fd.rawValue)) + _standardOutput = .streaming(ChunkSequence.makeEmptyStream()) case .fileDescriptorShared(let fd): _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( with: OutputConsumptionState.stdoutNotStreamed.rawValue, ordering: .relaxed ) - self.ownsStandardOutputWriteHandle = false - self.standardOutputWriteHandle = FileHandle(fileDescriptor: fd.rawValue) - self._standardOutput = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) + standardOutputWriteHandle = .unownedHandle(FileHandle(fileDescriptor: fd.rawValue)) + _standardOutput = .streaming(ChunkSequence.makeEmptyStream()) case .inherit: _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( with: OutputConsumptionState.stdoutNotStreamed.rawValue, ordering: .relaxed ) - self.ownsStandardOutputWriteHandle = true - self.standardOutputWriteHandle = nil - self._standardOutput = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) + standardOutputWriteHandle = .inherit + _standardOutput = .streaming(ChunkSequence.makeEmptyStream()) case .stream: - let (stdoutSequence, stdoutWriteHandle) = Self.makeWriteStream(group: group) - self.ownsStandardOutputWriteHandle = true - self._standardOutput = stdoutSequence - self.standardOutputWriteHandle = stdoutWriteHandle + let handles = Self.makeWriteStream(group: group) + _standardOutput = .toBeStreamed(handles.parentHandle, self.group.any().makePromise()) + standardOutputWriteHandle = .ownedHandle(handles.childHandle) } switch standardError.backing { @@ -367,49 +484,44 @@ public final actor ProcessExecutor { with: OutputConsumptionState.stderrNotStreamed.rawValue, ordering: .relaxed ) - self.ownsStandardErrorWriteHandle = true - self.standardErrorWriteHandle = FileHandle(forWritingAtPath: "/dev/null") - self._standardError = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) + standardErrorWriteHandle = .devNull + _standardError = .streaming(ChunkSequence.makeEmptyStream()) case .fileDescriptorOwned(let fd): _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( with: OutputConsumptionState.stderrNotStreamed.rawValue, ordering: .relaxed ) - self.ownsStandardErrorWriteHandle = true - self.standardErrorWriteHandle = FileHandle(fileDescriptor: fd.rawValue) - self._standardError = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) + standardErrorWriteHandle = .ownedHandle(FileHandle(fileDescriptor: fd.rawValue)) + _standardError = .streaming(ChunkSequence.makeEmptyStream()) case .fileDescriptorShared(let fd): _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( with: OutputConsumptionState.stderrNotStreamed.rawValue, ordering: .relaxed ) - self.ownsStandardErrorWriteHandle = false - self.standardErrorWriteHandle = FileHandle(fileDescriptor: fd.rawValue) - self._standardError = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) + standardErrorWriteHandle = .unownedHandle(FileHandle(fileDescriptor: fd.rawValue)) + _standardError = .streaming(ChunkSequence.makeEmptyStream()) case .inherit: _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( with: OutputConsumptionState.stderrNotStreamed.rawValue, ordering: .relaxed ) - self.ownsStandardErrorWriteHandle = true - self.standardErrorWriteHandle = nil - self._standardError = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) + standardErrorWriteHandle = .inherit + _standardError = .streaming(ChunkSequence.makeEmptyStream()) case .stream: - let (stdoutSequence, stdoutWriteHandle) = Self.makeWriteStream(group: group) - self.ownsStandardErrorWriteHandle = true - self._standardError = stdoutSequence - self.standardErrorWriteHandle = stdoutWriteHandle + let handles = Self.makeWriteStream(group: group) + _standardError = .toBeStreamed(handles.parentHandle, self.group.any().makePromise()) + standardErrorWriteHandle = .ownedHandle(handles.childHandle) } + + self._standardError = _standardError + self._standardOutput = _standardOutput + self.standardOutputWriteHandle = standardOutputWriteHandle + self.standardErrorWriteHandle = standardErrorWriteHandle } - private static func makeWriteStream(group: EventLoopGroup) -> (ChunkSequence, FileHandle) { + private static func makeWriteStream(group: EventLoopGroup) -> (parentHandle: FileHandle, childHandle: FileHandle) { let pipe = Pipe() - let chunkSequence = ChunkSequence( - takingOwnershipOfFileHandle: pipe.fileHandleForReading, - group: group - ) - let writeHandle = pipe.fileHandleForWriting - return (chunkSequence, writeHandle) + return (parentHandle: pipe.fileHandleForReading, childHandle: pipe.fileHandleForWriting) } deinit { @@ -522,6 +634,31 @@ public final actor ProcessExecutor { /// If you prefer to get the standard output and error in one (non-stremed) piece upon exit, consider the `static` methods such as /// ``ProcessExecutor/runCollectingOutput(group:executable:_:standardInput:collectStandardOutput:collectStandardError:perStreamCollectionLimitBytes:environment:logger:)``. public func run() async throws -> ProcessExitReason { + let result = try await self.runWithExtendedInfo() + if let error = result.standardInputWriteError { + throw error + } + return result.exitReason + } + + enum WhoReturned: Sendable { + case process(ProcessExitReason) + case stdinWriter((any Error)?) + } + + /// Run the process and provide extended information on exit. + /// + /// Calling `run()` will run the (sub-)process and return its ``ProcessExitReason`` when the execution completes. + /// Unless `standardOutput` and `standardError` were both set to ``ProcessOutput/discard``, + /// ``ProcessOutput/fileDescriptor(takingOwnershipOf:)`` or ``ProcessOutput/inherit`` you must consume the `AsyncSequence`s + /// ``ProcessExecutor/standardOutput`` and ``ProcessExecutor/standardError`` concurrently to ``run()``ing the process. + /// + /// If you prefer to get the standard output and error in one (non-stremed) piece upon exit, consider the `static` methods such as + /// ``ProcessExecutor/runCollectingOutput(group:executable:_:standardInput:collectStandardOutput:collectStandardError:perStreamCollectionLimitBytes:environment:logger:)``. + public func runWithExtendedInfo() async throws -> ProcessExitExtendedInfo { + try await self.setupStandardOutput() + try await self.setupStandardError() + let p = Process() #if canImport(Darwin) if #available(macOS 13.0, *) { @@ -549,15 +686,32 @@ public final actor ProcessExecutor { assert(!Self.isBackedByPSProcess) } - if let standardOutputWriteHandle = self.standardOutputWriteHandle { - // NOTE: Do _NOT_ remove this if. Setting this to `nil` is different to not setting it at all! - p.standardOutput = standardOutputWriteHandle + switch self.standardInputPipe { + case .inherit: + () // We are _not_ setting it, this is `Foundation.Process`'s API for inheritance + case .devNull: + p.standardInput = nil // Yes, setting to `nil` means `/dev/null` + case .ownedHandle(let pipe), .unownedHandle(let pipe): + p.standardInput = pipe + } + + switch self.standardOutputWriteHandle { + case .inherit: + () // We are _not_ setting it, this is `Foundation.Process`'s API for inheritance + case .devNull: + p.standardOutput = nil // Yes, setting to `nil` means `/dev/null` + case .ownedHandle(let fileHandle), .unownedHandle(let fileHandle): + p.standardOutput = fileHandle } - if let standardErrorWriteHandle = self.standardErrorWriteHandle { - // NOTE: Do _NOT_ remove this if. Setting this to `nil` is different to not setting it at all! - p.standardError = standardErrorWriteHandle + + switch self.standardErrorWriteHandle { + case .inherit: + () // We are _not_ setting it, this is `Foundation.Process`'s API for inheritance + case .devNull: + p.standardError = nil // Yes, setting to `nil` means `/dev/null` + case .ownedHandle(let fileHandle), .unownedHandle(let fileHandle): + p.standardError = fileHandle } - p.standardInput = self.standardInputPipe let (terminationStreamConsumer, terminationStreamProducer) = AsyncStream.justMakeIt( elementType: ProcessExitReason.self @@ -630,11 +784,11 @@ public final actor ProcessExecutor { ordering: .relaxed ) terminationStreamProducer.finish() // The termination handler will never have fired. - if self.ownsStandardOutputWriteHandle { - try! self.standardOutputWriteHandle?.close() + if let stdoutHandle = self.standardOutputWriteHandle.handleIfOwned { + try! stdoutHandle.close() } - if self.ownsStandardErrorWriteHandle { - try! self.standardErrorWriteHandle?.close() + if let stderrHandle = self.standardErrorWriteHandle.handleIfOwned { + try! stderrHandle.close() } assert(worked) // We just set it to running above, shouldn't be able to race (no `await`). assert(original == RunningStateApproximation.running.rawValue) // We compare-and-exchange it. @@ -659,12 +813,14 @@ public final actor ProcessExecutor { ] ) - try! self.standardInputPipe?.fileHandleForReading.close() // Must work. - if self.ownsStandardOutputWriteHandle { - try! self.standardOutputWriteHandle?.close() // Must work. + if let stdinHandle = self.standardInputPipe.handleIfOwned { + try! stdinHandle.fileHandleForReading.close() // Must work. + } + if let stdoutHandle = self.standardOutputWriteHandle.handleIfOwned { + try! stdoutHandle.close() // Must work. } - if self.ownsStandardErrorWriteHandle { - try! self.standardErrorWriteHandle?.close() // Must work. + if let stderrHandle = self.standardErrorWriteHandle.handleIfOwned { + try! stderrHandle.close() // Must work. } @Sendable func waitForChildToExit() async -> ProcessExitReason { @@ -683,10 +839,10 @@ public final actor ProcessExecutor { } } - return try await withThrowingTaskGroup( - of: ProcessExitReason?.self, - returning: ProcessExitReason.self - ) { runProcessGroup async throws -> ProcessExitReason in + let extendedExitReason = await withTaskGroup( + of: WhoReturned.self, + returning: ProcessExitExtendedInfo.self + ) { runProcessGroup async -> ProcessExitExtendedInfo in runProcessGroup.addTask { await withTaskGroup(of: Void.self) { triggerTeardownGroup in triggerTeardownGroup.addTask { @@ -718,32 +874,55 @@ public final actor ProcessExecutor { let result = await waitForChildToExit() triggerTeardownGroup.cancelAll() // This triggers the teardown - return result + return .process(result) } } runProcessGroup.addTask { - if let stdinPipe = self.standardInputPipe { - let fdForNIO = dup(stdinPipe.fileHandleForWriting.fileDescriptor) - try! stdinPipe.fileHandleForWriting.close() + let stdinPipe: Pipe + switch self.standardInputPipe { + case .inherit, .devNull: + return .stdinWriter(nil) + case .ownedHandle(let pipe): + stdinPipe = pipe + case .unownedHandle(let pipe): + stdinPipe = pipe + } + let fdForNIO = dup(stdinPipe.fileHandleForWriting.fileDescriptor) + try! stdinPipe.fileHandleForWriting.close() + do { try await NIOAsyncPipeWriter>.sinkSequenceInto( self.standardInput, takingOwnershipOfFD: fdForNIO, + ignoreWriteErrors: self.spawnOptions.ignoreStdinStreamWriteErrors, eventLoop: self.group.any() ) + } catch { + return .stdinWriter(error) } - return nil + return .stdinWriter(nil) } var exitReason: ProcessExitReason? = nil - // cannot fix this warning yet (rdar://113844171) - while let result = try await runProcessGroup.next() { - if let result = result { + var stdinWriterError: (any Error)?? = nil + while let result = await runProcessGroup.next() { + switch result { + case .process(let result): exitReason = result + if self.spawnOptions.cancelStandardInputWritingWhenProcessExits { + runProcessGroup.cancelAll() + } + case .stdinWriter(let maybeError): + stdinWriterError = maybeError + if self.spawnOptions.cancelProcessOnStandardInputWriteFailure && maybeError != nil { + runProcessGroup.cancelAll() + } } } - return exitReason! // must work because the real task will return a reason (or throw) + return ProcessExitExtendedInfo(exitReason: exitReason!, standardInputWriteError: stdinWriterError!) } + + return extendedExitReason } /// The processes's process identifier (pid). Please note that most use cases of this are racy because UNIX systems recycle pids after process exit. diff --git a/Sources/AsyncProcess/ProcessExit.swift b/Sources/AsyncProcess/ProcessExit.swift index 6e88f78..1266233 100644 --- a/Sources/AsyncProcess/ProcessExit.swift +++ b/Sources/AsyncProcess/ProcessExit.swift @@ -10,6 +10,14 @@ // //===----------------------------------------------------------------------===// +public struct ProcessExitExtendedInfo: Sendable { + /// Reason the process exited. + public var exitReason: ProcessExitReason + + /// Any errors that occurred whilst writing the provided `standardInput` sequence into the child process' standard input. + public var standardInputWriteError: Optional +} + public enum ProcessExitReason: Hashable & Sendable { case exit(CInt) case signal(CInt) diff --git a/Sources/AsyncProcess/StructuredConcurrencyHelpers.swift b/Sources/AsyncProcess/StructuredConcurrencyHelpers.swift new file mode 100644 index 0000000..f2e71bc --- /dev/null +++ b/Sources/AsyncProcess/StructuredConcurrencyHelpers.swift @@ -0,0 +1,95 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2025 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +// swift-format-ignore +// Note: Whitespace changes are used to workaround compiler bug +// https://github.com/swiftlang/swift/issues/79285 + +#if compiler(>=6.0) +@inlinable +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +internal func asyncDo( + isolation: isolated (any Actor)? = #isolation, + // DO NOT FIX THE WHITESPACE IN THE NEXT LINE UNTIL 5.10 IS UNSUPPORTED + // https://github.com/swiftlang/swift/issues/79285 + _ body: () async throws -> sending R, finally: sending @escaping ((any Error)?) async throws -> Void) async throws -> sending R { + let result: R + do { + result = try await body() + } catch { + // `body` failed, we need to invoke `finally` with the `error`. + + // This _looks_ unstructured but isn't really because we unconditionally always await the return. + // We need to have an uncancelled task here to assure this is actually running in case we hit a + // cancellation error. + try await Task { + try await finally(error) + }.value + throw error + } + + // `body` succeeded, we need to invoke `finally` with `nil` (no error). + + // This _looks_ unstructured but isn't really because we unconditionally always await the return. + // We need to have an uncancelled task here to assure this is actually running in case we hit a + // cancellation error. + try await Task { + try await finally(nil) + }.value + return result +} +#else +@inlinable +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +internal func asyncDo( + _ body: () async throws -> R, + finally: @escaping @Sendable ((any Error)?) async throws -> Void +) async throws -> R { + let result: R + do { + result = try await body() + } catch { + // `body` failed, we need to invoke `finally` with the `error`. + + // This _looks_ unstructured but isn't really because we unconditionally always await the return. + // We need to have an uncancelled task here to assure this is actually running in case we hit a + // cancellation error. + try await Task { + try await finally(error) + }.value + throw error + } + + // `body` succeeded, we need to invoke `finally` with `nil` (no error). + + // This _looks_ unstructured but isn't really because we unconditionally always await the return. + // We need to have an uncancelled task here to assure this is actually running in case we hit a + // cancellation error. + try await Task { + try await finally(nil) + }.value + return result +} +#endif diff --git a/Sources/CProcessSpawnSync/internal-helpers.h b/Sources/CProcessSpawnSync/internal-helpers.h index b57f0f6..ffb00bf 100644 --- a/Sources/CProcessSpawnSync/internal-helpers.h +++ b/Sources/CProcessSpawnSync/internal-helpers.h @@ -12,7 +12,18 @@ #ifndef INTERNAL_HELPERS_H #define INTERNAL_HELPERS_H +#include +#include #include +#include +#if defined(__linux__) +#include +#endif +#if defined(__APPLE__) +#include + +ssize_t __getdirentries64(int fd, void *buf, size_t bufsize, off_t *basep); +#endif static int positive_int_parse(const char *str) { int out = 0; @@ -29,34 +40,80 @@ static int positive_int_parse(const char *str) { return out; } -static int highest_possibly_open_fd_dir(const char *fd_dir) { +#if defined(__linux__) || defined(__APPLE__) +// Platform-specific version that uses syscalls directly and doesn't allocate heap memory. +// Safe to use after vfork() and before execve() +static int highest_possibly_open_fd_dir_syscall(const char *fd_dir) { int highest_fd_so_far = 0; - DIR *dir_ptr = opendir(fd_dir); - if (dir_ptr == NULL) { + int dir_fd = open(fd_dir, O_RDONLY); + if (dir_fd < 0) { + // errno set by `open`. return -1; } - struct dirent *dir_entry = NULL; - while ((dir_entry = readdir(dir_ptr)) != NULL) { - char *entry_name = dir_entry->d_name; - int number = positive_int_parse(entry_name); - if (number > (long)highest_fd_so_far) { - highest_fd_so_far = number; + // Buffer for directory entries - allocated on stack, no heap allocation + char buffer[4096] = {0}; +#if defined(__linux__) + ssize_t bytes_read = -1; +#elif defined(__APPLE__) + ssize_t bytes_read = -1; + off_t os_controlled_seek_pos = -1; +#endif + + while (( +#if defined(__linux__) +# if defined(__GLIBC__) && __GLIBC__ == 2 && defined(__GLIBC_MINOR__) && __GLIBC_MINOR__ >= 30 + bytes_read = getdents64(dir_fd, (struct dirent64 *)buffer, sizeof(buffer)) +# else + bytes_read = syscall(SYS_getdents64, dir_fd, (struct dirent64 *)buffer, sizeof(buffer)) +# endif +#elif defined(__APPLE__) + bytes_read = __getdirentries64(dir_fd, buffer, sizeof(buffer), &os_controlled_seek_pos) +#endif + ) > 0) { + if (bytes_read < 0) { + if (errno == EINTR) { + continue; + } else { + // `errno` set by getdents64/getdirentries. + highest_fd_so_far = -1; + goto error; + } } - } + long offset = 0; + while (offset < bytes_read) { +#if defined(__linux__) + struct dirent64 *entry = (struct dirent64 *)(buffer + offset); +#elif defined(__APPLE__) + struct dirent *entry = (struct dirent *)(buffer + offset); +#endif + + // Skip "." and ".." entries + if (entry->d_name[0] != '.') { + int number = positive_int_parse(entry->d_name); + if (number > highest_fd_so_far) { + highest_fd_so_far = number; + } + } + + offset += entry->d_reclen; + } + } - closedir(dir_ptr); +error: + close(dir_fd); return highest_fd_so_far; } +#endif static int highest_possibly_open_fd(void) { #if defined(__APPLE__) - int hi = highest_possibly_open_fd_dir("/dev/fd"); + int hi = highest_possibly_open_fd_dir_syscall("/dev/fd"); if (hi < 0) { hi = getdtablesize(); } #elif defined(__linux__) - int hi = highest_possibly_open_fd_dir("/proc/self/fd"); + int hi = highest_possibly_open_fd_dir_syscall("/proc/self/fd"); if (hi < 0) { hi = getdtablesize(); } diff --git a/Sources/CProcessSpawnSync/spawner.c b/Sources/CProcessSpawnSync/spawner.c index 315bb75..a69791b 100644 --- a/Sources/CProcessSpawnSync/spawner.c +++ b/Sources/CProcessSpawnSync/spawner.c @@ -10,7 +10,6 @@ // //===----------------------------------------------------------------------===// -#define _GNU_SOURCE #include #include #include @@ -169,7 +168,8 @@ static void setup_and_execve_child(ps_process_configuration *config, int error_p #endif if (close_range_err) { // close_range failed (or doesn't exist), let's fall back onto this - for (int i=config->psc_fd_setup_count; ipsc_fd_setup_count; i<=high_fd; i++) { if (i != error_pipe) { close(i); } diff --git a/Sources/ProcessSpawnSync/ProcessSpawner.swift b/Sources/ProcessSpawnSync/ProcessSpawner.swift index 43d0e11..d57fd03 100644 --- a/Sources/ProcessSpawnSync/ProcessSpawner.swift +++ b/Sources/ProcessSpawnSync/ProcessSpawner.swift @@ -12,9 +12,15 @@ import Atomics import CProcessSpawnSync -import Foundation import NIOConcurrencyHelpers +#if os(iOS) || os(tvOS) || os(watchOS) + // Process & fork/exec unavailable + #error("Process and fork() unavailable") +#else + import Foundation +#endif + extension ps_error_s { private func makeDescription() -> String { return """ @@ -51,6 +57,38 @@ public struct PSProcessUnknownError: Error & CustomStringConvertible { } } +// We need this to replicate `Foundation.Process`'s odd API where +// - stardard{Input,Output,Error} not set means _inherit_ +// - stardard{Input,Output,Error} set to `nil` means `/dev/null` +// - stardard{Input,Output,Error} set to FileHandle/Pipe means use that +internal enum OptionallySet { + case notSet + case setToNone + case setTo(Wrapped) + + var asOptional: Wrapped? { + switch self { + case .notSet: + return nil + case .setToNone: + return nil + case .setTo(let wrapped): + return wrapped + } + } + + var isSetToNone: Bool { + switch self { + case .notSet, .setTo: + return false + case .setToNone: + return true + } + } +} + +extension OptionallySet: Sendable where Wrapped: Sendable {} + public final class PSProcess: Sendable { struct State: Sendable { var executableURL: URL? = nil @@ -60,9 +98,9 @@ public final class PSProcess: Sendable { var closeOtherFileDescriptors: Bool = true var createNewSession: Bool = false private(set) var pidWhenRunning: pid_t? = nil - var standardInput: Pipe? = nil - var standardOutput: FileHandle? = nil - var standardError: FileHandle? = nil + var standardInput: OptionallySet = .notSet + var standardOutput: OptionallySet = .notSet + var standardError: OptionallySet = .notSet var terminationHandler: (@Sendable (PSProcess) -> Void)? = nil private(set) var procecesIdentifier: pid_t? = nil private(set) var terminationStatus: (Process.TerminationReason, CInt)? = nil @@ -129,13 +167,60 @@ public final class PSProcess: Sendable { } } + let devNullFD: CInt + if state.standardInput.isSetToNone || state.standardOutput.isSetToNone || state.standardError.isSetToNone { + devNullFD = open("/dev/null", O_RDWR) + guard devNullFD >= 0 else { + throw PSProcessUnknownError(reason: "Cannot open /dev/null: \(errno)") + } + } else { + devNullFD = -1 + } + + defer { + if devNullFD != -1 { + close(devNullFD) + } + } + let stdinFDForChild: CInt + let stdoutFDForChild: CInt + let stderrFDForChild: CInt + + // Replicate `Foundation.Process`'s API where not setting means "inherit" and `nil`-setting means /dev/null + switch state.standardInput { + case .notSet: + stdinFDForChild = STDIN_FILENO + case .setToNone: + assert(devNullFD >= 0) + stdinFDForChild = devNullFD + case .setTo(let handle): + stdinFDForChild = handle.fileHandleForReading.fileDescriptor + } + + switch state.standardOutput { + case .notSet: + stdoutFDForChild = STDOUT_FILENO + case .setToNone: + assert(devNullFD >= 0) + stdoutFDForChild = devNullFD + case .setTo(let handle): + stdoutFDForChild = handle.fileDescriptor + } + + switch state.standardError { + case .notSet: + stderrFDForChild = STDERR_FILENO + case .setToNone: + assert(devNullFD >= 0) + stderrFDForChild = devNullFD + case .setTo(let handle): + stderrFDForChild = handle.fileDescriptor + } + let psSetup: [ps_fd_setup] = [ - ps_fd_setup( - psfd_kind: PS_MAP_FD, - psfd_parent_fd: state.standardInput?.fileHandleForReading.fileDescriptor ?? STDIN_FILENO - ), - ps_fd_setup(psfd_kind: PS_MAP_FD, psfd_parent_fd: state.standardOutput?.fileDescriptor ?? STDOUT_FILENO), - ps_fd_setup(psfd_kind: PS_MAP_FD, psfd_parent_fd: state.standardError?.fileDescriptor ?? STDERR_FILENO), + ps_fd_setup(psfd_kind: PS_MAP_FD, psfd_parent_fd: stdinFDForChild), + ps_fd_setup(psfd_kind: PS_MAP_FD, psfd_parent_fd: stdoutFDForChild), + ps_fd_setup(psfd_kind: PS_MAP_FD, psfd_parent_fd: stderrFDForChild), ] let (pid, error) = psSetup.withUnsafeBufferPointer { psSetupPtr -> (pid_t, ps_error) in var config = ps_process_configuration_s( @@ -152,7 +237,12 @@ public final class PSProcess: Sendable { let pid = ps_spawn_process(&config, &error) return (pid, error) } - try! state.standardInput?.fileHandleForReading.close() + switch state.standardInput { + case .notSet, .setToNone: + () // Nothing to do + case .setTo(let pipe): + try! pipe.fileHandleForReading.close() + } guard pid > 0 else { switch (error.pse_kind, error.pse_code) { case (PS_ERROR_KIND_EXECVE, ENOENT), @@ -313,12 +403,12 @@ public final class PSProcess: Sendable { public var standardOutput: FileHandle? { get { self.state.withLockedValue { state in - state.standardOutput + state.standardOutput.asOptional } } set { self.state.withLockedValue { state in - state.standardOutput = newValue + state.standardOutput = newValue.map { .setTo($0) } ?? .setToNone } } } @@ -326,12 +416,12 @@ public final class PSProcess: Sendable { public var standardError: FileHandle? { get { self.state.withLockedValue { state in - state.standardError + state.standardError.asOptional } } set { self.state.withLockedValue { state in - state.standardError = newValue + state.standardError = newValue.map { .setTo($0) } ?? .setToNone } } } @@ -339,12 +429,12 @@ public final class PSProcess: Sendable { public var standardInput: Pipe? { get { self.state.withLockedValue { state in - state.standardInput + state.standardInput.asOptional } } set { self.state.withLockedValue { state in - state.standardInput = newValue + state.standardInput = newValue.map { .setTo($0) } ?? .setToNone } } } diff --git a/Tests/AsyncProcessTests/IntegrationTests.swift b/Tests/AsyncProcessTests/IntegrationTests.swift index f79cd50..6e8c13e 100644 --- a/Tests/AsyncProcessTests/IntegrationTests.swift +++ b/Tests/AsyncProcessTests/IntegrationTests.swift @@ -354,8 +354,6 @@ final class IntegrationTests: XCTestCase { } func testOutputWithoutNewlinesThatIsSplitIntoLines() async throws { - self.logger = Logger(label: "x") - self.logger.logLevel = .trace let exe = ProcessExecutor( group: self.group, executable: "/bin/sh", @@ -1333,6 +1331,672 @@ final class IntegrationTests: XCTestCase { } } + func testVeryHighFDs() async throws { + var openedFDs: [CInt] = [] + + // Open /dev/null to use as source for duplication + let devNullFD = open("/dev/null", O_RDONLY) + guard devNullFD != -1 else { + XCTFail("Failed to open /dev/null") + return + } + defer { + let closeResult = close(devNullFD) + XCTAssertEqual(0, closeResult, "Failed to close /dev/null FD") + } + + for candidate in sequence(first: CInt(1), next: { $0 <= CInt.max / 2 ? $0 * 2 : nil }) { + // Use fcntl with F_DUPFD to find next available FD >= candidate + let fd = fcntl(devNullFD, F_DUPFD, candidate) + if fd == -1 { + // Failed to allocate FD >= candidate, try next power of 2 + self.logger.debug( + "already unavailable, skipping", + metadata: ["candidate": "\(candidate)", "errno": "\(errno)"] + ) + continue + } else { + openedFDs.append(fd) + self.logger.debug("Opened FD in parent", metadata: ["fd": "\(fd)"]) + } + } + + defer { + for fd in openedFDs { + let closeResult = close(fd) + XCTAssertEqual(0, closeResult, "Failed to close FD \(fd)") + } + } + + // Create shell script that checks each FD passed as arguments + let shellScript = """ + for fd in "$@"; do + if [ -e "/proc/self/fd/$fd" ] || [ -e "/dev/fd/$fd" ]; then + echo "- fd: $fd: OPEN" + else + echo "- fd: $fd: CLOSED" + fi + done + """ + + var arguments = ["-c", shellScript, "--"] + arguments.append(contentsOf: openedFDs.map { "\($0)" }) + + let result = try await ProcessExecutor.runCollectingOutput( + group: self.group, + executable: "/bin/sh", + arguments, + standardInput: EOFSequence(), + collectStandardOutput: true, + collectStandardError: true, + logger: self.logger + ) + try result.exitReason.throwIfNonZero() + + // Assert stderr is empty + XCTAssertEqual("", String(buffer: result.standardError!)) + + // Assert stdout contains exactly the expected output (all FDs closed) + let expectedOutput = openedFDs.map { "- fd: \($0): CLOSED" }.joined(separator: "\n") + "\n" + XCTAssertEqual(expectedOutput, String(buffer: result.standardOutput!)) + } + + func testStandardInputIgnoredMeansImmediateEOF() async throws { + let result = try await ProcessExecutor.runCollectingOutput( + executable: "/bin/sh", + [ + "-c", + #""" + set -eu + while read -r line; do + echo "unexpected input $line" + done + exit 0 + """#, + ], + collectStandardOutput: true, + collectStandardError: true + ) + try result.exitReason.throwIfNonZero() + XCTAssertEqual("", String(buffer: result.standardOutput!)) + XCTAssertEqual("", String(buffer: result.standardError!)) + } + + func testStandardInputStreamWriteErrorsBlowUpOldSchoolRunSpawnOnProcessExit() async throws { + do { + let result = try await ProcessExecutor( + executable: "/bin/sh", + [ + "-c", + #""" + set -e + read -r line + if [ "$line" = "go" ]; then + echo "GO" + exit 0 # We're just exiting here which will have the effect of stdin closing + fi + echo "PROBLEM" + while read -r line; do + echo "unexpected input $line" + done + exit 1 + """#, + ], + standardInput: sequence( + first: ByteBuffer(string: "go\n"), + next: { _ in ByteBuffer(string: "extra line\n") } // infinite sequence + ).async, + standardOutput: .discard, + standardError: .discard + ).run() + XCTFail("unexpected result: \(result)") + } catch let error as NIO.IOError { + XCTAssert( + [ + EPIPE, + EBADF, // don't worry, this is a NIO-synthesised (already closed) EBADF + ].contains(error.errnoCode), + "unexpected error: \(error)" + ) + } + } + + func testStandardInputStreamWriteErrorsBlowUpOldSchoolRunOnStandardInputClose() async throws { + do { + let result = try await ProcessExecutor( + executable: "/bin/sh", + [ + "-c", + #""" + set -e + read -r line + if [ "$line" = "go" ]; then + echo "GO" + exec <&- # close stdin but stay alive + while true; do sleep 1; done + exit 0 + fi + echo "PROBLEM" + while read -r line; do + echo "unexpected input $line" + done + exit 1 + """#, + ], + standardInput: sequence( + first: ByteBuffer(string: "go\n"), + next: { _ in ByteBuffer(string: "extra line\n") } // infinite sequence + ).async, + standardOutput: .discard, + standardError: .discard + ).run() + XCTFail("unexpected result: \(result)") + } catch let error as NIO.IOError { + XCTAssert( + [ + EPIPE, + EBADF, // don't worry, this is a NIO-synthesised (already closed) EBADF + ].contains(error.errnoCode), + "unexpected error: \(error)" + ) + } + } + + func testStandardInputStreamWriteErrorsDoNotBlowUpRunCollectingInputOnProcessExit() async throws { + let result = try await ProcessExecutor.runCollectingOutput( + executable: "/bin/sh", + [ + "-c", + #""" + set -e + read -r line + if [ "$line" = "go" ]; then + echo "GO" + exit 0 # We're just exiting here which will have the effect of stdin closing + fi + echo "PROBLEM" + while read -r line; do + echo "unexpected input $line" + done + exit 1 + """#, + ], + standardInput: sequence( + first: ByteBuffer(string: "go\n"), + next: { _ in ByteBuffer(string: "extra line\n") } // infinite sequence + ).async, + collectStandardOutput: true, + collectStandardError: true + ) + XCTAssertEqual(.exit(0), result.exitReason) // child exits by itself + XCTAssertNotNil(result.standardInputWriteError) + XCTAssertEqual( + EPIPE, + (result.standardInputWriteError as? NIO.IOError).map { ioError in + if ioError.errnoCode == EBADF { + // Don't worry, not a real EBADF, just a NIO synthesised one + // https://github.com/apple/swift-nio/issues/3292 + // Let's fudge the error into a sensible one. + let ioError = NIO.IOError(errnoCode: EPIPE, reason: ioError.description) + return ioError + } else { + return ioError + } + }?.errnoCode, + "\(result.standardInputWriteError.debugDescription)" + ) + XCTAssertEqual("GO\n", String(buffer: result.standardOutput!)) + XCTAssertEqual("", String(buffer: result.standardError!)) + } + + func testStandardInputStreamWriteErrorsDoNotBlowUpRunCollectingInputOnStandardInputClose() async throws { + let result = try await ProcessExecutor.runCollectingOutput( + executable: "/bin/sh", + [ + "-c", + #""" + set -e + read -r line + if [ "$line" = "go" ]; then + echo "GO" + exec <&- # close stdin but stay alive + while true; do sleep 1; done + exit 0 + fi + echo "PROBLEM" + while read -r line; do + echo "unexpected input $line" + done + exit 1 + """#, + ], + standardInput: sequence( + first: ByteBuffer(string: "go\n"), + next: { _ in ByteBuffer(string: "extra line\n") } // infinite sequence + ).async, + collectStandardOutput: true, + collectStandardError: true + ) + XCTAssertEqual(.signal(9), result.exitReason) // Child doesn't die by itself, so it'll be killed by our cancel + XCTAssertNotNil(result.standardInputWriteError) + XCTAssertEqual( + EPIPE, + (result.standardInputWriteError as? NIO.IOError).map { ioError in + if ioError.errnoCode == EBADF { + // Don't worry, not a real EBADF, just a NIO synthesised one + // https://github.com/apple/swift-nio/issues/3292 + // Let's fudge the error into a sensible one. + let ioError = NIO.IOError(errnoCode: EPIPE, reason: ioError.description) + return ioError + } else { + return ioError + } + }?.errnoCode, + "\(result.standardInputWriteError.debugDescription)" + ) + XCTAssertEqual("GO\n", String(buffer: result.standardOutput!)) + XCTAssertEqual("", String(buffer: result.standardError!)) + } + + func testStandardInputStreamWriteErrorsCanBeIgnored() async throws { + var spawnOptions = ProcessExecutor.SpawnOptions.default + spawnOptions.ignoreStdinStreamWriteErrors = true + do { + let result = try await ProcessExecutor.runCollectingOutput( + executable: "/bin/sh", + [ + "-c", + #""" + set -e + read -r line + if [ "$line" = "go" ]; then + echo "GO" + exit 0 # We're just exiting here which will have the effect of stdin closing + fi + echo "PROBLEM" + while read -r line; do + echo "unexpected input $line" + done + exit 1 + """#, + ], + spawnOptions: spawnOptions, + standardInput: sequence( + first: ByteBuffer(string: "go\n"), + next: { _ in ByteBuffer(string: "extra line\n") } // infinite sequence + ).async, + collectStandardOutput: true, + collectStandardError: true + ) + try result.exitReason.throwIfNonZero() + XCTAssertEqual("GO\n", String(buffer: result.standardOutput!)) + XCTAssertEqual("", String(buffer: result.standardError!)) + } + } + + func testStandardInputStreamWriteErrorsCanBeReceivedThroughExtendedResults() async throws { + // The default is + // spawnOptions.cancelProcessOnStandardInputWriteFailure = true + // therefore, this should not hang and the program should get killed with SIGKILL (due to cancellation). + let exe = ProcessExecutor( + executable: "/bin/sh", + [ + "-c", + #""" + set -eu + read -r line + if [ "$line" = "go" ]; then + echo "GO" + exec <&- # close stdin but stay alive + while true; do sleep 1; done + exit 0 + fi + echo "PROBLEM" + while read -r line; do + echo "unexpected input $line" + done + exit 1 + """#, + ], + standardInput: sequence( + first: ByteBuffer(string: "go\n"), + next: { _ in ByteBuffer(string: "extra line\n") } // infinite sequence + ).async + ) + async let resultAsync = exe.runWithExtendedInfo() + for try await line in await merge(exe.standardOutput.splitIntoLines(), exe.standardError.splitIntoLines()) { + if String(buffer: line) != "GO" { + XCTFail("unexpected line: \(line)") + } + } + let result = try await resultAsync + XCTAssertEqual(.signal(SIGKILL), result.exitReason) + XCTAssertThrowsError(try result.standardInputWriteError.map { throw $0 }) { error in + if let error = error as? NIO.IOError { + XCTAssert( + [ + EPIPE, + EBADF, // don't worry, this is a NIO-synthesised (already closed) EBADF + ].contains(error.errnoCode), + "unexpected error: \(error)" + ) + } else { + XCTFail("unexpected error: \(error)") + } + } + } + + func testCanMakeProgramHangWhenStdinIsClosedBecauseWeDisabledCancellation() async throws { + // This is quite a complex test. Here we're closing stdin in the child process but disable automatic + // parent process cancellation on child stdin write errors. Therefore, the child will hang until we cancel it + // ourselves. + var spawnOptions = ProcessExecutor.SpawnOptions.default + spawnOptions.cancelProcessOnStandardInputWriteFailure = false + let exe = ProcessExecutor( + executable: "/bin/sh", + [ + "-c", + #""" + set -eu + read -r line + if [ "$line" = "go" ]; then + echo "GO" + exec <&- # close stdin but stay alive + exec >&- # also close stdout to signal to parent + while true; do sleep 1; done + exit 0 + fi + echo "PROBLEM" + while read -r line; do + echo "unexpected input $line" + done + exit 1 + """#, + ], + spawnOptions: spawnOptions, + standardInput: sequence( + first: ByteBuffer(string: "go\n"), + next: { _ in ByteBuffer(string: "extra line\n") } // infinite sequence + ).async + ) + + enum WhoReturned { + case process(Result) + case stderr(Error?) + case stdout(Error?) + case sleep + } + await withTaskGroup(of: WhoReturned.self) { group in + group.addTask { + do { + let result = try await exe.runWithExtendedInfo() + return WhoReturned.process(.success(result)) + } catch { + return WhoReturned.process(.failure(error)) + } + } + group.addTask { + do { + for try await line in await exe.standardError.splitIntoLines() { + XCTFail("unexpected stderr line: \(line)") + } + return .stderr(nil) + } catch { + return .stderr(error) + } + } + group.addTask { + do { + for try await line in await exe.standardOutput.splitIntoLines() { + if line != ByteBuffer(string: "GO") { + XCTFail("unexpected stdout line: \(line)") + } + } + return .stdout(nil) + } catch { + return .stdout(error) + } + } + group.addTask { + try? await Task.sleep(nanoseconds: 500_000_000) + return .sleep + } + + let actualReturn1 = await group.next()! // .stdout (likely) or .sleep (unlikely) + let actualReturn2 = await group.next()! // .sleep (likely) or .stdout (unlikely) + group.cancelAll() + let actualReturn3 = await group.next()! // .stderr or .process + let actualReturn4 = await group.next()! // .stderr or .process + + switch actualReturn1 { + case .stdout(let maybeError): + XCTAssertNil(maybeError) + case .sleep: + () + default: + XCTFail("unexpected: \(actualReturn1)") + } + switch actualReturn2 { + case .stdout(let maybeError): + XCTAssertNil(maybeError) + case .sleep: + () + default: + XCTFail("unexpected: \(actualReturn2)") + } + switch actualReturn3 { + case .stderr(let maybeError): + XCTAssertNil(maybeError) + case .process(let result): + let exitReason = try? result.get() + XCTAssertEqual(.signal(SIGKILL), exitReason?.exitReason) + XCTAssertNotNil(exitReason?.standardInputWriteError) + default: + XCTFail("unexpected: \(actualReturn3)") + } + switch actualReturn4 { + case .stderr(let maybeError): + XCTAssertNil(maybeError) + case .process(let result): + let exitReason = try? result.get() + XCTAssertEqual(.signal(SIGKILL), exitReason?.exitReason) + XCTAssertNotNil(exitReason?.standardInputWriteError) + default: + XCTFail("unexpected: \(actualReturn4)") + } + } + } + + func testWeDoNotHangIfStandardInputRemainsOpenButProcessExits() async throws { + // This tests an odd situation: The child exits but stdin is still not closed, mostly happens if we inherit a + // pipe that we still have another writer to. + + var sleepPidToKill: CInt? + defer { + if let sleepPidToKill { + self.logger.debug( + "killing our sleep grand-child", + metadata: ["pid": "\(sleepPidToKill)"] + ) + kill(sleepPidToKill, SIGKILL) + } else { + XCTFail("didn't find the pid of sleep to kill") + } + } + do { // We create a scope here to make sure we can leave the scope without hanging + let (stdinStream, stdinStreamProducer) = AsyncStream.makeStream(of: ByteBuffer.self) + let exe = ProcessExecutor( + executable: "/bin/sh", + [ + "-c", + #""" + # This construction attempts to emulate a simple `sleep 12345678 < /dev/null` but some shells (eg. dash) + # won't allow stdin inheritance for background processes... + exec 2>&- # close stderr + exec 2<&0 # duplicate stdin into fd 2 (so we can inherit it into sleep + + ( + exec 0<&2 # map the duplicated fd 2 as our stdin + exec 2>&- # close the duplicated fd2 + exec sleep 12345678 # sleep (this will now have the origin stdin as its stdin) + ) & # uber long sleep that will inherit our stdin pipe + exec 2>&- # close duplicated 2 + + read -r line + echo "$line" # write back the line + echo "$!" # write back the sleep + exec >&- + exit 0 + """#, + ], + standardInput: stdinStream + ) + stdinStreamProducer.yield(ByteBuffer(string: "GO\n")) + stdinStreamProducer.yield(ByteBuffer(repeating: 0x42, count: 16 * 1024 * 1024)) + async let resultAsync = exe.runWithExtendedInfo() + async let stderrAsync = Array(exe.standardError) + var stdoutLines = await exe.standardOutput.splitIntoLines().makeAsyncIterator() + let lineGo = try await stdoutLines.next() + XCTAssertEqual(ByteBuffer(string: "GO"), lineGo) + let linePid = try await stdoutLines.next().map(String.init(buffer:)) + let sleepPid = try XCTUnwrap(linePid.flatMap { CInt($0) }) + self.logger.debug("found our sleep grand-child", metadata: ["pid": "\(sleepPid)"]) + sleepPidToKill = sleepPid + let stderrBytes = try await stderrAsync + XCTAssertEqual([], stderrBytes) + let result = try await resultAsync + XCTAssertEqual(.exit(0), result.exitReason) + XCTAssertNotNil(result.standardInputWriteError) + XCTAssertEqual(ChannelError.ioOnClosedChannel, result.standardInputWriteError as? ChannelError) + stdinStreamProducer.finish() + } + } + + #if !os(Linux) // https://github.com/apple/swift-nio/issues/3294 + func testWeDoHangIfStandardInputWriterCouldStillWriteIfWeDisableCancellingInputWriterAfterExit() async throws { + // Here, we do the same thing as in testWeDoNotHangIfStandardInputRemainsOpenButProcessExits but to make matters + // worse, we're setting `spawnOptions.cancelStandardInputWritingWhenProcessExits = false` which means that we're + // not gonna return because the write will be hanging until we kill our long sleep. + + enum WhoReturned { + case processRun + case waiter + } + + try await withThrowingTaskGroup(of: WhoReturned.self) { group in + let (stdinStream, stdinStreamProducer) = AsyncStream.makeStream(of: ByteBuffer.self) + var spawnOptions = ProcessExecutor.SpawnOptions.default + spawnOptions.cancelStandardInputWritingWhenProcessExits = false + let exe = ProcessExecutor( + executable: "/bin/sh", + [ + "-c", + #""" + # This construction attempts to emulate a simple `sleep 12345678 < /dev/null` but some shells (eg. dash) + # won't allow stdin inheritance for background processes... + exec 2>&- # close stderr + exec 2<&0 # duplicate stdin into fd 2 (so we can inherit it into sleep + + ( + exec 0<&2 # map the duplicated fd 2 as our stdin + exec 2>&- # close the duplicated fd2 + exec sleep 12345678 # sleep (this will now have the origin stdin as its stdin) + ) & # uber long sleep that will inherit our stdin pipe + exec 2>&- # close duplicated 2 + + read -r line + echo "$line" # write back the line + echo "$!" # write back the sleep + exec >&- + exit 0 + """#, + ], + spawnOptions: spawnOptions, + standardInput: stdinStream + ) + stdinStreamProducer.yield(ByteBuffer(string: "GO\n")) + stdinStreamProducer.yield(ByteBuffer(repeating: 0x42, count: 32 * 1024 * 1024)) + + group.addTask { + let result = try await exe.runWithExtendedInfo() + XCTAssertEqual(.exit(0), result.exitReason) + XCTAssertNotNil(result.standardInputWriteError) + XCTAssert( + [ + .some(EPIPE), + .some(EBADF), // don't worry, this is a NIO-synthesised (already closed) EBADF + ].contains(result.standardInputWriteError.flatMap { $0 as? NIO.IOError }.map { $0.errnoCode }), + "unexpected error: \(result.standardInputWriteError.debugDescription)" + ) + stdinStreamProducer.finish() + return .processRun + } + var stdoutLines = await exe.standardOutput.splitIntoLines().makeAsyncIterator() + let lineGo = try await stdoutLines.next() + XCTAssertEqual(ByteBuffer(string: "GO"), lineGo) + let linePid = try await stdoutLines.next().map(String.init(buffer:)) + let sleepPid = try XCTUnwrap(linePid.flatMap { CInt($0) }) + self.logger.debug("found our sleep grand-child", metadata: ["pid": "\(sleepPid)"]) + + group.addTask { + try? await Task.sleep(nanoseconds: 500_000_000) // Wait until we're confident that we're stuck + return .waiter + } + + // The situation we set up is the following + // - Our direct child process will have exited here + // - Our grand child (sleep 12345678) is still running and has the stdin pipe + // - We switched off cancelling the stdin writer when our child exits + // - We're stuck now ... + // - ... until our `.waiter` returns + // - When we kill the grand-child + // - Which then unblocks everything else + + let actualReturn1 = try await group.next()! + XCTAssertEqual(.waiter, actualReturn1) + + let stderrBytes = try await Array(exe.standardError) + XCTAssertEqual([], stderrBytes, "\(stderrBytes.map { $0.hexDump(format: .plain(maxBytes: .max)) })") + + let killRet = kill(sleepPid, SIGKILL) + XCTAssertEqual(0, killRet, "kill failed: \(errno)") + + stdinStreamProducer.yield(ByteBuffer(repeating: 0x42, count: 1 * 1024 * 1024)) + + let actualReturn2 = try await group.next()! + XCTAssertEqual(.processRun, actualReturn2) + } + } + #endif + + func testTinyOutputConsumedAfterRun() async throws { + let exe = ProcessExecutor( + executable: "/bin/sh", + ["-c", "echo O; echo >&2 E"] + ) + let result = try await exe.run() + XCTAssertEqual(.exit(0), result) + let stdout = try await Array(await exe.standardOutput.splitIntoLines()) + XCTAssertEqual([ByteBuffer(string: "O")], stdout) + let stderr = try await Array(await exe.standardError.splitIntoLines()) + XCTAssertEqual([ByteBuffer(string: "E")], stderr) + } + + func testTinyOutputConsumedDuringRun() async throws { + let exe = ProcessExecutor( + executable: "/bin/sh", + ["-c", "echo O; echo >&2 E"] + ) + async let asyncResult = exe.run() + try await Task.sleep(nanoseconds: .random(in: 0..<10_000_000)) + let stdout = try await Array(await exe.standardOutput.splitIntoLines()) + XCTAssertEqual([ByteBuffer(string: "O")], stdout) + let stderr = try await Array(await exe.standardError.splitIntoLines()) + XCTAssertEqual([ByteBuffer(string: "E")], stderr) + let result = try await asyncResult + XCTAssertEqual(.exit(0), result) + } + // MARK: - Setup/teardown override func setUp() async throws { self.group = MultiThreadedEventLoopGroup(numberOfThreads: 3)