Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 19b15d6

Browse files
authored
Use autocommit mode for single statement DB functions. (#8542)
Autocommit means that we don't wrap the functions in transactions, and instead get executed directly. Introduced in #8456. This will help: 1. reduce the number of `could not serialize access due to concurrent delete` errors that we see (though there are a few functions that often cause serialization errors that we don't fix here); 2. improve the DB performance, as it no longer needs to deal with the overhead of `REPEATABLE READ` isolation levels; and 3. improve wall clock speed of these functions, as we no longer need to send `BEGIN` and `COMMIT` to the DB. Some notes about the differences between autocommit mode and our default `REPEATABLE READ` transactions: 1. Currently `autocommit` only applies when using PostgreSQL, and is ignored when using SQLite (due to silliness with [Twisted DB classes](https://twistedmatrix.com/trac/ticket/9998)). 2. Autocommit functions may get retried on error, which means they can get applied *twice* (or more) to the DB (since they are not in a transaction the previous call would not get rolled back). This means that the functions need to be idempotent (or otherwise not care about being called multiple times). Read queries, simple deletes, and updates/upserts that replace rows (rather than generating new values from existing rows) are all idempotent. 3. Autocommit functions no longer get executed in [`REPEATABLE READ`](https://www.postgresql.org/docs/current/transaction-iso.html) isolation level, and so data can change queries, which is fine for single statement queries.
1 parent 618d405 commit 19b15d6

File tree

5 files changed

+156
-70
lines changed

5 files changed

+156
-70
lines changed

changelog.d/8542.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Improve database performance by executing more queries without starting transactions.

synapse/storage/database.py

+91-8
Original file line numberDiff line numberDiff line change
@@ -893,6 +893,12 @@ async def simple_upsert(
893893
attempts = 0
894894
while True:
895895
try:
896+
# We can autocommit if we are going to use native upserts
897+
autocommit = (
898+
self.engine.can_native_upsert
899+
and table not in self._unsafe_to_upsert_tables
900+
)
901+
896902
return await self.runInteraction(
897903
desc,
898904
self.simple_upsert_txn,
@@ -901,6 +907,7 @@ async def simple_upsert(
901907
values,
902908
insertion_values,
903909
lock=lock,
910+
db_autocommit=autocommit,
904911
)
905912
except self.engine.module.IntegrityError as e:
906913
attempts += 1
@@ -1063,6 +1070,43 @@ def simple_upsert_txn_native_upsert(
10631070
)
10641071
txn.execute(sql, list(allvalues.values()))
10651072

1073+
async def simple_upsert_many(
1074+
self,
1075+
table: str,
1076+
key_names: Collection[str],
1077+
key_values: Collection[Iterable[Any]],
1078+
value_names: Collection[str],
1079+
value_values: Iterable[Iterable[Any]],
1080+
desc: str,
1081+
) -> None:
1082+
"""
1083+
Upsert, many times.
1084+
1085+
Args:
1086+
table: The table to upsert into
1087+
key_names: The key column names.
1088+
key_values: A list of each row's key column values.
1089+
value_names: The value column names
1090+
value_values: A list of each row's value column values.
1091+
Ignored if value_names is empty.
1092+
"""
1093+
1094+
# We can autocommit if we are going to use native upserts
1095+
autocommit = (
1096+
self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
1097+
)
1098+
1099+
return await self.runInteraction(
1100+
desc,
1101+
self.simple_upsert_many_txn,
1102+
table,
1103+
key_names,
1104+
key_values,
1105+
value_names,
1106+
value_values,
1107+
db_autocommit=autocommit,
1108+
)
1109+
10661110
def simple_upsert_many_txn(
10671111
self,
10681112
txn: LoggingTransaction,
@@ -1214,7 +1258,13 @@ async def simple_select_one(
12141258
desc: description of the transaction, for logging and metrics
12151259
"""
12161260
return await self.runInteraction(
1217-
desc, self.simple_select_one_txn, table, keyvalues, retcols, allow_none
1261+
desc,
1262+
self.simple_select_one_txn,
1263+
table,
1264+
keyvalues,
1265+
retcols,
1266+
allow_none,
1267+
db_autocommit=True,
12181268
)
12191269

12201270
@overload
@@ -1265,6 +1315,7 @@ async def simple_select_one_onecol(
12651315
keyvalues,
12661316
retcol,
12671317
allow_none=allow_none,
1318+
db_autocommit=True,
12681319
)
12691320

12701321
@overload
@@ -1346,7 +1397,12 @@ async def simple_select_onecol(
13461397
Results in a list
13471398
"""
13481399
return await self.runInteraction(
1349-
desc, self.simple_select_onecol_txn, table, keyvalues, retcol
1400+
desc,
1401+
self.simple_select_onecol_txn,
1402+
table,
1403+
keyvalues,
1404+
retcol,
1405+
db_autocommit=True,
13501406
)
13511407

13521408
async def simple_select_list(
@@ -1371,7 +1427,12 @@ async def simple_select_list(
13711427
A list of dictionaries.
13721428
"""
13731429
return await self.runInteraction(
1374-
desc, self.simple_select_list_txn, table, keyvalues, retcols
1430+
desc,
1431+
self.simple_select_list_txn,
1432+
table,
1433+
keyvalues,
1434+
retcols,
1435+
db_autocommit=True,
13751436
)
13761437

13771438
@classmethod
@@ -1450,6 +1511,7 @@ async def simple_select_many_batch(
14501511
chunk,
14511512
keyvalues,
14521513
retcols,
1514+
db_autocommit=True,
14531515
)
14541516

14551517
results.extend(rows)
@@ -1548,7 +1610,12 @@ async def simple_update_one(
15481610
desc: description of the transaction, for logging and metrics
15491611
"""
15501612
await self.runInteraction(
1551-
desc, self.simple_update_one_txn, table, keyvalues, updatevalues
1613+
desc,
1614+
self.simple_update_one_txn,
1615+
table,
1616+
keyvalues,
1617+
updatevalues,
1618+
db_autocommit=True,
15521619
)
15531620

15541621
@classmethod
@@ -1607,7 +1674,9 @@ async def simple_delete_one(
16071674
keyvalues: dict of column names and values to select the row with
16081675
desc: description of the transaction, for logging and metrics
16091676
"""
1610-
await self.runInteraction(desc, self.simple_delete_one_txn, table, keyvalues)
1677+
await self.runInteraction(
1678+
desc, self.simple_delete_one_txn, table, keyvalues, db_autocommit=True,
1679+
)
16111680

16121681
@staticmethod
16131682
def simple_delete_one_txn(
@@ -1646,7 +1715,9 @@ async def simple_delete(
16461715
Returns:
16471716
The number of deleted rows.
16481717
"""
1649-
return await self.runInteraction(desc, self.simple_delete_txn, table, keyvalues)
1718+
return await self.runInteraction(
1719+
desc, self.simple_delete_txn, table, keyvalues, db_autocommit=True
1720+
)
16501721

16511722
@staticmethod
16521723
def simple_delete_txn(
@@ -1694,7 +1765,13 @@ async def simple_delete_many(
16941765
Number rows deleted
16951766
"""
16961767
return await self.runInteraction(
1697-
desc, self.simple_delete_many_txn, table, column, iterable, keyvalues
1768+
desc,
1769+
self.simple_delete_many_txn,
1770+
table,
1771+
column,
1772+
iterable,
1773+
keyvalues,
1774+
db_autocommit=True,
16981775
)
16991776

17001777
@staticmethod
@@ -1860,7 +1937,13 @@ async def simple_search_list(
18601937
"""
18611938

18621939
return await self.runInteraction(
1863-
desc, self.simple_search_list_txn, table, term, col, retcols
1940+
desc,
1941+
self.simple_search_list_txn,
1942+
table,
1943+
term,
1944+
col,
1945+
retcols,
1946+
db_autocommit=True,
18641947
)
18651948

18661949
@classmethod

synapse/storage/databases/main/keys.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,7 @@ async def store_server_verify_keys(
122122
# param, which is itself the 2-tuple (server_name, key_id).
123123
invalidations.append((server_name, key_id))
124124

125-
await self.db_pool.runInteraction(
126-
"store_server_verify_keys",
127-
self.db_pool.simple_upsert_many_txn,
125+
await self.db_pool.simple_upsert_many(
128126
table="server_signature_keys",
129127
key_names=("server_name", "key_id"),
130128
key_values=key_values,
@@ -135,6 +133,7 @@ async def store_server_verify_keys(
135133
"verify_key",
136134
),
137135
value_values=value_values,
136+
desc="store_server_verify_keys",
138137
)
139138

140139
invalidate = self._get_server_verify_key.invalidate

synapse/storage/databases/main/transactions.py

+45-31
Original file line numberDiff line numberDiff line change
@@ -208,42 +208,56 @@ async def set_destination_retry_timings(
208208
"""
209209

210210
self._destination_retry_cache.pop(destination, None)
211-
return await self.db_pool.runInteraction(
212-
"set_destination_retry_timings",
213-
self._set_destination_retry_timings,
214-
destination,
215-
failure_ts,
216-
retry_last_ts,
217-
retry_interval,
218-
)
211+
if self.database_engine.can_native_upsert:
212+
return await self.db_pool.runInteraction(
213+
"set_destination_retry_timings",
214+
self._set_destination_retry_timings_native,
215+
destination,
216+
failure_ts,
217+
retry_last_ts,
218+
retry_interval,
219+
db_autocommit=True, # Safe as its a single upsert
220+
)
221+
else:
222+
return await self.db_pool.runInteraction(
223+
"set_destination_retry_timings",
224+
self._set_destination_retry_timings_emulated,
225+
destination,
226+
failure_ts,
227+
retry_last_ts,
228+
retry_interval,
229+
)
219230

220-
def _set_destination_retry_timings(
231+
def _set_destination_retry_timings_native(
221232
self, txn, destination, failure_ts, retry_last_ts, retry_interval
222233
):
234+
assert self.database_engine.can_native_upsert
235+
236+
# Upsert retry time interval if retry_interval is zero (i.e. we're
237+
# resetting it) or greater than the existing retry interval.
238+
#
239+
# WARNING: This is executed in autocommit, so we shouldn't add any more
240+
# SQL calls in here (without being very careful).
241+
sql = """
242+
INSERT INTO destinations (
243+
destination, failure_ts, retry_last_ts, retry_interval
244+
)
245+
VALUES (?, ?, ?, ?)
246+
ON CONFLICT (destination) DO UPDATE SET
247+
failure_ts = EXCLUDED.failure_ts,
248+
retry_last_ts = EXCLUDED.retry_last_ts,
249+
retry_interval = EXCLUDED.retry_interval
250+
WHERE
251+
EXCLUDED.retry_interval = 0
252+
OR destinations.retry_interval IS NULL
253+
OR destinations.retry_interval < EXCLUDED.retry_interval
254+
"""
223255

224-
if self.database_engine.can_native_upsert:
225-
# Upsert retry time interval if retry_interval is zero (i.e. we're
226-
# resetting it) or greater than the existing retry interval.
227-
228-
sql = """
229-
INSERT INTO destinations (
230-
destination, failure_ts, retry_last_ts, retry_interval
231-
)
232-
VALUES (?, ?, ?, ?)
233-
ON CONFLICT (destination) DO UPDATE SET
234-
failure_ts = EXCLUDED.failure_ts,
235-
retry_last_ts = EXCLUDED.retry_last_ts,
236-
retry_interval = EXCLUDED.retry_interval
237-
WHERE
238-
EXCLUDED.retry_interval = 0
239-
OR destinations.retry_interval IS NULL
240-
OR destinations.retry_interval < EXCLUDED.retry_interval
241-
"""
242-
243-
txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
244-
245-
return
256+
txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
246257

258+
def _set_destination_retry_timings_emulated(
259+
self, txn, destination, failure_ts, retry_last_ts, retry_interval
260+
):
247261
self.database_engine.lock_table(txn, "destinations")
248262

249263
# We need to be careful here as the data may have changed from under us

synapse/storage/databases/main/user_directory.py

+17-28
Original file line numberDiff line numberDiff line change
@@ -480,21 +480,16 @@ async def add_users_who_share_private_room(
480480
user_id_tuples: iterable of 2-tuple of user IDs.
481481
"""
482482

483-
def _add_users_who_share_room_txn(txn):
484-
self.db_pool.simple_upsert_many_txn(
485-
txn,
486-
table="users_who_share_private_rooms",
487-
key_names=["user_id", "other_user_id", "room_id"],
488-
key_values=[
489-
(user_id, other_user_id, room_id)
490-
for user_id, other_user_id in user_id_tuples
491-
],
492-
value_names=(),
493-
value_values=None,
494-
)
495-
496-
await self.db_pool.runInteraction(
497-
"add_users_who_share_room", _add_users_who_share_room_txn
483+
await self.db_pool.simple_upsert_many(
484+
table="users_who_share_private_rooms",
485+
key_names=["user_id", "other_user_id", "room_id"],
486+
key_values=[
487+
(user_id, other_user_id, room_id)
488+
for user_id, other_user_id in user_id_tuples
489+
],
490+
value_names=(),
491+
value_values=None,
492+
desc="add_users_who_share_room",
498493
)
499494

500495
async def add_users_in_public_rooms(
@@ -508,19 +503,13 @@ async def add_users_in_public_rooms(
508503
user_ids
509504
"""
510505

511-
def _add_users_in_public_rooms_txn(txn):
512-
513-
self.db_pool.simple_upsert_many_txn(
514-
txn,
515-
table="users_in_public_rooms",
516-
key_names=["user_id", "room_id"],
517-
key_values=[(user_id, room_id) for user_id in user_ids],
518-
value_names=(),
519-
value_values=None,
520-
)
521-
522-
await self.db_pool.runInteraction(
523-
"add_users_in_public_rooms", _add_users_in_public_rooms_txn
506+
await self.db_pool.simple_upsert_many(
507+
table="users_in_public_rooms",
508+
key_names=["user_id", "room_id"],
509+
key_values=[(user_id, room_id) for user_id in user_ids],
510+
value_names=(),
511+
value_values=None,
512+
desc="add_users_in_public_rooms",
524513
)
525514

526515
async def delete_all_from_user_dir(self) -> None:

0 commit comments

Comments
 (0)