Skip to content

Commit 8138f43

Browse files
committed
Avoid storing ChannelHandlerContext in PostgresCopyFromWriter
1 parent 7d42173 commit 8138f43

File tree

2 files changed

+23
-25
lines changed

2 files changed

+23
-25
lines changed

Sources/PostgresNIO/Connection/PostgresConnection.swift

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -710,12 +710,10 @@ public struct PostgresCopyFromWriter: Sendable {
710710
}
711711

712712
private let channelHandler: NIOLoopBound<PostgresChannelHandler>
713-
private let context: NIOLoopBound<ChannelHandlerContext>
714713
private let eventLoop: any EventLoop
715714

716-
init(handler: PostgresChannelHandler, context: ChannelHandlerContext, eventLoop: any EventLoop) {
715+
init(handler: PostgresChannelHandler, eventLoop: any EventLoop) {
717716
self.channelHandler = NIOLoopBound(handler, eventLoop: eventLoop)
718-
self.context = NIOLoopBound(context, eventLoop: eventLoop)
719717
self.eventLoop = eventLoop
720718
}
721719

@@ -728,20 +726,20 @@ public struct PostgresCopyFromWriter: Sendable {
728726
// error during the data transfer and thus cannot process any more data.
729727
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, any Error>) in
730728
if eventLoop.inEventLoop {
731-
self.channelHandler.value.waitForWritableBuffer(context: self.context.value, continuation)
729+
self.channelHandler.value.waitForWritableBuffer(continuation)
732730
} else {
733731
eventLoop.execute {
734-
self.channelHandler.value.waitForWritableBuffer(context: self.context.value, continuation)
732+
self.channelHandler.value.waitForWritableBuffer(continuation)
735733
}
736734
}
737735
}
738736

739737
// Run the actual data transfer
740738
if eventLoop.inEventLoop {
741-
self.channelHandler.value.copyData(byteBuffer, context: self.context.value)
739+
self.channelHandler.value.copyData(byteBuffer)
742740
} else {
743741
eventLoop.execute {
744-
self.channelHandler.value.copyData(byteBuffer, context: self.context.value)
742+
self.channelHandler.value.copyData(byteBuffer)
745743
}
746744
}
747745
}
@@ -751,10 +749,10 @@ public struct PostgresCopyFromWriter: Sendable {
751749
func done() async throws {
752750
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, any Error>) in
753751
if eventLoop.inEventLoop {
754-
self.channelHandler.value.sendCopyDone(continuation: continuation, context: self.context.value)
752+
self.channelHandler.value.sendCopyDone(continuation: continuation)
755753
} else {
756754
eventLoop.execute {
757-
self.channelHandler.value.sendCopyDone(continuation: continuation, context: self.context.value)
755+
self.channelHandler.value.sendCopyDone(continuation: continuation)
758756
}
759757
}
760758
}
@@ -765,10 +763,10 @@ public struct PostgresCopyFromWriter: Sendable {
765763
func failed(error: any Error) async throws {
766764
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, any Error>) in
767765
if eventLoop.inEventLoop {
768-
self.channelHandler.value.sendCopyFailed(message: "\(error)", continuation: continuation, context: self.context.value)
766+
self.channelHandler.value.sendCopyFailed(message: "\(error)", continuation: continuation)
769767
} else {
770768
eventLoop.execute {
771-
self.channelHandler.value.sendCopyFailed(message: "\(error)", continuation: continuation, context: self.context.value)
769+
self.channelHandler.value.sendCopyFailed(message: "\(error)", continuation: continuation)
772770
}
773771
}
774772
}
@@ -777,10 +775,10 @@ public struct PostgresCopyFromWriter: Sendable {
777775
/// Send a `Sync` message to the backend.
778776
func sync() {
779777
if eventLoop.inEventLoop {
780-
self.channelHandler.value.sendSync(context: self.context.value)
778+
self.channelHandler.value.sendSync()
781779
} else {
782780
eventLoop.execute {
783-
self.channelHandler.value.sendSync(context: self.context.value)
781+
self.channelHandler.value.sendSync()
784782
}
785783
}
786784
}

Sources/PostgresNIO/New/PostgresChannelHandler.swift

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -176,11 +176,11 @@ final class PostgresChannelHandler: ChannelDuplexHandler {
176176
///
177177
/// This fails the continuation with a `PostgresCopyFromWriter.CopyCancellationError` when the server has cancelled
178178
/// the data transfer to indicate that the frontend should not send any more data.
179-
func waitForWritableBuffer(context: ChannelHandlerContext, _ continuation: CheckedContinuation<Void, any Error>) {
180-
let action = self.state.waitForWritableBuffer(channel: context.channel, continuation: continuation)
179+
func waitForWritableBuffer(_ continuation: CheckedContinuation<Void, any Error>) {
180+
let action = self.state.waitForWritableBuffer(channel: handlerContext!.channel, continuation: continuation)
181181
switch action {
182182
case .waitForBackpressureRelieve:
183-
context.channel.flush()
183+
self.handlerContext!.channel.flush()
184184
case .resumeContinuation(let continuation):
185185
continuation.resume()
186186
case .failContinuation(_, error: let error):
@@ -189,27 +189,27 @@ final class PostgresChannelHandler: ChannelDuplexHandler {
189189
}
190190

191191
/// Send a `CopyData` message to the backend using the given data.
192-
func copyData(_ data: ByteBuffer, context: ChannelHandlerContext) {
192+
func copyData(_ data: ByteBuffer) {
193193
self.encoder.copyData(data: data)
194-
context.write(self.wrapOutboundOut(self.encoder.flushBuffer()), promise: nil)
194+
self.handlerContext!.write(self.wrapOutboundOut(self.encoder.flushBuffer()), promise: nil)
195195
}
196196

197197
/// Put the state machine out of the copying mode and send a `CopyDone` message to the backend.
198-
func sendCopyDone(continuation: CheckedContinuation<Void, any Error>, context: ChannelHandlerContext) {
198+
func sendCopyDone(continuation: CheckedContinuation<Void, any Error>) {
199199
let action = self.state.sendCopyDone(continuation: continuation)
200-
self.run(action, with: context)
200+
self.run(action, with: self.handlerContext!)
201201
}
202202

203203
/// Put the state machine out of the copying mode and send a `CopyFail` message to the backend.
204-
func sendCopyFailed(message: String, continuation: CheckedContinuation<Void, any Error>, context: ChannelHandlerContext) {
204+
func sendCopyFailed(message: String, continuation: CheckedContinuation<Void, any Error>) {
205205
let action = self.state.sendCopyFail(message: message, continuation: continuation)
206-
self.run(action, with: context)
206+
self.run(action, with: self.handlerContext!)
207207
}
208208

209209
/// Send a `Sync` message to the backend.
210-
func sendSync(context: ChannelHandlerContext) {
210+
func sendSync() {
211211
self.encoder.sync()
212-
context.writeAndFlush(self.wrapOutboundOut(self.encoder.flushBuffer()), promise: nil)
212+
self.handlerContext!.writeAndFlush(self.wrapOutboundOut(self.encoder.flushBuffer()), promise: nil)
213213
}
214214

215215
func channelReadComplete(context: ChannelHandlerContext) {
@@ -423,7 +423,7 @@ final class PostgresChannelHandler: ChannelDuplexHandler {
423423
}
424424
continuation.resume(throwing: error)
425425
case .triggerCopyData(let triggerCopy):
426-
let writer = PostgresCopyFromWriter(handler: self, context: context, eventLoop: eventLoop)
426+
let writer = PostgresCopyFromWriter(handler: self, eventLoop: eventLoop)
427427
triggerCopy.resume(returning: writer)
428428
case .sendCopyDone:
429429
self.encoder.copyDone()

0 commit comments

Comments
 (0)