Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit fa8934b

Browse files
committed
Reduce serialization errors in MultiWriterIdGen (#8456)
We call `_update_stream_positions_table_txn` a lot, which is an UPSERT that can conflict in `REPEATABLE READ` isolation level. Instead of doing a transaction consisting of a single query we may as well run it outside of a transaction.
1 parent d9b55bd commit fa8934b

File tree

7 files changed

+109
-5
lines changed

7 files changed

+109
-5
lines changed

changelog.d/8456.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Reduce number of serialization errors of `MultiWriterIdGenerator._update_table`.

synapse/storage/database.py

+60-3
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,24 @@ def new_transaction(
403403
*args: Any,
404404
**kwargs: Any
405405
) -> R:
406+
"""Start a new database transaction with the given connection.
407+
408+
Note: The given func may be called multiple times under certain
409+
failure modes. This is normally fine when in a standard transaction,
410+
but care must be taken if the connection is in `autocommit` mode that
411+
the function will correctly handle being aborted and retried half way
412+
through its execution.
413+
414+
Args:
415+
conn
416+
desc
417+
after_callbacks
418+
exception_callbacks
419+
func
420+
*args
421+
**kwargs
422+
"""
423+
406424
start = monotonic_time()
407425
txn_id = self._TXN_ID
408426

@@ -508,7 +526,12 @@ def new_transaction(
508526
sql_txn_timer.labels(desc).observe(duration)
509527

510528
async def runInteraction(
511-
self, desc: str, func: "Callable[..., R]", *args: Any, **kwargs: Any
529+
self,
530+
desc: str,
531+
func: "Callable[..., R]",
532+
*args: Any,
533+
db_autocommit: bool = False,
534+
**kwargs: Any
512535
) -> R:
513536
"""Starts a transaction on the database and runs a given function
514537
@@ -518,6 +541,18 @@ async def runInteraction(
518541
database transaction (twisted.enterprise.adbapi.Transaction) as
519542
its first argument, followed by `args` and `kwargs`.
520543
544+
db_autocommit: Whether to run the function in "autocommit" mode,
545+
i.e. outside of a transaction. This is useful for transactions
546+
that are only a single query.
547+
548+
Currently, this is only implemented for Postgres. SQLite will still
549+
run the function inside a transaction.
550+
551+
WARNING: This means that if func fails half way through then
552+
the changes will *not* be rolled back. `func` may also get
553+
called multiple times if the transaction is retried, so must
554+
correctly handle that case.
555+
521556
args: positional args to pass to `func`
522557
kwargs: named args to pass to `func`
523558
@@ -538,6 +573,7 @@ async def runInteraction(
538573
exception_callbacks,
539574
func,
540575
*args,
576+
db_autocommit=db_autocommit,
541577
**kwargs
542578
)
543579

@@ -551,7 +587,11 @@ async def runInteraction(
551587
return cast(R, result)
552588

553589
async def runWithConnection(
554-
self, func: "Callable[..., R]", *args: Any, **kwargs: Any
590+
self,
591+
func: "Callable[..., R]",
592+
*args: Any,
593+
db_autocommit: bool = False,
594+
**kwargs: Any
555595
) -> R:
556596
"""Wraps the .runWithConnection() method on the underlying db_pool.
557597
@@ -560,6 +600,9 @@ async def runWithConnection(
560600
database connection (twisted.enterprise.adbapi.Connection) as
561601
its first argument, followed by `args` and `kwargs`.
562602
args: positional args to pass to `func`
603+
db_autocommit: Whether to run the function in "autocommit" mode,
604+
i.e. outside of a transaction. This is useful for transaction
605+
that are only a single query. Currently only affects postgres.
563606
kwargs: named args to pass to `func`
564607
565608
Returns:
@@ -575,6 +618,13 @@ async def runWithConnection(
575618
start_time = monotonic_time()
576619

577620
def inner_func(conn, *args, **kwargs):
621+
# We shouldn't be in a transaction. If we are then something
622+
# somewhere hasn't committed after doing work. (This is likely only
623+
# possible during startup, as `run*` will ensure changes are
624+
# committed/rolled back before putting the connection back in the
625+
# pool).
626+
assert not self.engine.in_transaction(conn)
627+
578628
with LoggingContext("runWithConnection", parent_context) as context:
579629
sched_duration_sec = monotonic_time() - start_time
580630
sql_scheduling_timer.observe(sched_duration_sec)
@@ -584,7 +634,14 @@ def inner_func(conn, *args, **kwargs):
584634
logger.debug("Reconnecting closed database connection")
585635
conn.reconnect()
586636

587-
return func(conn, *args, **kwargs)
637+
try:
638+
if db_autocommit:
639+
self.engine.attempt_to_set_autocommit(conn, True)
640+
641+
return func(conn, *args, **kwargs)
642+
finally:
643+
if db_autocommit:
644+
self.engine.attempt_to_set_autocommit(conn, False)
588645

589646
return await make_deferred_yieldable(
590647
self._db_pool.runWithConnection(inner_func, *args, **kwargs)

synapse/storage/engines/_base.py

+17
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,20 @@ def server_version(self) -> str:
9797
"""Gets a string giving the server version. For example: '3.22.0'
9898
"""
9999
...
100+
101+
@abc.abstractmethod
102+
def in_transaction(self, conn: Connection) -> bool:
103+
"""Whether the connection is currently in a transaction.
104+
"""
105+
...
106+
107+
@abc.abstractmethod
108+
def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
109+
"""Attempt to set the connections autocommit mode.
110+
111+
When True queries are run outside of transactions.
112+
113+
Note: This has no effect on SQLite3, so callers still need to
114+
commit/rollback the connections.
115+
"""
116+
...

synapse/storage/engines/postgres.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515

1616
import logging
1717

18-
from ._base import BaseDatabaseEngine, IncorrectDatabaseSetup
18+
from synapse.storage.engines._base import BaseDatabaseEngine, IncorrectDatabaseSetup
19+
from synapse.storage.types import Connection
1920

2021
logger = logging.getLogger(__name__)
2122

@@ -119,6 +120,7 @@ def on_new_connection(self, db_conn):
119120
cursor.execute("SET synchronous_commit TO OFF")
120121

121122
cursor.close()
123+
db_conn.commit()
122124

123125
@property
124126
def can_native_upsert(self):
@@ -171,3 +173,9 @@ def server_version(self):
171173
return "%i.%i" % (numver / 10000, numver % 10000)
172174
else:
173175
return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100)
176+
177+
def in_transaction(self, conn: Connection) -> bool:
178+
return conn.status != self.module.extensions.STATUS_READY # type: ignore
179+
180+
def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
181+
return conn.set_session(autocommit=autocommit) # type: ignore

synapse/storage/engines/sqlite.py

+10
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import typing
1818

1919
from synapse.storage.engines import BaseDatabaseEngine
20+
from synapse.storage.types import Connection
2021

2122
if typing.TYPE_CHECKING:
2223
import sqlite3 # noqa: F401
@@ -86,6 +87,7 @@ def on_new_connection(self, db_conn):
8687

8788
db_conn.create_function("rank", 1, _rank)
8889
db_conn.execute("PRAGMA foreign_keys = ON;")
90+
db_conn.commit()
8991

9092
def is_deadlock(self, error):
9193
return False
@@ -105,6 +107,14 @@ def server_version(self):
105107
"""
106108
return "%i.%i.%i" % self.module.sqlite_version_info
107109

110+
def in_transaction(self, conn: Connection) -> bool:
111+
return conn.in_transaction # type: ignore
112+
113+
def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
114+
# Twisted doesn't let us set attributes on the connections, so we can't
115+
# set the connection to autocommit mode.
116+
pass
117+
108118

109119
# Following functions taken from: https://github.com/coleifer/peewee
110120

synapse/storage/util/id_generators.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
from synapse.metrics.background_process_metrics import run_as_background_process
2626
from synapse.storage.database import DatabasePool, LoggingTransaction
27+
from synapse.storage.types import Cursor
2728
from synapse.storage.util.sequence import PostgresSequenceGenerator
2829

2930
logger = logging.getLogger(__name__)
@@ -552,7 +553,7 @@ def _add_persisted_position(self, new_id: int):
552553
# do.
553554
break
554555

555-
def _update_stream_positions_table_txn(self, txn):
556+
def _update_stream_positions_table_txn(self, txn: Cursor):
556557
"""Update the `stream_positions` table with newly persisted position.
557558
"""
558559

@@ -602,10 +603,13 @@ class _MultiWriterCtxManager:
602603
stream_ids = attr.ib(type=List[int], factory=list)
603604

604605
async def __aenter__(self) -> Union[int, List[int]]:
606+
# It's safe to run this in autocommit mode as fetching values from a
607+
# sequence ignores transaction semantics anyway.
605608
self.stream_ids = await self.id_gen._db.runInteraction(
606609
"_load_next_mult_id",
607610
self.id_gen._load_next_mult_id_txn,
608611
self.multiple_ids or 1,
612+
db_autocommit=True,
609613
)
610614

611615
# Assert the fetched ID is actually greater than any ID we've already
@@ -636,10 +640,16 @@ async def __aexit__(self, exc_type, exc, tb):
636640
#
637641
# We only do this on the success path so that the persisted current
638642
# position points to a persisted row with the correct instance name.
643+
#
644+
# We do this in autocommit mode as a) the upsert works correctly outside
645+
# transactions and b) reduces the amount of time the rows are locked
646+
# for. If we don't do this then we'll often hit serialization errors due
647+
# to the fact we default to REPEATABLE READ isolation levels.
639648
if self.id_gen._writers:
640649
await self.id_gen._db.runInteraction(
641650
"MultiWriterIdGenerator._update_table",
642651
self.id_gen._update_stream_positions_table_txn,
652+
db_autocommit=True,
643653
)
644654

645655
return False

tests/storage/test_base.py

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ def runWithConnection(func, *args, **kwargs):
5656
engine = create_engine(sqlite_config)
5757
fake_engine = Mock(wraps=engine)
5858
fake_engine.can_native_upsert = False
59+
fake_engine.in_transaction.return_value = False
5960

6061
db = DatabasePool(Mock(), Mock(config=sqlite_config), fake_engine)
6162
db._db_pool = self.db_pool

0 commit comments

Comments
 (0)