Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 0021f74

Browse files
clokephughns
authored andcommitted
Stop writing to the event_txn_id table (#16175)
1 parent 8149880 commit 0021f74

File tree

6 files changed

+26
-95
lines changed

6 files changed

+26
-95
lines changed

changelog.d/16175.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Stop using the `event_txn_id` table.

synapse/handlers/message.py

-13
Original file line numberDiff line numberDiff line change
@@ -908,19 +908,6 @@ async def get_event_id_from_transaction(
908908
if existing_event_id:
909909
return existing_event_id
910910

911-
# Some requsters don't have device IDs (appservice, guests, and access
912-
# tokens minted with the admin API), fallback to checking the access token
913-
# ID, which should be close enough.
914-
if requester.access_token_id:
915-
existing_event_id = (
916-
await self.store.get_event_id_from_transaction_id_and_token_id(
917-
room_id,
918-
requester.user.to_string(),
919-
requester.access_token_id,
920-
txn_id,
921-
)
922-
)
923-
924911
return existing_event_id
925912

926913
async def get_event_from_transaction(

synapse/storage/databases/main/events.py

+1-34
Original file line numberDiff line numberDiff line change
@@ -978,26 +978,12 @@ def _persist_transaction_ids_txn(
978978
"""Persist the mapping from transaction IDs to event IDs (if defined)."""
979979

980980
inserted_ts = self._clock.time_msec()
981-
to_insert_token_id: List[Tuple[str, str, str, int, str, int]] = []
982981
to_insert_device_id: List[Tuple[str, str, str, str, str, int]] = []
983982
for event, _ in events_and_contexts:
984983
txn_id = getattr(event.internal_metadata, "txn_id", None)
985-
token_id = getattr(event.internal_metadata, "token_id", None)
986984
device_id = getattr(event.internal_metadata, "device_id", None)
987985

988986
if txn_id is not None:
989-
if token_id is not None:
990-
to_insert_token_id.append(
991-
(
992-
event.event_id,
993-
event.room_id,
994-
event.sender,
995-
token_id,
996-
txn_id,
997-
inserted_ts,
998-
)
999-
)
1000-
1001987
if device_id is not None:
1002988
to_insert_device_id.append(
1003989
(
@@ -1010,26 +996,7 @@ def _persist_transaction_ids_txn(
1010996
)
1011997
)
1012998

1013-
# Synapse usually relies on the device_id to scope transactions for events,
1014-
# except for users without device IDs (appservice, guests, and access
1015-
# tokens minted with the admin API) which use the access token ID instead.
1016-
#
1017-
# TODO https://github.com/matrix-org/synapse/issues/16042
1018-
if to_insert_token_id:
1019-
self.db_pool.simple_insert_many_txn(
1020-
txn,
1021-
table="event_txn_id",
1022-
keys=(
1023-
"event_id",
1024-
"room_id",
1025-
"user_id",
1026-
"token_id",
1027-
"txn_id",
1028-
"inserted_ts",
1029-
),
1030-
values=to_insert_token_id,
1031-
)
1032-
999+
# Synapse relies on the device_id to scope transactions for events..
10331000
if to_insert_device_id:
10341001
self.db_pool.simple_insert_many_txn(
10351002
txn,

synapse/storage/databases/main/events_worker.py

+14-27
Original file line numberDiff line numberDiff line change
@@ -2022,25 +2022,6 @@ def get_next_event_to_expire_txn(
20222022
desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
20232023
)
20242024

2025-
async def get_event_id_from_transaction_id_and_token_id(
2026-
self, room_id: str, user_id: str, token_id: int, txn_id: str
2027-
) -> Optional[str]:
2028-
"""Look up if we have already persisted an event for the transaction ID,
2029-
returning the event ID if so.
2030-
"""
2031-
return await self.db_pool.simple_select_one_onecol(
2032-
table="event_txn_id",
2033-
keyvalues={
2034-
"room_id": room_id,
2035-
"user_id": user_id,
2036-
"token_id": token_id,
2037-
"txn_id": txn_id,
2038-
},
2039-
retcol="event_id",
2040-
allow_none=True,
2041-
desc="get_event_id_from_transaction_id_and_token_id",
2042-
)
2043-
20442025
async def get_event_id_from_transaction_id_and_device_id(
20452026
self, room_id: str, user_id: str, device_id: str, txn_id: str
20462027
) -> Optional[str]:
@@ -2072,29 +2053,35 @@ async def get_already_persisted_events(
20722053
"""
20732054

20742055
mapping = {}
2075-
txn_id_to_event: Dict[Tuple[str, int, str], str] = {}
2056+
txn_id_to_event: Dict[Tuple[str, str, str, str], str] = {}
20762057

20772058
for event in events:
2078-
token_id = getattr(event.internal_metadata, "token_id", None)
2059+
device_id = getattr(event.internal_metadata, "device_id", None)
20792060
txn_id = getattr(event.internal_metadata, "txn_id", None)
20802061

2081-
if token_id and txn_id:
2062+
if device_id and txn_id:
20822063
# Check if this is a duplicate of an event in the given events.
2083-
existing = txn_id_to_event.get((event.room_id, token_id, txn_id))
2064+
existing = txn_id_to_event.get(
2065+
(event.room_id, event.sender, device_id, txn_id)
2066+
)
20842067
if existing:
20852068
mapping[event.event_id] = existing
20862069
continue
20872070

20882071
# Check if this is a duplicate of an event we've already
20892072
# persisted.
2090-
existing = await self.get_event_id_from_transaction_id_and_token_id(
2091-
event.room_id, event.sender, token_id, txn_id
2073+
existing = await self.get_event_id_from_transaction_id_and_device_id(
2074+
event.room_id, event.sender, device_id, txn_id
20922075
)
20932076
if existing:
20942077
mapping[event.event_id] = existing
2095-
txn_id_to_event[(event.room_id, token_id, txn_id)] = existing
2078+
txn_id_to_event[
2079+
(event.room_id, event.sender, device_id, txn_id)
2080+
] = existing
20962081
else:
2097-
txn_id_to_event[(event.room_id, token_id, txn_id)] = event.event_id
2082+
txn_id_to_event[
2083+
(event.room_id, event.sender, device_id, txn_id)
2084+
] = event.event_id
20982085

20992086
return mapping
21002087

synapse/storage/schema/__init__.py

+6-10
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
SCHEMA_VERSION = 80 # remember to update the list below when updating
15+
SCHEMA_VERSION = 81 # remember to update the list below when updating
1616
"""Represents the expectations made by the codebase about the database schema
1717
1818
This should be incremented whenever the codebase changes its requirements on the
@@ -114,19 +114,15 @@
114114
Changes in SCHEMA_VERSION = 80
115115
- The event_txn_id_device_id is always written to for new events.
116116
- Add tables for the task scheduler.
117+
118+
Changes in SCHEMA_VERSION = 81
119+
- The event_txn_id is no longer written to for new events.
117120
"""
118121

119122

120123
SCHEMA_COMPAT_VERSION = (
121-
# Queries against `event_stream_ordering` columns in membership tables must
122-
# be disambiguated.
123-
#
124-
# The threads_id column must written to with non-null values for the
125-
# event_push_actions, event_push_actions_staging, and event_push_summary tables.
126-
#
127-
# insertions to the column `full_user_id` of tables profiles and user_filters can no
128-
# longer be null
129-
76
124+
# The `event_txn_id_device_id` must be written to for new events.
125+
80
130126
)
131127
"""Limit on how far the synapse codebase can be rolled back without breaking db compat
132128

tests/handlers/test_message.py

+4-11
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,11 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
4646
self._persist_event_storage_controller = persistence
4747

4848
self.user_id = self.register_user("tester", "foobar")
49-
self.access_token = self.login("tester", "foobar")
50-
self.room_id = self.helper.create_room_as(self.user_id, tok=self.access_token)
51-
52-
info = self.get_success(
53-
self.hs.get_datastores().main.get_user_by_access_token(
54-
self.access_token,
55-
)
56-
)
57-
assert info is not None
58-
self.token_id = info.token_id
49+
device_id = "dev-1"
50+
access_token = self.login("tester", "foobar", device_id=device_id)
51+
self.room_id = self.helper.create_room_as(self.user_id, tok=access_token)
5952

60-
self.requester = create_requester(self.user_id, access_token_id=self.token_id)
53+
self.requester = create_requester(self.user_id, device_id=device_id)
6154

6255
def _create_and_persist_member_event(self) -> Tuple[EventBase, EventContext]:
6356
# Create a member event we can use as an auth_event

0 commit comments

Comments
 (0)