Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 4213fe1

Browse files
committed
Merge commit '7586fdf1e' into anoa/dinsic_release_1_21_x
* commit '7586fdf1e': Bump canonicaljson to version 1.4.0 (#8262) Run database updates in a transaction (#8265) Add tests for `last_successful_stream_ordering` (#8258)
2 parents 15212f5 + 7586fdf commit 4213fe1

File tree

6 files changed

+102
-6
lines changed

6 files changed

+102
-6
lines changed

changelog.d/8258.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Track the `stream_ordering` of the last successfully-sent event to every destination, so we can use this information to 'catch up' a remote server after an outage.

changelog.d/8262.bugfix

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Upgrade canonicaljson to version 1.4.0 to fix an unicode encoding issue.

changelog.d/8265.bugfix

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix logstanding bug which could lead to incomplete database upgrades on SQLite.

synapse/python_dependencies.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
"jsonschema>=2.5.1",
4444
"frozendict>=1",
4545
"unpaddedbase64>=1.1.0",
46-
"canonicaljson>=1.3.0",
46+
"canonicaljson>=1.4.0",
4747
# we use the type definitions added in signedjson 1.1.
4848
"signedjson>=1.1.0",
4949
"pynacl>=1.2.1",

synapse/storage/prepare_database.py

+22-5
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@
1919
import os
2020
import re
2121
from collections import Counter
22-
from typing import TextIO
22+
from typing import Optional, TextIO
2323

2424
import attr
2525

26+
from synapse.config.homeserver import HomeServerConfig
27+
from synapse.storage.engines import BaseDatabaseEngine
2628
from synapse.storage.engines.postgres import PostgresEngine
27-
from synapse.storage.types import Cursor
29+
from synapse.storage.types import Connection, Cursor
30+
from synapse.types import Collection
2831

2932
logger = logging.getLogger(__name__)
3033

@@ -47,7 +50,12 @@ class UpgradeDatabaseException(PrepareDatabaseException):
4750
pass
4851

4952

50-
def prepare_database(db_conn, database_engine, config, databases=["main", "state"]):
53+
def prepare_database(
54+
db_conn: Connection,
55+
database_engine: BaseDatabaseEngine,
56+
config: Optional[HomeServerConfig],
57+
databases: Collection[str] = ["main", "state"],
58+
):
5159
"""Prepares a physical database for usage. Will either create all necessary tables
5260
or upgrade from an older schema version.
5361
@@ -57,15 +65,24 @@ def prepare_database(db_conn, database_engine, config, databases=["main", "state
5765
Args:
5866
db_conn:
5967
database_engine:
60-
config (synapse.config.homeserver.HomeServerConfig|None):
68+
config :
6169
application config, or None if we are connecting to an existing
6270
database which we expect to be configured already
63-
databases (list[str]): The name of the databases that will be used
71+
databases: The name of the databases that will be used
6472
with this physical database. Defaults to all databases.
6573
"""
6674

6775
try:
6876
cur = db_conn.cursor()
77+
78+
# sqlite does not automatically start transactions for DDL / SELECT statements,
79+
# so we start one before running anything. This ensures that any upgrades
80+
# are either applied completely, or not at all.
81+
#
82+
# (psycopg2 automatically starts a transaction as soon as we run any statements
83+
# at all, so this is redundant but harmless there.)
84+
cur.execute("BEGIN TRANSACTION")
85+
6986
version_info = _get_or_create_schema_state(cur, database_engine)
7087

7188
if version_info:

tests/federation/test_federation_catch_up.py

+76
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,24 @@ def prepare(self, reactor, clock, hs):
2828
return_value=make_awaitable(["test", "host2"])
2929
)
3030

31+
# whenever send_transaction is called, record the pdu data
32+
self.pdus = []
33+
self.failed_pdus = []
34+
self.is_online = True
35+
self.hs.get_federation_transport_client().send_transaction.side_effect = (
36+
self.record_transaction
37+
)
38+
39+
async def record_transaction(self, txn, json_cb):
40+
if self.is_online:
41+
data = json_cb()
42+
self.pdus.extend(data["pdus"])
43+
return {}
44+
else:
45+
data = json_cb()
46+
self.failed_pdus.extend(data["pdus"])
47+
raise IOError("Failed to connect because this is a test!")
48+
3149
def get_destination_room(self, room: str, destination: str = "host2") -> dict:
3250
"""
3351
Gets the destination_rooms entry for a (destination, room_id) pair.
@@ -80,3 +98,61 @@ def test_catch_up_destination_rooms_tracking(self):
8098
self.assertEqual(row_1["event_id"], event_id_1)
8199
self.assertEqual(row_2["event_id"], event_id_2)
82100
self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1)
101+
102+
@override_config({"send_federation": True})
103+
def test_catch_up_last_successful_stream_ordering_tracking(self):
104+
"""
105+
Tests that we populate the `destination_rooms` table as needed.
106+
"""
107+
self.register_user("u1", "you the one")
108+
u1_token = self.login("u1", "you the one")
109+
room = self.helper.create_room_as("u1", tok=u1_token)
110+
111+
# take the remote offline
112+
self.is_online = False
113+
114+
self.get_success(
115+
event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
116+
)
117+
118+
self.helper.send(room, "wombats!", tok=u1_token)
119+
self.pump()
120+
121+
lsso_1 = self.get_success(
122+
self.hs.get_datastore().get_destination_last_successful_stream_ordering(
123+
"host2"
124+
)
125+
)
126+
127+
self.assertIsNone(
128+
lsso_1,
129+
"There should be no last successful stream ordering for an always-offline destination",
130+
)
131+
132+
# bring the remote online
133+
self.is_online = True
134+
135+
event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"]
136+
137+
lsso_2 = self.get_success(
138+
self.hs.get_datastore().get_destination_last_successful_stream_ordering(
139+
"host2"
140+
)
141+
)
142+
row_2 = self.get_destination_room(room)
143+
144+
self.assertEqual(
145+
self.pdus[0]["content"]["body"],
146+
"rabbits!",
147+
"Test fault: didn't receive the right PDU",
148+
)
149+
self.assertEqual(
150+
row_2["event_id"],
151+
event_id_2,
152+
"Test fault: destination_rooms not updated correctly",
153+
)
154+
self.assertEqual(
155+
lsso_2,
156+
row_2["stream_ordering"],
157+
"Send succeeded but not marked as last_successful_stream_ordering",
158+
)

0 commit comments

Comments
 (0)