From 1d81d4efe7a16f0f9f95209ba745eab1c56cbb43 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Sat, 21 May 2022 16:42:51 +0100 Subject: [PATCH 1/6] Pull current hosts out from current_state table --- synapse/federation/sender/__init__.py | 6 +++- synapse/state/__init__.py | 4 --- synapse/storage/_base.py | 1 + synapse/storage/controllers/state.py | 8 +++++ synapse/storage/databases/main/roommember.py | 37 ++++++++++++++++++++ tests/federation/test_federation_sender.py | 14 ++++---- 6 files changed, 58 insertions(+), 12 deletions(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index dbe303ed9be8..99a794c04288 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -245,6 +245,8 @@ def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main self.state = hs.get_state_handler() + self._storage_controllers = hs.get_storage_controllers() + self.clock = hs.get_clock() self.is_mine_id = hs.is_mine_id @@ -602,7 +604,9 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None: room_id = receipt.room_id # Work out which remote servers should be poked and poke them. - domains_set = await self.state.get_current_hosts_in_room(room_id) + domains_set = await self._storage_controllers.state.get_current_hosts_in_room( + room_id + ) domains = [ d for d in domains_set diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index ab68e2b6a403..da25f20ae573 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -172,10 +172,6 @@ async def get_current_users_in_room( entry = await self.resolve_state_groups_for_events(room_id, latest_event_ids) return await self.store.get_joined_users_from_state(room_id, entry) - async def get_current_hosts_in_room(self, room_id: str) -> FrozenSet[str]: - event_ids = await self.store.get_latest_event_ids_in_room(room_id) - return await self.get_hosts_in_room_at_events(room_id, event_ids) - async def get_hosts_in_room_at_events( self, room_id: str, event_ids: Collection[str] ) -> FrozenSet[str]: diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 57bd74700e0c..abfc56b0616c 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -71,6 +71,7 @@ def _invalidate_state_caches( self._attempt_to_invalidate_cache("is_host_joined", (room_id, host)) if members_changed: self._attempt_to_invalidate_cache("get_users_in_room", (room_id,)) + self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,)) self._attempt_to_invalidate_cache( "get_users_in_room_with_profiles", (room_id,) ) diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index 63a78ebc87d7..3b4cdb67eb27 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -23,6 +23,7 @@ List, Mapping, Optional, + Set, Tuple, ) @@ -482,3 +483,10 @@ async def get_current_state_event( room_id, StateFilter.from_types((key,)) ) return state_map.get(key) + + async def get_current_hosts_in_room(self, room_id: str) -> Set[str]: + """Get current hosts in room based on current state.""" + + await self._partial_state_room_tracker.await_full_state(room_id) + + return await self.stores.main.get_current_hosts_in_room(room_id) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index e222b7bd1f88..31bc8c56011a 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -893,6 +893,43 @@ async def _check_host_room_membership( return True + @cached(iterable=True, max_entries=10000) + async def get_current_hosts_in_room(self, room_id: str) -> Set[str]: + """Get current hosts in room based on current state.""" + + # First we check if we already have `get_users_in_room` in the cache, as + # we can just calculate result from that + users = self.get_users_in_room.cache.get_immediate( + (room_id,), None, update_metrics=False + ) + if users is not None: + return {get_domain_from_id(u) for u in users} + + if isinstance(self.database_engine, Sqlite3Engine): + # If we're using SQLite then let's just always use + # `get_users_in_room` rather than funky SQL. + users = await self.get_users_in_room(room_id) + return {get_domain_from_id(u) for u in users} + + # For PostgreSQL we can use a regex to pull out the domains from the + # joined users in `current_state_events` via regex. + + def get_current_hosts_in_room_txn(txn: LoggingTransaction) -> Set[str]: + sql = """ + SELECT DISTINCT substring(state_key FROM '@[^:]*:(.*)$') + FROM current_state_events + WHERE + type = 'm.room.member' + AND membership = 'join' + AND room_id = ? + """ + txn.execute(sql, (room_id,)) + return {d for d, in txn} + + return await self.db_pool.runInteraction( + "get_current_hosts_in_room", get_current_hosts_in_room_txn + ) + async def get_joined_hosts( self, room_id: str, state_entry: "_StateCacheEntry" ) -> FrozenSet[str]: diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index b5be727fe48b..8208736ab4b4 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -30,16 +30,16 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): def make_homeserver(self, reactor, clock): - mock_state_handler = Mock(spec=["get_current_hosts_in_room"]) - # Ensure a new Awaitable is created for each call. - mock_state_handler.get_current_hosts_in_room.return_value = make_awaitable( - ["test", "host2"] - ) - return self.setup_test_homeserver( - state_handler=mock_state_handler, + hs = self.setup_test_homeserver( federation_transport_client=Mock(spec=["send_transaction"]), ) + hs.get_datastores().main.get_current_hosts_in_room = Mock( + return_value=make_awaitable(["test", "host2"]) + ) + + return hs + @override_config({"send_federation": True}) def test_send_receipts(self): mock_send_transaction = ( From f10144bca43d2f9f32f1b7b66ddfd288e949bc97 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Jun 2022 11:25:56 +0100 Subject: [PATCH 2/6] Use get_current_hosts_in_room in typing handler --- synapse/handlers/typing.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 0aeab86bbbd2..d104ea07fedf 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -59,6 +59,7 @@ class FollowerTypingHandler: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main + self._storage_controllers = hs.get_storage_controllers() self.server_name = hs.config.server.server_name self.clock = hs.get_clock() self.is_mine_id = hs.is_mine_id @@ -131,7 +132,6 @@ async def _push_remote(self, member: RoomMember, typing: bool) -> None: return try: - users = await self.store.get_users_in_room(member.room_id) self._member_last_federation_poke[member] = self.clock.time_msec() now = self.clock.time_msec() @@ -139,7 +139,10 @@ async def _push_remote(self, member: RoomMember, typing: bool) -> None: now=now, obj=member, then=now + FEDERATION_PING_INTERVAL ) - for domain in {get_domain_from_id(u) for u in users}: + hosts = await self._storage_controllers.state.get_current_hosts_in_room( + member.room_id + ) + for domain in hosts: if domain != self.server_name: logger.debug("sending typing update to %s", domain) self.federation.build_and_send_edu( From 84eb1c8e8e3654829314fd62d34701aa20a71f9a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Jun 2022 11:32:24 +0100 Subject: [PATCH 3/6] Newsfile --- changelog.d/12964.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12964.misc diff --git a/changelog.d/12964.misc b/changelog.d/12964.misc new file mode 100644 index 000000000000..8f970492b346 --- /dev/null +++ b/changelog.d/12964.misc @@ -0,0 +1 @@ +Reduce state pulled from DB due to sending typing and receipts over federation. From 0438d1764ae1820587cb92a3a947ac2f57ad2283 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Jun 2022 13:58:44 +0100 Subject: [PATCH 4/6] Fix tests --- tests/federation/test_federation_sender.py | 2 +- tests/handlers/test_typing.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index 8208736ab4b4..a75ba249136d 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -34,7 +34,7 @@ def make_homeserver(self, reactor, clock): federation_transport_client=Mock(spec=["send_transaction"]), ) - hs.get_datastores().main.get_current_hosts_in_room = Mock( + hs.get_storage_controllers().state.get_current_hosts_in_room = Mock( return_value=make_awaitable(["test", "host2"]) ) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 14a0ee4922ed..7af1333126ac 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -129,10 +129,12 @@ async def check_host_in_room(room_id: str, server_name: str) -> bool: hs.get_event_auth_handler().check_host_in_room = check_host_in_room - def get_joined_hosts_for_room(room_id: str): + async def get_current_hosts_in_room(room_id: str): return {member.domain for member in self.room_members} - self.datastore.get_joined_hosts_for_room = get_joined_hosts_for_room + hs.get_storage_controllers().state.get_current_hosts_in_room = ( + get_current_hosts_in_room + ) async def get_users_in_room(room_id: str): return {str(u) for u in self.room_members} From 4e3bca9d9cbf45c736c04ef7ec4bd352f749720b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Jun 2022 14:17:21 +0100 Subject: [PATCH 5/6] Update tests/federation/test_federation_sender.py Co-authored-by: Patrick Cloke --- tests/federation/test_federation_sender.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index a75ba249136d..01a1db611538 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -35,7 +35,7 @@ def make_homeserver(self, reactor, clock): ) hs.get_storage_controllers().state.get_current_hosts_in_room = Mock( - return_value=make_awaitable(["test", "host2"]) + return_value=make_awaitable({"test", "host2"}) ) return hs From 9b1c2a87d41306c61ab76c694667c7a9f7cab1b0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Jun 2022 14:29:49 +0100 Subject: [PATCH 6/6] Merge newsfile --- changelog.d/12964.misc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/12964.misc b/changelog.d/12964.misc index 8f970492b346..d57e1aca6bf0 100644 --- a/changelog.d/12964.misc +++ b/changelog.d/12964.misc @@ -1 +1 @@ -Reduce state pulled from DB due to sending typing and receipts over federation. +Reduce the amount of state we pull from the DB.