Skip to content

Commit 2ad4173

Browse files
committed
support amazonlinux2 & many fixes
1 parent f396a28 commit 2ad4173

File tree

12 files changed

+1367
-161
lines changed

12 files changed

+1367
-161
lines changed

Package.swift

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,12 @@ let package = Package(
7979

8080
// `AsyncProcess` modules and dependencies
8181

82-
.target(name: "CProcessSpawnSync"),
82+
.target(
83+
name: "CProcessSpawnSync",
84+
cSettings: [
85+
.define("_GNU_SOURCE")
86+
]
87+
),
8388
.target(
8489
name: "ProcessSpawnSync",
8590
dependencies: [

Sources/AsyncProcess/ChunkSequence.swift

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,41 @@ public struct IllegalStreamConsumptionError: Error {
2323
}
2424

2525
public struct ChunkSequence: AsyncSequence & Sendable {
26-
private let fileHandle: FileHandle?
27-
private let group: EventLoopGroup
26+
private let contentStream: FileContentStream?
2827

29-
public init(takingOwnershipOfFileHandle fileHandle: FileHandle?, group: EventLoopGroup) {
30-
self.group = group
31-
self.fileHandle = fileHandle
28+
public init(
29+
takingOwnershipOfFileHandle fileHandle: FileHandle,
30+
group: EventLoopGroup
31+
) async throws {
32+
// This will close the fileHandle
33+
let contentStream = try await fileHandle.fileContentStream(eventLoop: group.any())
34+
self.init(contentStream: contentStream)
35+
}
36+
37+
internal func isSameAs(_ other: ChunkSequence) -> Bool {
38+
guard let myContentStream = self.contentStream else {
39+
return other.contentStream == nil
40+
}
41+
guard let otherContentStream = other.contentStream else {
42+
return self.contentStream == nil
43+
}
44+
return myContentStream.isSameAs(otherContentStream)
45+
}
46+
47+
public func close() async throws {
48+
try await self.contentStream?.close()
49+
}
50+
51+
private init(contentStream: FileContentStream?) {
52+
self.contentStream = contentStream
53+
}
54+
55+
public static func makeEmptyStream() -> Self {
56+
return Self.init(contentStream: nil)
3257
}
3358

3459
public func makeAsyncIterator() -> AsyncIterator {
35-
// This will close the file handle.
36-
return AsyncIterator(try! self.fileHandle?.fileContentStream(eventLoop: group.any()))
60+
return AsyncIterator(self.contentStream)
3761
}
3862

3963
public typealias Element = ByteBuffer

Sources/AsyncProcess/FileContentStream.swift

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,25 @@ public struct _FileContentStream: AsyncSequence & Sendable {
2424
public typealias Element = ByteBuffer
2525
typealias Underlying = AsyncThrowingChannel<Element, Error>
2626

27-
public func makeAsyncIterator() -> AsyncIterator {
28-
return AsyncIterator(underlying: self.asyncChannel.makeAsyncIterator())
29-
}
30-
31-
public struct AsyncIterator: AsyncIteratorProtocol {
27+
public final class AsyncIterator: AsyncIteratorProtocol {
3228
public typealias Element = ByteBuffer
3329

30+
deinit {
31+
// This is painful and so wrong but unfortunately, our iterators don't have a cancel signal, so the only
32+
// thing we can do is hope for `deinit` to be invoked :(.
33+
// AsyncIteratorProtocol also doesn't support `~Copyable` so we also have to make this a class.
34+
self.channel?.close(promise: nil)
35+
}
36+
37+
init(underlying: Underlying.AsyncIterator, channel: (any Channel)?) {
38+
self.underlying = underlying
39+
self.channel = channel
40+
}
41+
3442
var underlying: Underlying.AsyncIterator
43+
let channel: (any Channel)?
3544

36-
public mutating func next() async throws -> ByteBuffer? {
45+
public func next() async throws -> ByteBuffer? {
3746
return try await self.underlying.next()
3847
}
3948
}
@@ -47,22 +56,41 @@ public struct _FileContentStream: AsyncSequence & Sendable {
4756
}
4857

4958
private let asyncChannel: AsyncThrowingChannel<ByteBuffer, Error>
59+
private let channel: (any Channel)?
60+
61+
internal func isSameAs(_ other: FileContentStream) -> Bool {
62+
return (self.asyncChannel === other.asyncChannel) && (self.channel === other.channel)
63+
}
64+
65+
public func makeAsyncIterator() -> AsyncIterator {
66+
return AsyncIterator(
67+
underlying: self.asyncChannel.makeAsyncIterator(),
68+
channel: self.channel
69+
)
70+
}
71+
72+
public func close() async throws {
73+
self.asyncChannel.finish()
74+
do {
75+
try await self.channel?.close().get()
76+
} catch ChannelError.alreadyClosed {
77+
// That's okay
78+
}
79+
}
5080

5181
public static func makeReader(
5282
fileDescriptor: CInt,
5383
eventLoop: EventLoop = MultiThreadedEventLoopGroup.singleton.any(),
5484
blockingPool: NIOThreadPool = .singleton
5585
) async throws -> _FileContentStream {
56-
return try await eventLoop.submit {
57-
try FileContentStream(fileDescriptor: fileDescriptor, eventLoop: eventLoop, blockingPool: blockingPool)
58-
}.get()
86+
try await FileContentStream(fileDescriptor: fileDescriptor, eventLoop: eventLoop, blockingPool: blockingPool)
5987
}
6088

6189
internal init(
6290
fileDescriptor: CInt,
6391
eventLoop: EventLoop,
6492
blockingPool: NIOThreadPool? = nil
65-
) throws {
93+
) async throws {
6694
var statInfo: stat = .init()
6795
let statError = fstat(fileDescriptor, &statInfo)
6896
if statError != 0 {
@@ -103,23 +131,36 @@ public struct _FileContentStream: AsyncSequence & Sendable {
103131
asyncChannel.finish()
104132
}
105133
}
134+
self.channel = nil
106135
case S_IFSOCK:
107-
_ = ClientBootstrap(group: eventLoop)
136+
self.channel = try await ClientBootstrap(group: eventLoop)
108137
.channelInitializer { channel in
109-
channel.pipeline.addHandler(ReadIntoAsyncChannelHandler(sink: asyncChannel))
138+
do {
139+
try channel.pipeline.syncOperations.addHandler(ReadIntoAsyncChannelHandler(sink: asyncChannel))
140+
return channel.eventLoop.makeSucceededFuture(())
141+
} catch {
142+
return channel.eventLoop.makeFailedFuture(error)
143+
}
110144
}
111145
.withConnectedSocket(dupedFD)
146+
.get()
112147
case S_IFIFO:
113-
NIOPipeBootstrap(group: eventLoop)
148+
self.channel = try await NIOPipeBootstrap(group: eventLoop)
114149
.channelInitializer { channel in
115-
channel.pipeline.addHandler(ReadIntoAsyncChannelHandler(sink: asyncChannel))
150+
do {
151+
try channel.pipeline.syncOperations.addHandler(ReadIntoAsyncChannelHandler(sink: asyncChannel))
152+
return channel.eventLoop.makeSucceededFuture(())
153+
} catch {
154+
return channel.eventLoop.makeFailedFuture(error)
155+
}
116156
}
117157
.takingOwnershipOfDescriptor(
118158
input: dupedFD
119159
)
120-
.whenSuccess { channel in
160+
.map { channel in
121161
channel.close(mode: .output, promise: nil)
122-
}
162+
return channel
163+
}.get()
123164
case S_IFDIR:
124165
throw IOError(errnoValue: EISDIR)
125166
case S_IFBLK, S_IFCHR, S_IFLNK:
@@ -265,8 +306,8 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler {
265306
}
266307

267308
extension FileHandle {
268-
func fileContentStream(eventLoop: EventLoop) throws -> FileContentStream {
269-
let asyncBytes = try FileContentStream(fileDescriptor: self.fileDescriptor, eventLoop: eventLoop)
309+
func fileContentStream(eventLoop: EventLoop) async throws -> FileContentStream {
310+
let asyncBytes = try await FileContentStream(fileDescriptor: self.fileDescriptor, eventLoop: eventLoop)
270311
try self.close()
271312
return asyncBytes
272313
}

Sources/AsyncProcess/NIOAsyncPipeWriter.swift

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ struct NIOAsyncPipeWriter<Chunks: AsyncSequence & Sendable> where Chunks.Element
1717
static func sinkSequenceInto(
1818
_ chunks: Chunks,
1919
takingOwnershipOfFD fd: CInt,
20+
ignoreWriteErrors: Bool,
2021
eventLoop: EventLoop
2122
) async throws {
2223
let channel = try await NIOPipeBootstrap(group: eventLoop)
@@ -26,11 +27,27 @@ struct NIOAsyncPipeWriter<Chunks: AsyncSequence & Sendable> where Chunks.Element
2627
output: fd
2728
).get()
2829
channel.close(mode: .input, promise: nil)
29-
defer {
30-
channel.close(promise: nil)
31-
}
32-
for try await chunk in chunks {
33-
try await channel.writeAndFlush(chunk).get()
30+
return try await asyncDo {
31+
try await withTaskCancellationHandler {
32+
for try await chunk in chunks {
33+
do {
34+
try await channel.writeAndFlush(chunk).get()
35+
} catch {
36+
if !ignoreWriteErrors {
37+
throw error
38+
}
39+
break
40+
}
41+
}
42+
} onCancel: {
43+
channel.close(promise: nil)
44+
}
45+
} finally: { _ in
46+
do {
47+
try await channel.close()
48+
} catch ChannelError.alreadyClosed {
49+
// ok
50+
}
3451
}
3552
}
3653
}

Sources/AsyncProcess/ProcessExecutor+Convenience.swift

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -235,13 +235,33 @@ extension ProcessExecutor {
235235
}
236236

237237
public struct ProcessExitReasonAndOutput: Sendable & Hashable {
238+
public func hash(into hasher: inout Hasher) {
239+
self.exitReason.hash(into: &hasher)
240+
self.standardOutput.hash(into: &hasher)
241+
self.standardError.hash(into: &hasher)
242+
(self.standardInputWriteError == nil).hash(into: &hasher)
243+
}
244+
245+
public static func == (
246+
lhs: ProcessExecutor.ProcessExitReasonAndOutput,
247+
rhs: ProcessExecutor.ProcessExitReasonAndOutput
248+
) -> Bool {
249+
return lhs.exitReason == rhs.exitReason && lhs.standardOutput == rhs.standardOutput
250+
&& lhs.standardError == rhs.standardError
251+
&& (lhs.standardInputWriteError == nil) == (rhs.standardInputWriteError == nil)
252+
}
253+
238254
public var exitReason: ProcessExitReason
255+
256+
/// Any errors that occurred whilst writing the provided `standardInput` sequence into the child process' standard input.
257+
public var standardInputWriteError: Optional<any Error>
258+
239259
public var standardOutput: ByteBuffer?
240260
public var standardError: ByteBuffer?
241261
}
242262

243263
internal enum ProcessExitInformationPiece {
244-
case exitReason(ProcessExitReason)
264+
case exitReason(ProcessExitExtendedInfo)
245265
case standardOutput(ByteBuffer?)
246266
case standardError(ByteBuffer?)
247267
}
@@ -319,14 +339,20 @@ extension ProcessExecutor {
319339
}
320340

321341
group.addTask {
322-
return .exitReason(try await exe.run())
342+
return .exitReason(try await exe.runWithExtendedInfo())
323343
}
324344

325-
var allInfo = ProcessExitReasonAndOutput(exitReason: .exit(-1), standardOutput: nil, standardError: nil)
345+
var allInfo = ProcessExitReasonAndOutput(
346+
exitReason: .exit(-1),
347+
standardInputWriteError: nil,
348+
standardOutput: nil,
349+
standardError: nil
350+
)
326351
while let next = try await group.next() {
327352
switch next {
328353
case .exitReason(let exitReason):
329-
allInfo.exitReason = exitReason
354+
allInfo.exitReason = exitReason.exitReason
355+
allInfo.standardInputWriteError = exitReason.standardInputWriteError
330356
case .standardOutput(let output):
331357
allInfo.standardOutput = output
332358
case .standardError(let output):

0 commit comments

Comments
 (0)