Skip to content

Commit cb52d81

Browse files
committed
Handle empty query response
1 parent cd5318a commit cb52d81

File tree

6 files changed

+127
-26
lines changed

6 files changed

+127
-26
lines changed

Sources/PostgresNIO/New/Connection State Machine/ExtendedQueryStateMachine.swift

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ struct ExtendedQueryStateMachine {
1010
case parameterDescriptionReceived(ExtendedQueryContext)
1111
case rowDescriptionReceived(ExtendedQueryContext, [RowDescription.Column])
1212
case noDataMessageReceived(ExtendedQueryContext)
13-
13+
case emptyQueryResponseReceived
14+
1415
/// A state that is used if a noData message was received before. If a row description was received `bufferingRows` is
1516
/// used after receiving a `bindComplete` message
1617
case bindCompleteReceived(ExtendedQueryContext)
@@ -122,7 +123,7 @@ struct ExtendedQueryStateMachine {
122123
return .forwardStreamError(.queryCancelled, read: true)
123124
}
124125

125-
case .commandComplete, .error, .drain:
126+
case .commandComplete, .emptyQueryResponseReceived, .error, .drain:
126127
// the stream has already finished.
127128
return .wait
128129

@@ -229,6 +230,7 @@ struct ExtendedQueryStateMachine {
229230
.messagesSent,
230231
.parseCompleteReceived,
231232
.parameterDescriptionReceived,
233+
.emptyQueryResponseReceived,
232234
.bindCompleteReceived,
233235
.streaming,
234236
.drain,
@@ -268,6 +270,7 @@ struct ExtendedQueryStateMachine {
268270
.parseCompleteReceived,
269271
.parameterDescriptionReceived,
270272
.noDataMessageReceived,
273+
.emptyQueryResponseReceived,
271274
.rowDescriptionReceived,
272275
.bindCompleteReceived,
273276
.commandComplete,
@@ -309,6 +312,7 @@ struct ExtendedQueryStateMachine {
309312
.parseCompleteReceived,
310313
.parameterDescriptionReceived,
311314
.noDataMessageReceived,
315+
.emptyQueryResponseReceived,
312316
.rowDescriptionReceived,
313317
.commandComplete,
314318
.error:
@@ -319,7 +323,22 @@ struct ExtendedQueryStateMachine {
319323
}
320324

321325
mutating func emptyQueryResponseReceived() -> Action {
322-
preconditionFailure("Unimplemented")
326+
guard case .bindCompleteReceived(let queryContext) = self.state else {
327+
return self.setAndFireError(.unexpectedBackendMessage(.emptyQueryResponse))
328+
}
329+
330+
switch queryContext.query {
331+
case .unnamed(_, let eventLoopPromise),
332+
.executeStatement(_, let eventLoopPromise):
333+
return self.avoidingStateMachineCoW { state -> Action in
334+
state = .emptyQueryResponseReceived
335+
let result = QueryResult(value: .emptyResponse, logger: queryContext.logger)
336+
return .succeedQuery(eventLoopPromise, with: result)
337+
}
338+
339+
case .prepareStatement(_, _, _, _):
340+
return self.setAndFireError(.unexpectedBackendMessage(.emptyQueryResponse))
341+
}
323342
}
324343

325344
mutating func errorReceived(_ errorMessage: PostgresBackendMessage.ErrorResponse) -> Action {
@@ -336,7 +355,7 @@ struct ExtendedQueryStateMachine {
336355
return self.setAndFireError(error)
337356
case .streaming, .drain:
338357
return self.setAndFireError(error)
339-
case .commandComplete:
358+
case .commandComplete, .emptyQueryResponseReceived:
340359
return self.setAndFireError(.unexpectedBackendMessage(.error(errorMessage)))
341360
case .error:
342361
preconditionFailure("""
@@ -382,6 +401,7 @@ struct ExtendedQueryStateMachine {
382401
.parseCompleteReceived,
383402
.parameterDescriptionReceived,
384403
.noDataMessageReceived,
404+
.emptyQueryResponseReceived,
385405
.rowDescriptionReceived,
386406
.bindCompleteReceived:
387407
preconditionFailure("Requested to consume next row without anything going on.")
@@ -405,6 +425,7 @@ struct ExtendedQueryStateMachine {
405425
.parseCompleteReceived,
406426
.parameterDescriptionReceived,
407427
.noDataMessageReceived,
428+
.emptyQueryResponseReceived,
408429
.rowDescriptionReceived,
409430
.bindCompleteReceived:
410431
return .wait
@@ -449,6 +470,7 @@ struct ExtendedQueryStateMachine {
449470
}
450471
case .initialized,
451472
.commandComplete,
473+
.emptyQueryResponseReceived,
452474
.drain,
453475
.error:
454476
// we already have the complete stream received, now we are waiting for a
@@ -495,7 +517,7 @@ struct ExtendedQueryStateMachine {
495517
return .forwardStreamError(error, read: true)
496518
}
497519

498-
case .commandComplete, .error:
520+
case .commandComplete, .emptyQueryResponseReceived, .error:
499521
preconditionFailure("""
500522
This state must not be reached. If the query `.isComplete`, the
501523
ConnectionStateMachine must not send any further events to the substate machine.
@@ -518,6 +540,9 @@ struct ExtendedQueryStateMachine {
518540
return false
519541
}
520542

543+
case .emptyQueryResponseReceived:
544+
return true
545+
521546
case .initialized, .messagesSent, .parseCompleteReceived, .parameterDescriptionReceived, .bindCompleteReceived, .streaming, .drain:
522547
return false
523548

Sources/PostgresNIO/New/PSQLRowStream.swift

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import Logging
33

44
struct QueryResult {
55
enum Value: Equatable {
6+
case emptyResponse
67
case noRows(String)
78
case rowDescription([RowDescription.Column])
89
}
@@ -19,6 +20,7 @@ final class PSQLRowStream: @unchecked Sendable {
1920
enum Source {
2021
case stream([RowDescription.Column], PSQLRowsDataSource)
2122
case noRows(Result<String, Error>)
23+
case emptyResponse
2224
}
2325

2426
let eventLoop: EventLoop
@@ -27,14 +29,20 @@ final class PSQLRowStream: @unchecked Sendable {
2729
private enum BufferState {
2830
case streaming(buffer: CircularBuffer<DataRow>, dataSource: PSQLRowsDataSource)
2931
case finished(buffer: CircularBuffer<DataRow>, commandTag: String)
32+
case empty
3033
case failure(Error)
3134
}
32-
35+
36+
private enum Consumed {
37+
case tag(String)
38+
case emptyResponse
39+
}
40+
3341
private enum DownstreamState {
3442
case waitingForConsumer(BufferState)
3543
case iteratingRows(onRow: (PostgresRow) throws -> (), EventLoopPromise<Void>, PSQLRowsDataSource)
3644
case waitingForAll([PostgresRow], EventLoopPromise<[PostgresRow]>, PSQLRowsDataSource)
37-
case consumed(Result<String, Error>)
45+
case consumed(Result<Consumed, Error>)
3846
case asyncSequence(AsyncSequenceSource, PSQLRowsDataSource, onFinish: @Sendable () -> ())
3947
}
4048

@@ -58,6 +66,9 @@ final class PSQLRowStream: @unchecked Sendable {
5866
case .noRows(.failure(let error)):
5967
self.rowDescription = []
6068
bufferState = .failure(error)
69+
case .emptyResponse:
70+
self.rowDescription = []
71+
bufferState = .empty
6172
}
6273

6374
self.downstreamState = .waitingForConsumer(bufferState)
@@ -98,11 +109,16 @@ final class PSQLRowStream: @unchecked Sendable {
98109
self.downstreamState = .asyncSequence(source, dataSource, onFinish: onFinish)
99110
self.executeActionBasedOnYieldResult(yieldResult, source: dataSource)
100111

112+
case .empty:
113+
source.finish()
114+
onFinish()
115+
self.downstreamState = .consumed(.success(.emptyResponse))
116+
101117
case .finished(let buffer, let commandTag):
102118
_ = source.yield(contentsOf: buffer)
103119
source.finish()
104120
onFinish()
105-
self.downstreamState = .consumed(.success(commandTag))
121+
self.downstreamState = .consumed(.success(.tag(commandTag)))
106122

107123
case .failure(let error):
108124
source.finish(error)
@@ -195,12 +211,16 @@ final class PSQLRowStream: @unchecked Sendable {
195211
PostgresRow(data: $0, lookupTable: self.lookupTable, columns: self.rowDescription)
196212
}
197213

198-
self.downstreamState = .consumed(.success(commandTag))
214+
self.downstreamState = .consumed(.success(.tag(commandTag)))
199215
return self.eventLoop.makeSucceededFuture(rows)
200216

201217
case .failure(let error):
202218
self.downstreamState = .consumed(.failure(error))
203219
return self.eventLoop.makeFailedFuture(error)
220+
221+
case .empty:
222+
self.downstreamState = .consumed(.success(.emptyResponse))
223+
return self.eventLoop.makeSucceededFuture([])
204224
}
205225
}
206226

@@ -247,7 +267,11 @@ final class PSQLRowStream: @unchecked Sendable {
247267
}
248268

249269
return promise.futureResult
250-
270+
271+
case .empty:
272+
self.downstreamState = .consumed(.success(.emptyResponse))
273+
return self.eventLoop.makeSucceededVoidFuture()
274+
251275
case .finished(let buffer, let commandTag):
252276
do {
253277
for data in buffer {
@@ -259,7 +283,7 @@ final class PSQLRowStream: @unchecked Sendable {
259283
try onRow(row)
260284
}
261285

262-
self.downstreamState = .consumed(.success(commandTag))
286+
self.downstreamState = .consumed(.success(.tag(commandTag)))
263287
return self.eventLoop.makeSucceededVoidFuture()
264288
} catch {
265289
self.downstreamState = .consumed(.failure(error))
@@ -290,9 +314,9 @@ final class PSQLRowStream: @unchecked Sendable {
290314
buffer.append(contentsOf: newRows)
291315
self.downstreamState = .waitingForConsumer(.streaming(buffer: buffer, dataSource: dataSource))
292316

293-
case .waitingForConsumer(.finished), .waitingForConsumer(.failure):
317+
case .waitingForConsumer(.finished), .waitingForConsumer(.failure), .waitingForConsumer(.empty):
294318
preconditionFailure("How can new rows be received, if an end was already signalled?")
295-
319+
296320
case .iteratingRows(let onRow, let promise, let dataSource):
297321
do {
298322
for data in newRows {
@@ -353,20 +377,23 @@ final class PSQLRowStream: @unchecked Sendable {
353377
preconditionFailure("How can we get another end, if an end was already signalled?")
354378

355379
case .iteratingRows(_, let promise, _):
356-
self.downstreamState = .consumed(.success(commandTag))
380+
self.downstreamState = .consumed(.success(.tag(commandTag)))
357381
promise.succeed(())
358382

359383
case .waitingForAll(let rows, let promise, _):
360-
self.downstreamState = .consumed(.success(commandTag))
384+
self.downstreamState = .consumed(.success(.tag(commandTag)))
361385
promise.succeed(rows)
362386

363387
case .asyncSequence(let source, _, let onFinish):
364-
self.downstreamState = .consumed(.success(commandTag))
388+
self.downstreamState = .consumed(.success(.tag(commandTag)))
365389
source.finish()
366390
onFinish()
367391

368-
case .consumed:
392+
case .consumed(.success(.tag)), .consumed(.failure):
369393
break
394+
395+
case .consumed(.success(.emptyResponse)), .waitingForConsumer(.empty):
396+
preconditionFailure("How can we get an end for empty query response?")
370397
}
371398
}
372399

@@ -375,7 +402,7 @@ final class PSQLRowStream: @unchecked Sendable {
375402
case .waitingForConsumer(.streaming):
376403
self.downstreamState = .waitingForConsumer(.failure(error))
377404

378-
case .waitingForConsumer(.finished), .waitingForConsumer(.failure):
405+
case .waitingForConsumer(.finished), .waitingForConsumer(.failure), .waitingForConsumer(.empty):
379406
preconditionFailure("How can we get another end, if an end was already signalled?")
380407

381408
case .iteratingRows(_, let promise, _):
@@ -391,8 +418,11 @@ final class PSQLRowStream: @unchecked Sendable {
391418
consumer.finish(error)
392419
onFinish()
393420

394-
case .consumed:
421+
case .consumed(.success(.tag)), .consumed(.failure):
395422
break
423+
424+
case .consumed(.success(.emptyResponse)):
425+
preconditionFailure("How can we get an error for empty query response?")
396426
}
397427
}
398428

@@ -413,10 +443,15 @@ final class PSQLRowStream: @unchecked Sendable {
413443
}
414444

415445
var commandTag: String {
416-
guard case .consumed(.success(let commandTag)) = self.downstreamState else {
446+
guard case .consumed(.success(let consumed)) = self.downstreamState else {
417447
preconditionFailure("commandTag may only be called if all rows have been consumed")
418448
}
419-
return commandTag
449+
switch consumed {
450+
case .tag(let tag):
451+
return tag
452+
case .emptyResponse:
453+
return ""
454+
}
420455
}
421456
}
422457

Sources/PostgresNIO/New/PostgresChannelHandler.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,13 @@ final class PostgresChannelHandler: ChannelDuplexHandler {
556556
eventLoop: context.channel.eventLoop,
557557
logger: result.logger
558558
)
559+
560+
case .emptyResponse:
561+
rows = PSQLRowStream(
562+
source: .emptyResponse,
563+
eventLoop: context.channel.eventLoop,
564+
logger: result.logger
565+
)
559566
}
560567

561568
promise.succeed(rows)

Sources/PostgresNIO/PostgresDatabase+Query.swift

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,7 @@ public struct PostgresQueryMetadata: Sendable {
7373

7474
init?(string: String) {
7575
let parts = string.split(separator: " ")
76-
guard parts.count >= 1 else {
77-
return nil
78-
}
79-
switch parts[0] {
76+
switch parts.first {
8077
case "INSERT":
8178
// INSERT oid rows
8279
guard parts.count == 3 else {

Tests/IntegrationTests/PSQLIntegrationTests.swift

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,25 @@ final class IntegrationTests: XCTestCase {
123123
XCTAssertEqual(foo, "hello")
124124
}
125125

126+
func testQueryNothing() throws {
127+
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
128+
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
129+
let eventLoop = eventLoopGroup.next()
130+
131+
var conn: PostgresConnection?
132+
XCTAssertNoThrow(conn = try PostgresConnection.test(on: eventLoop).wait())
133+
defer { XCTAssertNoThrow(try conn?.close().wait()) }
134+
135+
var _result: PostgresQueryResult?
136+
XCTAssertNoThrow(_result = try conn?.query("""
137+
-- Some comments
138+
""", logger: .psqlTest).wait())
139+
140+
let result = try XCTUnwrap(_result)
141+
XCTAssertEqual(result.rows, [])
142+
XCTAssertEqual(result.metadata.command, "")
143+
}
144+
126145
func testDecodeIntegers() {
127146
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
128147
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

Tests/PostgresNIOTests/New/Connection State Machine/ExtendedQueryStateMachineTests.swift

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,25 @@ class ExtendedQueryStateMachineTests: XCTestCase {
7777
XCTAssertEqual(state.commandCompletedReceived("SELECT 2"), .forwardStreamComplete([row5, row6], commandTag: "SELECT 2"))
7878
XCTAssertEqual(state.readyForQueryReceived(.idle), .fireEventReadyForQuery)
7979
}
80-
80+
81+
func testExtendedQueryWithNoQuery() {
82+
var state = ConnectionStateMachine.readyForQuery()
83+
84+
let logger = Logger.psqlTest
85+
let promise = EmbeddedEventLoop().makePromise(of: PSQLRowStream.self)
86+
promise.fail(PSQLError.uncleanShutdown) // we don't care about the error at all.
87+
let query: PostgresQuery = "-- some comments"
88+
let queryContext = ExtendedQueryContext(query: query, logger: logger, promise: promise)
89+
90+
XCTAssertEqual(state.enqueue(task: .extendedQuery(queryContext)), .sendParseDescribeBindExecuteSync(query))
91+
XCTAssertEqual(state.parseCompleteReceived(), .wait)
92+
XCTAssertEqual(state.parameterDescriptionReceived(.init(dataTypes: [.int8])), .wait)
93+
XCTAssertEqual(state.noDataReceived(), .wait)
94+
XCTAssertEqual(state.bindCompleteReceived(), .wait)
95+
XCTAssertEqual(state.emptyQueryResponseReceived(), .succeedQuery(promise, with: .init(value: .emptyResponse, logger: logger)))
96+
XCTAssertEqual(state.readyForQueryReceived(.idle), .fireEventReadyForQuery)
97+
}
98+
8199
func testReceiveTotallyUnexpectedMessageInQuery() {
82100
var state = ConnectionStateMachine.readyForQuery()
83101

0 commit comments

Comments
 (0)