From 82051c2fa74343366bcb84d1faee565133cf0243 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Oct 2020 11:01:30 +0100 Subject: [PATCH 1/5] Add basic tests for sync/pagination with vector clock tokens. --- synapse/handlers/room.py | 20 +- .../test_sharded_event_persister.py | 171 ++++++++++++++++++ tests/unittest.py | 32 +++- 3 files changed, 219 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index d0530a446c83..bfd5049232be 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -560,6 +560,7 @@ async def create_room( config: JsonDict, ratelimit: bool = True, creator_join_profile: Optional[JsonDict] = None, + requested_room_id: Optional[str] = None, ) -> Tuple[dict, int]: """ Creates a new room. @@ -576,6 +577,10 @@ async def create_room( values to go in the body of the 'join' event (typically `avatar_url` and/or `displayname`. + requested_room_id: Allow callees to request a particular room ID. + This is useful for testing event persistence sharding, and + *should not* be exposed to clients. + Returns: First, a dict containing the keys `room_id` and, if an alias was, requested, `room_alias`. Secondly, the stream_id of the @@ -678,9 +683,18 @@ async def create_room( visibility = config.get("visibility", None) is_public = visibility == "public" - room_id = await self._generate_room_id( - creator_id=user_id, is_public=is_public, room_version=room_version, - ) + if requested_room_id: + await self.store.store_room( + room_id=requested_room_id, + room_creator_user_id=user_id, + is_public=is_public, + room_version=room_version, + ) + room_id = requested_room_id + else: + room_id = await self._generate_room_id( + creator_id=user_id, is_public=is_public, room_version=room_version, + ) # Check whether this visibility value is blocked by a third party module allowed_by_third_party_rules = await ( diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py index 6068d1490538..9e64a0401956 100644 --- a/tests/replication/test_sharded_event_persister.py +++ b/tests/replication/test_sharded_event_persister.py @@ -16,6 +16,8 @@ from synapse.rest import admin from synapse.rest.client.v1 import login, room +from synapse.rest.client.v2_alpha import sync +from synapse.types import create_requester from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.utils import USE_POSTGRES_FOR_TESTS @@ -36,6 +38,7 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase): admin.register_servlets_for_client_rest_resource, room.register_servlets, login.register_servlets, + sync.register_servlets, ] def prepare(self, reactor, clock, hs): @@ -100,3 +103,171 @@ def test_basic(self): self.assertTrue(persisted_on_1) self.assertTrue(persisted_on_2) + + def test_vector_clock_token(self): + """Tests that using a stream token with a vector clock component works + correctly with basic /sync and /messages usage. + """ + + self.make_worker_hs( + "synapse.app.generic_worker", {"worker_name": "worker1"}, + ) + + self.make_worker_hs( + "synapse.app.generic_worker", {"worker_name": "worker2"}, + ) + + sync_hs = self.make_worker_hs( + "synapse.app.generic_worker", {"worker_name": "sync"}, + ) + + # Specially selected room IDs that get persisted on different workers. + room_id1 = "!foo:test" + room_id2 = "!baz:test" + + self.assertEqual( + self.hs.config.worker.events_shard_config.get_instance(room_id1), "worker1" + ) + self.assertEqual( + self.hs.config.worker.events_shard_config.get_instance(room_id2), "worker2" + ) + + user_id = self.register_user("user", "pass") + access_token = self.login("user", "pass") + requester = create_requester(user_id, access_token) + + store = self.hs.get_datastore() + + # Create two room on the different workers. + room_creator = self.hs.get_room_creation_handler() + self.get_success( + room_creator.create_room(requester, {}, requested_room_id=room_id1), by=0.1 + ) + self.get_success( + room_creator.create_room(requester, {}, requested_room_id=room_id2), by=0.1 + ) + + # The other user joins + self.helper.join( + room=room_id1, user=self.other_user_id, tok=self.other_access_token + ) + self.helper.join( + room=room_id2, user=self.other_user_id, tok=self.other_access_token + ) + + # Do an initial sync so that we're up to date. + request, channel = self.make_request("GET", "/sync", access_token=access_token) + self.render_on_worker(sync_hs, request) + next_batch = channel.json_body["next_batch"] + + # We now fetch and throw away a stream ID so that there will be a gap in + # the stream orderings. This means that the MultiWriterIdGenerators + # won't be able to intelligently "roll foward" the persisted upto + # position, resulting in a RoomStreamToken that has non-empty instance + # map component. + # + # Note: ideally we'd try to simulate one event persister getting behind + # in a more realistic way, but that involves adding quite a bit of code + # to support doing that. + self.get_success( + store.db_pool.execute( + "test", None, "SELECT nextval(?)", "events_stream_seq" + ) + ) + + response = self.helper.send(room_id1, body="Hi!", tok=self.other_access_token) + first_event_in_room1 = response["event_id"] + + # Assert that the current stream token has an instance map component, as + # we are trying to test vector clock tokens. + room_stream_token = store.get_room_max_token() + self.assertNotEqual(len(room_stream_token.instance_map), 0) + + # Check that syncing still gets the new event, despite the gap in the + # stream IDs. + request, channel = self.make_request( + "GET", "/sync?since={}".format(next_batch), access_token=access_token + ) + self.render_on_worker(sync_hs, request) + + # We should only see the new event and nothing else + self.assertIn(room_id1, channel.json_body["rooms"]["join"]) + self.assertNotIn(room_id2, channel.json_body["rooms"]["join"]) + + events = channel.json_body["rooms"]["join"][room_id1]["timeline"]["events"] + self.assertListEqual( + [first_event_in_room1], [event["event_id"] for event in events] + ) + + # Get the next batch and makes sure its a vector clock style token. + vector_clock_token = channel.json_body["next_batch"] + self.assertTrue(vector_clock_token.startswith("m")) + + # Now try and send an event to the other rooom so that we can test that + # the vector clock style token works as a `since` token. + response = self.helper.send(room_id2, body="Hi!", tok=self.other_access_token) + first_event_in_room2 = response["event_id"] + + request, channel = self.make_request( + "GET", + "/sync?since={}".format(vector_clock_token), + access_token=access_token, + ) + self.render_on_worker(sync_hs, request) + + self.assertNotIn(room_id1, channel.json_body["rooms"]["join"]) + self.assertIn(room_id2, channel.json_body["rooms"]["join"]) + + events = channel.json_body["rooms"]["join"][room_id2]["timeline"]["events"] + self.assertListEqual( + [first_event_in_room2], [event["event_id"] for event in events] + ) + + next_batch = channel.json_body["next_batch"] + + # We also want to test that the vector clock style token works with + # pagination. We do this by sending a couple of new events into the room + # and syncing again to get a prev_batch token for each room, then + # paginating from there back to the vector clock token. + self.helper.send(room_id1, body="Hi again!", tok=self.other_access_token) + self.helper.send(room_id2, body="Hi again!", tok=self.other_access_token) + + request, channel = self.make_request( + "GET", "/sync?since={}".format(next_batch), access_token=access_token + ) + self.render_on_worker(sync_hs, request) + + prev_batch1 = channel.json_body["rooms"]["join"][room_id1]["timeline"][ + "prev_batch" + ] + prev_batch2 = channel.json_body["rooms"]["join"][room_id2]["timeline"][ + "prev_batch" + ] + + # Paginating back in the first room should not produce any results, as + # no events have happened in it. This tests that we are correctly + # filtering results based on the vector clock portion. + request, channel = self.make_request( + "GET", + "/rooms/{}/messages?from={}&to={}&dir=b".format( + room_id1, prev_batch1, vector_clock_token + ), + access_token=access_token, + ) + self.render_on_worker(sync_hs, request) + self.assertListEqual([], channel.json_body["chunk"]) + + # Paginating back on the second room should produce the first event + # again. This tests that pagination isn't completely broken. + request, channel = self.make_request( + "GET", + "/rooms/{}/messages?from={}&to={}&dir=b".format( + room_id2, prev_batch2, vector_clock_token + ), + access_token=access_token, + ) + self.render_on_worker(sync_hs, request) + self.assertEqual(len(channel.json_body["chunk"]), 1) + self.assertEqual( + channel.json_body["chunk"][0]["event_id"], first_event_in_room2 + ) diff --git a/tests/unittest.py b/tests/unittest.py index 5c87f6097ec8..f977e7a7d069 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -20,7 +20,7 @@ import inspect import logging import time -from typing import Optional, Tuple, Type, TypeVar, Union +from typing import Optional, Tuple, Type, TypeVar, Union, overload from mock import Mock, patch @@ -357,6 +357,36 @@ def prepare(self, reactor, clock, homeserver): Function to optionally be overridden in subclasses. """ + # Annoyingly mypy doesn't seem to pick up the fact that T is SynapseRequest + # when the `request` arg isn't given, so we define an explicit override to + # cover that case. + @overload + def make_request( + self, + method: Union[bytes, str], + path: Union[bytes, str], + content: Union[bytes, dict] = b"", + access_token: Optional[str] = None, + shorthand: bool = True, + federation_auth_origin: str = None, + content_is_form: bool = False, + ) -> Tuple[SynapseRequest, FakeChannel]: + ... + + @overload + def make_request( + self, + method: Union[bytes, str], + path: Union[bytes, str], + content: Union[bytes, dict] = b"", + access_token: Optional[str] = None, + request: Type[T] = SynapseRequest, + shorthand: bool = True, + federation_auth_origin: str = None, + content_is_form: bool = False, + ) -> Tuple[T, FakeChannel]: + ... + def make_request( self, method: Union[bytes, str], From a5e09c5327077c53876cd66baa3e4e72793295f6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Oct 2020 15:20:01 +0100 Subject: [PATCH 2/5] Newsfile --- changelog.d/8488.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8488.misc diff --git a/changelog.d/8488.misc b/changelog.d/8488.misc new file mode 100644 index 000000000000..237cb3b31135 --- /dev/null +++ b/changelog.d/8488.misc @@ -0,0 +1 @@ +Allow events to be sent to clients sooner when using sharded event persisters. From 0f7c0ae4cecff067a49a9b8d4fdcf65b19221088 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Oct 2020 11:12:13 +0100 Subject: [PATCH 3/5] Fix test after merge of develop --- .../test_sharded_event_persister.py | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py index 9e64a0401956..34595f4c9d2e 100644 --- a/tests/replication/test_sharded_event_persister.py +++ b/tests/replication/test_sharded_event_persister.py @@ -113,7 +113,7 @@ def test_vector_clock_token(self): "synapse.app.generic_worker", {"worker_name": "worker1"}, ) - self.make_worker_hs( + worker_hs2 = self.make_worker_hs( "synapse.app.generic_worker", {"worker_name": "worker2"}, ) @@ -160,20 +160,16 @@ def test_vector_clock_token(self): self.render_on_worker(sync_hs, request) next_batch = channel.json_body["next_batch"] - # We now fetch and throw away a stream ID so that there will be a gap in - # the stream orderings. This means that the MultiWriterIdGenerators - # won't be able to intelligently "roll foward" the persisted upto - # position, resulting in a RoomStreamToken that has non-empty instance - # map component. + # We now gut wrench into the events stream MultiWriterIdGenerator on + # worker2 to mimic it getting stuck persisting an event. This ensures + # that when we send an event on worker1 we end up in a state where + # worker2 events stream position lags that on worker1, resulting in a + # RoomStreamToken with a non-empty instance map component. # - # Note: ideally we'd try to simulate one event persister getting behind - # in a more realistic way, but that involves adding quite a bit of code - # to support doing that. - self.get_success( - store.db_pool.execute( - "test", None, "SELECT nextval(?)", "events_stream_seq" - ) - ) + # Worker2's event stream position will not advance until we call + # __aexit__ again. + actx = worker_hs2.get_datastore()._stream_id_gen.get_next() + self.get_success(actx.__aenter__()) response = self.helper.send(room_id1, body="Hi!", tok=self.other_access_token) first_event_in_room1 = response["event_id"] @@ -203,6 +199,10 @@ def test_vector_clock_token(self): vector_clock_token = channel.json_body["next_batch"] self.assertTrue(vector_clock_token.startswith("m")) + # Now that we've got a vector clock token we finish the fake persisting + # an event we started above. + self.get_success(actx.__aexit__(None, None, None)) + # Now try and send an event to the other rooom so that we can test that # the vector clock style token works as a `since` token. response = self.helper.send(room_id2, body="Hi!", tok=self.other_access_token) From 790c02df2ef94b709678926b8dd3f5a4a12c91ff Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Oct 2020 11:12:25 +0100 Subject: [PATCH 4/5] Test that paginating forwards also works --- .../test_sharded_event_persister.py | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py index 34595f4c9d2e..7226d9d5b292 100644 --- a/tests/replication/test_sharded_event_persister.py +++ b/tests/replication/test_sharded_event_persister.py @@ -271,3 +271,27 @@ def test_vector_clock_token(self): self.assertEqual( channel.json_body["chunk"][0]["event_id"], first_event_in_room2 ) + + # Paginating forwards should give the same results + request, channel = self.make_request( + "GET", + "/rooms/{}/messages?from={}&to={}&dir=f".format( + room_id1, vector_clock_token, prev_batch1 + ), + access_token=access_token, + ) + self.render_on_worker(sync_hs, request) + self.assertListEqual([], channel.json_body["chunk"]) + + request, channel = self.make_request( + "GET", + "/rooms/{}/messages?from={}&to={}&dir=f".format( + room_id2, vector_clock_token, prev_batch2, + ), + access_token=access_token, + ) + self.render_on_worker(sync_hs, request) + self.assertEqual(len(channel.json_body["chunk"]), 1) + self.assertEqual( + channel.json_body["chunk"][0]["event_id"], first_event_in_room2 + ) From a83882bf2124a856297969ef7b2a1c6b6f8b4a5f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Oct 2020 11:45:36 +0100 Subject: [PATCH 5/5] Patch out _generate_id rather than changing prod code --- synapse/handlers/room.py | 20 ++-------- .../test_sharded_event_persister.py | 40 ++++++++++++++----- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 0a1f2c2eb32d..93ed51063ac2 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -560,7 +560,6 @@ async def create_room( config: JsonDict, ratelimit: bool = True, creator_join_profile: Optional[JsonDict] = None, - requested_room_id: Optional[str] = None, ) -> Tuple[dict, int]: """ Creates a new room. @@ -577,10 +576,6 @@ async def create_room( values to go in the body of the 'join' event (typically `avatar_url` and/or `displayname`. - requested_room_id: Allow callees to request a particular room ID. - This is useful for testing event persistence sharding, and - *should not* be exposed to clients. - Returns: First, a dict containing the keys `room_id` and, if an alias was, requested, `room_alias`. Secondly, the stream_id of the @@ -683,18 +678,9 @@ async def create_room( visibility = config.get("visibility", None) is_public = visibility == "public" - if requested_room_id: - await self.store.store_room( - room_id=requested_room_id, - room_creator_user_id=user_id, - is_public=is_public, - room_version=room_version, - ) - room_id = requested_room_id - else: - room_id = await self._generate_room_id( - creator_id=user_id, is_public=is_public, room_version=room_version, - ) + room_id = await self._generate_room_id( + creator_id=user_id, is_public=is_public, room_version=room_version, + ) # Check whether this visibility value is blocked by a third party module allowed_by_third_party_rules = await ( diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py index 7226d9d5b292..82cf033d4eca 100644 --- a/tests/replication/test_sharded_event_persister.py +++ b/tests/replication/test_sharded_event_persister.py @@ -14,10 +14,12 @@ # limitations under the License. import logging +from mock import patch + +from synapse.api.room_versions import RoomVersion from synapse.rest import admin from synapse.rest.client.v1 import login, room from synapse.rest.client.v2_alpha import sync -from synapse.types import create_requester from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.utils import USE_POSTGRES_FOR_TESTS @@ -46,6 +48,9 @@ def prepare(self, reactor, clock, hs): self.other_user_id = self.register_user("otheruser", "pass") self.other_access_token = self.login("otheruser", "pass") + self.room_creator = self.hs.get_room_creation_handler() + self.store = hs.get_datastore() + def default_config(self): conf = super().default_config() conf["redis"] = {"enabled": "true"} @@ -56,6 +61,29 @@ def default_config(self): } return conf + def _create_room(self, room_id: str, user_id: str, tok: str): + """Create a room with given room_id + """ + + # We control the room ID generation by patching out the + # `_generate_room_id` method + async def generate_room( + creator_id: str, is_public: bool, room_version: RoomVersion + ): + await self.store.store_room( + room_id=room_id, + room_creator_user_id=creator_id, + is_public=is_public, + room_version=room_version, + ) + return room_id + + with patch( + "synapse.handlers.room.RoomCreationHandler._generate_room_id" + ) as mock: + mock.side_effect = generate_room + self.helper.create_room_as(user_id, tok=tok) + def test_basic(self): """Simple test to ensure that multiple rooms can be created and joined, and that different rooms get handled by different instances. @@ -134,18 +162,12 @@ def test_vector_clock_token(self): user_id = self.register_user("user", "pass") access_token = self.login("user", "pass") - requester = create_requester(user_id, access_token) store = self.hs.get_datastore() # Create two room on the different workers. - room_creator = self.hs.get_room_creation_handler() - self.get_success( - room_creator.create_room(requester, {}, requested_room_id=room_id1), by=0.1 - ) - self.get_success( - room_creator.create_room(requester, {}, requested_room_id=room_id2), by=0.1 - ) + self._create_room(room_id1, user_id, access_token) + self._create_room(room_id2, user_id, access_token) # The other user joins self.helper.join(