Skip to content

Commit 5d817be

Browse files
authored
Improve transaction handling (#538)
1 parent 712740b commit 5d817be

File tree

4 files changed

+199
-22
lines changed

4 files changed

+199
-22
lines changed

Sources/PostgresNIO/Connection/PostgresConnection.swift

+104
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,110 @@ extension PostgresConnection {
530530
throw error // rethrow with more metadata
531531
}
532532
}
533+
534+
#if compiler(>=6.0)
535+
/// Puts the connection into an open transaction state, for the provided `closure`'s lifetime.
536+
///
537+
/// The function starts a transaction by running a `BEGIN` query on the connection against the database. It then
538+
/// lends the connection to the user provided closure. The user can then modify the database as they wish. If the user
539+
/// provided closure returns successfully, the function will attempt to commit the changes by running a `COMMIT`
540+
/// query against the database. If the user provided closure throws an error, the function will attempt to rollback the
541+
/// changes made within the closure.
542+
///
543+
/// - Parameters:
544+
/// - logger: The `Logger` to log into for the transaction.
545+
/// - file: The file, the transaction was started in. Used for better error reporting.
546+
/// - line: The line, the transaction was started in. Used for better error reporting.
547+
/// - closure: The user provided code to modify the database. Use the provided connection to run queries.
548+
/// The connection must stay in the transaction mode. Otherwise this method will throw!
549+
/// - Returns: The closure's return value.
550+
public func withTransaction<Result>(
551+
logger: Logger,
552+
file: String = #file,
553+
line: Int = #line,
554+
isolation: isolated (any Actor)? = #isolation,
555+
// DO NOT FIX THE WHITESPACE IN THE NEXT LINE UNTIL 5.10 IS UNSUPPORTED
556+
// https://github.com/swiftlang/swift/issues/79285
557+
_ process: (PostgresConnection) async throws -> sending Result) async throws -> sending Result {
558+
do {
559+
try await self.query("BEGIN;", logger: logger)
560+
} catch {
561+
throw PostgresTransactionError(file: file, line: line, beginError: error)
562+
}
563+
564+
var closureHasFinished: Bool = false
565+
do {
566+
let value = try await process(self)
567+
closureHasFinished = true
568+
try await self.query("COMMIT;", logger: logger)
569+
return value
570+
} catch {
571+
var transactionError = PostgresTransactionError(file: file, line: line)
572+
if !closureHasFinished {
573+
transactionError.closureError = error
574+
do {
575+
try await self.query("ROLLBACK;", logger: logger)
576+
} catch {
577+
transactionError.rollbackError = error
578+
}
579+
} else {
580+
transactionError.commitError = error
581+
}
582+
583+
throw transactionError
584+
}
585+
}
586+
#else
587+
/// Puts the connection into an open transaction state, for the provided `closure`'s lifetime.
588+
///
589+
/// The function starts a transaction by running a `BEGIN` query on the connection against the database. It then
590+
/// lends the connection to the user provided closure. The user can then modify the database as they wish. If the user
591+
/// provided closure returns successfully, the function will attempt to commit the changes by running a `COMMIT`
592+
/// query against the database. If the user provided closure throws an error, the function will attempt to rollback the
593+
/// changes made within the closure.
594+
///
595+
/// - Parameters:
596+
/// - logger: The `Logger` to log into for the transaction.
597+
/// - file: The file, the transaction was started in. Used for better error reporting.
598+
/// - line: The line, the transaction was started in. Used for better error reporting.
599+
/// - closure: The user provided code to modify the database. Use the provided connection to run queries.
600+
/// The connection must stay in the transaction mode. Otherwise this method will throw!
601+
/// - Returns: The closure's return value.
602+
public func withTransaction<Result>(
603+
logger: Logger,
604+
file: String = #file,
605+
line: Int = #line,
606+
_ process: (PostgresConnection) async throws -> Result
607+
) async throws -> Result {
608+
do {
609+
try await self.query("BEGIN;", logger: logger)
610+
} catch {
611+
throw PostgresTransactionError(file: file, line: line, beginError: error)
612+
}
613+
614+
var closureHasFinished: Bool = false
615+
do {
616+
let value = try await process(self)
617+
closureHasFinished = true
618+
try await self.query("COMMIT;", logger: logger)
619+
return value
620+
} catch {
621+
var transactionError = PostgresTransactionError(file: file, line: line)
622+
if !closureHasFinished {
623+
transactionError.closureError = error
624+
do {
625+
try await self.query("ROLLBACK;", logger: logger)
626+
} catch {
627+
transactionError.rollbackError = error
628+
}
629+
} else {
630+
transactionError.commitError = error
631+
}
632+
633+
throw transactionError
634+
}
635+
}
636+
#endif
533637
}
534638

535639
// MARK: EventLoopFuture interface
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/// A wrapper around the errors that can occur during a transaction.
2+
public struct PostgresTransactionError: Error {
3+
4+
/// The file in which the transaction was started
5+
public var file: String
6+
/// The line in which the transaction was started
7+
public var line: Int
8+
9+
/// The error thrown when running the `BEGIN` query
10+
public var beginError: Error?
11+
/// The error thrown in the transaction closure
12+
public var closureError: Error?
13+
14+
/// The error thrown while rolling the transaction back. If the ``closureError`` is set,
15+
/// but the ``rollbackError`` is empty, the rollback was successful. If the ``rollbackError``
16+
/// is set, the rollback failed.
17+
public var rollbackError: Error?
18+
19+
/// The error thrown while commiting the transaction.
20+
public var commitError: Error?
21+
}

Sources/PostgresNIO/Pool/PostgresClient.swift

+68-16
Original file line numberDiff line numberDiff line change
@@ -293,42 +293,94 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
293293
return ConnectionAndMetadata(connection: connection, maximalStreamsOnConnection: 1)
294294
}
295295
}
296-
297296

298297
/// Lease a connection for the provided `closure`'s lifetime.
299298
///
300299
/// - Parameter closure: A closure that uses the passed `PostgresConnection`. The closure **must not** capture
301300
/// the provided `PostgresConnection`.
302301
/// - Returns: The closure's return value.
302+
@_disfavoredOverload
303303
public func withConnection<Result>(_ closure: (PostgresConnection) async throws -> Result) async throws -> Result {
304304
let connection = try await self.leaseConnection()
305305

306306
defer { self.pool.releaseConnection(connection) }
307307

308308
return try await closure(connection)
309309
}
310-
310+
311+
#if compiler(>=6.0)
311312
/// Lease a connection for the provided `closure`'s lifetime.
312-
/// A transation starts with call to withConnection
313-
/// A transaction should end with a call to COMMIT or ROLLBACK
314-
/// COMMIT is called upon successful completion and ROLLBACK is called should any steps fail
315313
///
316314
/// - Parameter closure: A closure that uses the passed `PostgresConnection`. The closure **must not** capture
317315
/// the provided `PostgresConnection`.
318316
/// - Returns: The closure's return value.
319-
public func withTransaction<Result>(_ process: (PostgresConnection) async throws -> Result) async throws -> Result {
320-
try await withConnection { connection in
321-
try await connection.query("BEGIN;", logger: self.backgroundLogger)
322-
do {
323-
let value = try await process(connection)
324-
try await connection.query("COMMIT;", logger: self.backgroundLogger)
325-
return value
326-
} catch {
327-
try await connection.query("ROLLBACK;", logger: self.backgroundLogger)
328-
throw error
329-
}
317+
public func withConnection<Result>(
318+
isolation: isolated (any Actor)? = #isolation,
319+
// DO NOT FIX THE WHITESPACE IN THE NEXT LINE UNTIL 5.10 IS UNSUPPORTED
320+
// https://github.com/swiftlang/swift/issues/79285
321+
_ closure: (PostgresConnection) async throws -> sending Result) async throws -> sending Result {
322+
let connection = try await self.leaseConnection()
323+
324+
defer { self.pool.releaseConnection(connection) }
325+
326+
return try await closure(connection)
327+
}
328+
329+
/// Lease a connection, which is in an open transaction state, for the provided `closure`'s lifetime.
330+
///
331+
/// The function leases a connection from the underlying connection pool and starts a transaction by running a `BEGIN`
332+
/// query on the leased connection against the database. It then lends the connection to the user provided closure.
333+
/// The user can then modify the database as they wish. If the user provided closure returns successfully, the function
334+
/// will attempt to commit the changes by running a `COMMIT` query against the database. If the user provided closure
335+
/// throws an error, the function will attempt to rollback the changes made within the closure.
336+
///
337+
/// - Parameters:
338+
/// - logger: The `Logger` to log into for the transaction.
339+
/// - file: The file, the transaction was started in. Used for better error reporting.
340+
/// - line: The line, the transaction was started in. Used for better error reporting.
341+
/// - closure: The user provided code to modify the database. Use the provided connection to run queries.
342+
/// The connection must stay in the transaction mode. Otherwise this method will throw!
343+
/// - Returns: The closure's return value.
344+
public func withTransaction<Result>(
345+
logger: Logger,
346+
file: String = #file,
347+
line: Int = #line,
348+
isolation: isolated (any Actor)? = #isolation,
349+
// DO NOT FIX THE WHITESPACE IN THE NEXT LINE UNTIL 5.10 IS UNSUPPORTED
350+
// https://github.com/swiftlang/swift/issues/79285
351+
_ closure: (PostgresConnection) async throws -> sending Result) async throws -> sending Result {
352+
try await self.withConnection { connection in
353+
try await connection.withTransaction(logger: logger, file: file, line: line, closure)
354+
}
355+
}
356+
#else
357+
358+
/// Lease a connection, which is in an open transaction state, for the provided `closure`'s lifetime.
359+
///
360+
/// The function leases a connection from the underlying connection pool and starts a transaction by running a `BEGIN`
361+
/// query on the leased connection against the database. It then lends the connection to the user provided closure.
362+
/// The user can then modify the database as they wish. If the user provided closure returns successfully, the function
363+
/// will attempt to commit the changes by running a `COMMIT` query against the database. If the user provided closure
364+
/// throws an error, the function will attempt to rollback the changes made within the closure.
365+
///
366+
/// - Parameters:
367+
/// - logger: The `Logger` to log into for the transaction.
368+
/// - file: The file, the transaction was started in. Used for better error reporting.
369+
/// - line: The line, the transaction was started in. Used for better error reporting.
370+
/// - closure: The user provided code to modify the database. Use the provided connection to run queries.
371+
/// The connection must stay in the transaction mode. Otherwise this method will throw!
372+
/// - Returns: The closure's return value.
373+
public func withTransaction<Result>(
374+
logger: Logger,
375+
file: String = #file,
376+
line: Int = #line,
377+
_ closure: (PostgresConnection) async throws -> Result
378+
) async throws -> Result {
379+
try await self.withConnection { connection in
380+
try await connection.withTransaction(logger: logger, file: file, line: line, closure)
330381
}
331382
}
383+
#endif
332384

333385
/// Run a query on the Postgres server the client is connected to.
334386
///

Tests/IntegrationTests/PostgresClientTests.swift

+6-6
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ final class PostgresClientTests: XCTestCase {
7777

7878
for _ in 0..<iterations {
7979
taskGroup.addTask {
80-
let _ = try await client.withTransaction { transaction in
80+
let _ = try await client.withTransaction(logger: logger) { transaction in
8181
try await transaction.query(
8282
"""
8383
INSERT INTO "\(unescaped: tableName)" (uuid) VALUES (\(UUID()));
@@ -101,7 +101,7 @@ final class PostgresClientTests: XCTestCase {
101101
taskGroup.addTask {
102102

103103
do {
104-
let _ = try await client.withTransaction { transaction in
104+
let _ = try await client.withTransaction(logger: logger) { transaction in
105105
/// insert valid data
106106
try await transaction.query(
107107
"""
@@ -120,10 +120,10 @@ final class PostgresClientTests: XCTestCase {
120120
}
121121
} catch {
122122
XCTAssertNotNil(error)
123-
guard let error = error as? PSQLError else { return XCTFail("Unexpected error type") }
124-
125-
XCTAssertEqual(error.code, .server)
126-
XCTAssertEqual(error.serverInfo?[.severity], "ERROR")
123+
guard let error = error as? PostgresTransactionError else { return XCTFail("Unexpected error type: \(error)") }
124+
125+
XCTAssertEqual((error.closureError as? PSQLError)?.code, .server)
126+
XCTAssertEqual((error.closureError as? PSQLError)?.serverInfo?[.severity], "ERROR")
127127
}
128128
}
129129

0 commit comments

Comments
 (0)