Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Reduce serialization errors in MultiWriterIdGen #8456

Merged
merged 8 commits into from
Oct 7, 2020
Merged
18 changes: 11 additions & 7 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,20 +461,23 @@ def new_transaction(
exception_callbacks: List[_CallbackListEntry],
func: "Callable[..., R]",
*args: Any,
db_retry: bool = True,
**kwargs: Any
) -> R:
"""Start a new database transaction with the given connection.

Note: The given func may be called multiple times under certain
failure modes. This is normally fine when in a standard transaction,
but care must be taken if the connection is in `autocommit` mode that
the function will correctly handle being aborted and retried half way
through its execution.

Args:
conn
desc
after_callbacks
exception_callbacks
func
*args
db_retry: Whether to retry the transaction by calling `func` again.
This should be disabled if connection is in autocommit mode.
**kwargs
"""

Expand Down Expand Up @@ -508,7 +511,7 @@ def new_transaction(
transaction_logger.warning(
"[TXN OPERROR] {%s} %s %d/%d", name, e, i, N,
)
if db_retry and i < N:
if i < N:
i += 1
try:
conn.rollback()
Expand All @@ -521,7 +524,7 @@ def new_transaction(
transaction_logger.warning(
"[TXN DEADLOCK] {%s} %d/%d", name, i, N
)
if db_retry and i < N:
if i < N:
i += 1
try:
conn.rollback()
Expand Down Expand Up @@ -600,7 +603,9 @@ async def runInteraction(
i.e. outside of a transaction. This is useful for transaction
that are only a single query. Currently only affects postgres.
WARNING: This means that if func fails half way through then
the changes will *not* be rolled back.
the changes will *not* be rolled back. `func` may also get
called multiple times if the transaction is retried, so must
correctly handle that case.

args: positional args to pass to `func`
kwargs: named args to pass to `func`
Expand All @@ -623,7 +628,6 @@ async def runInteraction(
func,
*args,
db_autocommit=db_autocommit,
db_retry=not db_autocommit, # Don't retry in auto commit mode.
**kwargs
)

Expand Down
1 change: 0 additions & 1 deletion synapse/storage/engines/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ def on_new_connection(self, db_conn):
db_conn.set_isolation_level(
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)
db_conn.set_session(self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ)

# Set the bytea output to escape, vs the default of hex
cursor = db_conn.cursor()
Expand Down
3 changes: 2 additions & 1 deletion synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.types import Cursor
from synapse.storage.util.sequence import PostgresSequenceGenerator

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -548,7 +549,7 @@ def _add_persisted_position(self, new_id: int):
# do.
break

def _update_stream_positions_table_txn(self, txn: LoggingTransaction):
def _update_stream_positions_table_txn(self, txn: Cursor):
"""Update the `stream_positions` table with newly persisted position.
"""

Expand Down