Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit ae5b2a7

Browse files
authored
Reduce serialization errors in MultiWriterIdGen (#8456)
We call `_update_stream_positions_table_txn` a lot, which is an UPSERT that can conflict in `REPEATABLE READ` isolation level. Instead of doing a transaction consisting of a single query we may as well run it outside of a transaction.
1 parent 52a50e8 commit ae5b2a7

File tree

7 files changed

+112
-8
lines changed

7 files changed

+112
-8
lines changed

changelog.d/8456.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Reduce number of serialization errors of `MultiWriterIdGenerator._update_table`.

synapse/storage/database.py

+63-6
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,24 @@ def new_transaction(
463463
*args: Any,
464464
**kwargs: Any
465465
) -> R:
466+
"""Start a new database transaction with the given connection.
467+
468+
Note: The given func may be called multiple times under certain
469+
failure modes. This is normally fine when in a standard transaction,
470+
but care must be taken if the connection is in `autocommit` mode that
471+
the function will correctly handle being aborted and retried half way
472+
through its execution.
473+
474+
Args:
475+
conn
476+
desc
477+
after_callbacks
478+
exception_callbacks
479+
func
480+
*args
481+
**kwargs
482+
"""
483+
466484
start = monotonic_time()
467485
txn_id = self._TXN_ID
468486

@@ -566,7 +584,12 @@ def new_transaction(
566584
sql_txn_timer.labels(desc).observe(duration)
567585

568586
async def runInteraction(
569-
self, desc: str, func: "Callable[..., R]", *args: Any, **kwargs: Any
587+
self,
588+
desc: str,
589+
func: "Callable[..., R]",
590+
*args: Any,
591+
db_autocommit: bool = False,
592+
**kwargs: Any
570593
) -> R:
571594
"""Starts a transaction on the database and runs a given function
572595
@@ -576,6 +599,18 @@ async def runInteraction(
576599
database transaction (twisted.enterprise.adbapi.Transaction) as
577600
its first argument, followed by `args` and `kwargs`.
578601
602+
db_autocommit: Whether to run the function in "autocommit" mode,
603+
i.e. outside of a transaction. This is useful for transactions
604+
that are only a single query.
605+
606+
Currently, this is only implemented for Postgres. SQLite will still
607+
run the function inside a transaction.
608+
609+
WARNING: This means that if func fails half way through then
610+
the changes will *not* be rolled back. `func` may also get
611+
called multiple times if the transaction is retried, so must
612+
correctly handle that case.
613+
579614
args: positional args to pass to `func`
580615
kwargs: named args to pass to `func`
581616
@@ -596,6 +631,7 @@ async def runInteraction(
596631
exception_callbacks,
597632
func,
598633
*args,
634+
db_autocommit=db_autocommit,
599635
**kwargs
600636
)
601637

@@ -609,7 +645,11 @@ async def runInteraction(
609645
return cast(R, result)
610646

611647
async def runWithConnection(
612-
self, func: "Callable[..., R]", *args: Any, **kwargs: Any
648+
self,
649+
func: "Callable[..., R]",
650+
*args: Any,
651+
db_autocommit: bool = False,
652+
**kwargs: Any
613653
) -> R:
614654
"""Wraps the .runWithConnection() method on the underlying db_pool.
615655
@@ -618,6 +658,9 @@ async def runWithConnection(
618658
database connection (twisted.enterprise.adbapi.Connection) as
619659
its first argument, followed by `args` and `kwargs`.
620660
args: positional args to pass to `func`
661+
db_autocommit: Whether to run the function in "autocommit" mode,
662+
i.e. outside of a transaction. This is useful for transaction
663+
that are only a single query. Currently only affects postgres.
621664
kwargs: named args to pass to `func`
622665
623666
Returns:
@@ -633,6 +676,13 @@ async def runWithConnection(
633676
start_time = monotonic_time()
634677

635678
def inner_func(conn, *args, **kwargs):
679+
# We shouldn't be in a transaction. If we are then something
680+
# somewhere hasn't committed after doing work. (This is likely only
681+
# possible during startup, as `run*` will ensure changes are
682+
# committed/rolled back before putting the connection back in the
683+
# pool).
684+
assert not self.engine.in_transaction(conn)
685+
636686
with LoggingContext("runWithConnection", parent_context) as context:
637687
sched_duration_sec = monotonic_time() - start_time
638688
sql_scheduling_timer.observe(sched_duration_sec)
@@ -642,10 +692,17 @@ def inner_func(conn, *args, **kwargs):
642692
logger.debug("Reconnecting closed database connection")
643693
conn.reconnect()
644694

645-
db_conn = LoggingDatabaseConnection(
646-
conn, self.engine, "runWithConnection"
647-
)
648-
return func(db_conn, *args, **kwargs)
695+
try:
696+
if db_autocommit:
697+
self.engine.attempt_to_set_autocommit(conn, True)
698+
699+
db_conn = LoggingDatabaseConnection(
700+
conn, self.engine, "runWithConnection"
701+
)
702+
return func(db_conn, *args, **kwargs)
703+
finally:
704+
if db_autocommit:
705+
self.engine.attempt_to_set_autocommit(conn, False)
649706

650707
return await make_deferred_yieldable(
651708
self._db_pool.runWithConnection(inner_func, *args, **kwargs)

synapse/storage/engines/_base.py

+17
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,20 @@ def server_version(self) -> str:
9797
"""Gets a string giving the server version. For example: '3.22.0'
9898
"""
9999
...
100+
101+
@abc.abstractmethod
102+
def in_transaction(self, conn: Connection) -> bool:
103+
"""Whether the connection is currently in a transaction.
104+
"""
105+
...
106+
107+
@abc.abstractmethod
108+
def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
109+
"""Attempt to set the connections autocommit mode.
110+
111+
When True queries are run outside of transactions.
112+
113+
Note: This has no effect on SQLite3, so callers still need to
114+
commit/rollback the connections.
115+
"""
116+
...

synapse/storage/engines/postgres.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515

1616
import logging
1717

18-
from ._base import BaseDatabaseEngine, IncorrectDatabaseSetup
18+
from synapse.storage.engines._base import BaseDatabaseEngine, IncorrectDatabaseSetup
19+
from synapse.storage.types import Connection
1920

2021
logger = logging.getLogger(__name__)
2122

@@ -119,6 +120,7 @@ def on_new_connection(self, db_conn):
119120
cursor.execute("SET synchronous_commit TO OFF")
120121

121122
cursor.close()
123+
db_conn.commit()
122124

123125
@property
124126
def can_native_upsert(self):
@@ -171,3 +173,9 @@ def server_version(self):
171173
return "%i.%i" % (numver / 10000, numver % 10000)
172174
else:
173175
return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100)
176+
177+
def in_transaction(self, conn: Connection) -> bool:
178+
return conn.status != self.module.extensions.STATUS_READY # type: ignore
179+
180+
def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
181+
return conn.set_session(autocommit=autocommit) # type: ignore

synapse/storage/engines/sqlite.py

+10
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import typing
1818

1919
from synapse.storage.engines import BaseDatabaseEngine
20+
from synapse.storage.types import Connection
2021

2122
if typing.TYPE_CHECKING:
2223
import sqlite3 # noqa: F401
@@ -86,6 +87,7 @@ def on_new_connection(self, db_conn):
8687

8788
db_conn.create_function("rank", 1, _rank)
8889
db_conn.execute("PRAGMA foreign_keys = ON;")
90+
db_conn.commit()
8991

9092
def is_deadlock(self, error):
9193
return False
@@ -105,6 +107,14 @@ def server_version(self):
105107
"""
106108
return "%i.%i.%i" % self.module.sqlite_version_info
107109

110+
def in_transaction(self, conn: Connection) -> bool:
111+
return conn.in_transaction # type: ignore
112+
113+
def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
114+
# Twisted doesn't let us set attributes on the connections, so we can't
115+
# set the connection to autocommit mode.
116+
pass
117+
108118

109119
# Following functions taken from: https://github.com/coleifer/peewee
110120

synapse/storage/util/id_generators.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
from synapse.metrics.background_process_metrics import run_as_background_process
2626
from synapse.storage.database import DatabasePool, LoggingTransaction
27+
from synapse.storage.types import Cursor
2728
from synapse.storage.util.sequence import PostgresSequenceGenerator
2829

2930
logger = logging.getLogger(__name__)
@@ -548,7 +549,7 @@ def _add_persisted_position(self, new_id: int):
548549
# do.
549550
break
550551

551-
def _update_stream_positions_table_txn(self, txn):
552+
def _update_stream_positions_table_txn(self, txn: Cursor):
552553
"""Update the `stream_positions` table with newly persisted position.
553554
"""
554555

@@ -598,10 +599,13 @@ class _MultiWriterCtxManager:
598599
stream_ids = attr.ib(type=List[int], factory=list)
599600

600601
async def __aenter__(self) -> Union[int, List[int]]:
602+
# It's safe to run this in autocommit mode as fetching values from a
603+
# sequence ignores transaction semantics anyway.
601604
self.stream_ids = await self.id_gen._db.runInteraction(
602605
"_load_next_mult_id",
603606
self.id_gen._load_next_mult_id_txn,
604607
self.multiple_ids or 1,
608+
db_autocommit=True,
605609
)
606610

607611
# Assert the fetched ID is actually greater than any ID we've already
@@ -632,10 +636,16 @@ async def __aexit__(self, exc_type, exc, tb):
632636
#
633637
# We only do this on the success path so that the persisted current
634638
# position points to a persisted row with the correct instance name.
639+
#
640+
# We do this in autocommit mode as a) the upsert works correctly outside
641+
# transactions and b) reduces the amount of time the rows are locked
642+
# for. If we don't do this then we'll often hit serialization errors due
643+
# to the fact we default to REPEATABLE READ isolation levels.
635644
if self.id_gen._writers:
636645
await self.id_gen._db.runInteraction(
637646
"MultiWriterIdGenerator._update_table",
638647
self.id_gen._update_stream_positions_table_txn,
648+
db_autocommit=True,
639649
)
640650

641651
return False

tests/storage/test_base.py

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ def runWithConnection(func, *args, **kwargs):
5656
engine = create_engine(sqlite_config)
5757
fake_engine = Mock(wraps=engine)
5858
fake_engine.can_native_upsert = False
59+
fake_engine.in_transaction.return_value = False
5960

6061
db = DatabasePool(Mock(), Mock(config=sqlite_config), fake_engine)
6162
db._db_pool = self.db_pool

0 commit comments

Comments
 (0)