@@ -57,14 +57,10 @@ class SimpleQueryStateMachineTests: XCTestCase {
57
57
let input : [ RowDescription . Column ] = [
58
58
. init( name: " version " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . text, dataTypeSize: - 1 , dataTypeModifier: - 1 , format: . text)
59
59
]
60
- let expected : [ RowDescription . Column ] = input. map {
61
- . init( name: $0. name, tableOID: $0. tableOID, columnAttributeNumber: $0. columnAttributeNumber, dataType: $0. dataType,
62
- dataTypeSize: $0. dataTypeSize, dataTypeModifier: $0. dataTypeModifier, format: . text)
63
- }
64
60
65
61
XCTAssertEqual ( state. rowDescriptionReceived ( . init( columns: input) ) , . wait)
66
62
let row1 : DataRow = [ ByteBuffer ( string: " test1 " ) ]
67
- let result = QueryResult ( value: . rowDescription( expected ) , logger: queryContext. logger)
63
+ let result = QueryResult ( value: . rowDescription( input ) , logger: queryContext. logger)
68
64
XCTAssertEqual ( state. dataRowReceived ( row1) , . succeedQuery( promise, with: result) )
69
65
XCTAssertEqual ( state. channelReadComplete ( ) , . forwardRows( [ row1] ) )
70
66
XCTAssertEqual ( state. readEventCaught ( ) , . wait)
@@ -128,7 +124,7 @@ class SimpleQueryStateMachineTests: XCTestCase {
128
124
. failQuery( promise, with: psqlError, cleanupContext: . init( action: . close, tasks: [ ] , error: psqlError, closePromise: nil ) ) )
129
125
}
130
126
131
- func testQueryIsCancelledImmediatly ( ) {
127
+ func testQueryIsCancelledImmediately ( ) {
132
128
var state = ConnectionStateMachine . readyForQuery ( )
133
129
134
130
let logger = Logger . psqlTest
@@ -183,6 +179,46 @@ class SimpleQueryStateMachineTests: XCTestCase {
183
179
XCTAssertEqual ( state. readyForQueryReceived ( . idle) , . fireEventReadyForQuery)
184
180
}
185
181
182
+ func testQueryIsCancelledWithReadPendingWhileStreaming( ) {
183
+ var state = ConnectionStateMachine . readyForQuery ( )
184
+
185
+ let logger = Logger . psqlTest
186
+ let promise = EmbeddedEventLoop ( ) . makePromise ( of: PSQLRowStream . self)
187
+ promise. fail ( PSQLError . uncleanShutdown) // we don't care about the error at all.
188
+ let query = " SELECT version() "
189
+ let queryContext = SimpleQueryContext ( query: query, logger: logger, promise: promise)
190
+
191
+ XCTAssertEqual ( state. enqueue ( task: . simpleQuery( queryContext) ) , . sendQuery( query) )
192
+
193
+ // We need to ensure that even though the row description from the wire says that we
194
+ // will receive data in `.text` format, we will actually receive it in binary format,
195
+ // since we requested it in binary with our bind message.
196
+ let input : [ RowDescription . Column ] = [
197
+ . init( name: " version " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . text, dataTypeSize: - 1 , dataTypeModifier: - 1 , format: . text)
198
+ ]
199
+
200
+ XCTAssertEqual ( state. rowDescriptionReceived ( . init( columns: input) ) , . wait)
201
+ let row1 : DataRow = [ ByteBuffer ( string: " test1 " ) ]
202
+ let result = QueryResult ( value: . rowDescription( input) , logger: queryContext. logger)
203
+ XCTAssertEqual ( state. dataRowReceived ( row1) , . succeedQuery( promise, with: result) )
204
+ XCTAssertEqual ( state. cancelQueryStream ( ) , . forwardStreamError( . queryCancelled, read: false , cleanupContext: nil ) )
205
+ XCTAssertEqual ( state. channelReadComplete ( ) , . wait)
206
+ XCTAssertEqual ( state. readEventCaught ( ) , . read)
207
+
208
+ XCTAssertEqual ( state. dataRowReceived ( [ ByteBuffer ( string: " test2 " ) ] ) , . wait)
209
+ XCTAssertEqual ( state. dataRowReceived ( [ ByteBuffer ( string: " test3 " ) ] ) , . wait)
210
+ XCTAssertEqual ( state. dataRowReceived ( [ ByteBuffer ( string: " test4 " ) ] ) , . wait)
211
+ XCTAssertEqual ( state. channelReadComplete ( ) , . wait)
212
+ XCTAssertEqual ( state. readEventCaught ( ) , . read)
213
+
214
+ XCTAssertEqual ( state. channelReadComplete ( ) , . wait)
215
+ XCTAssertEqual ( state. readEventCaught ( ) , . read)
216
+
217
+ XCTAssertEqual ( state. commandCompletedReceived ( " SELECT 2 " ) , . wait)
218
+ XCTAssertEqual ( state. readyForQueryReceived ( . idle) , . fireEventReadyForQuery)
219
+ }
220
+
221
+
186
222
func testCancelQueryAfterServerError( ) {
187
223
var state = ConnectionStateMachine . readyForQuery ( )
188
224
@@ -200,13 +236,9 @@ class SimpleQueryStateMachineTests: XCTestCase {
200
236
let input : [ RowDescription . Column ] = [
201
237
. init( name: " version " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . text, dataTypeSize: - 1 , dataTypeModifier: - 1 , format: . text)
202
238
]
203
- let expected : [ RowDescription . Column ] = input. map {
204
- . init( name: $0. name, tableOID: $0. tableOID, columnAttributeNumber: $0. columnAttributeNumber, dataType: $0. dataType,
205
- dataTypeSize: $0. dataTypeSize, dataTypeModifier: $0. dataTypeModifier, format: . text)
206
- }
207
239
208
240
XCTAssertEqual ( state. rowDescriptionReceived ( . init( columns: input) ) , . wait)
209
- let result = QueryResult ( value: . rowDescription( expected ) , logger: queryContext. logger)
241
+ let result = QueryResult ( value: . rowDescription( input ) , logger: queryContext. logger)
210
242
let row1 : DataRow = [ ByteBuffer ( string: " test1 " ) ]
211
243
XCTAssertEqual ( state. dataRowReceived ( row1) , . succeedQuery( promise, with: result) )
212
244
0 commit comments