From bd873e65718063b6c2099104ef46d7373c858693 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 28 Jun 2022 13:08:55 +0100 Subject: [PATCH 01/23] Define config for room-level join limiter but don't use it in tests --- .../complement/conf/workers-shared-extra.yaml.j2 | 4 ++++ docs/usage/configuration/config_documentation.md | 16 ++++++++++++++++ synapse/config/ratelimiting.py | 7 +++++++ tests/utils.py | 1 + 4 files changed, 28 insertions(+) diff --git a/docker/complement/conf/workers-shared-extra.yaml.j2 b/docker/complement/conf/workers-shared-extra.yaml.j2 index 7c6a0fd7567f..20f3a012a073 100644 --- a/docker/complement/conf/workers-shared-extra.yaml.j2 +++ b/docker/complement/conf/workers-shared-extra.yaml.j2 @@ -67,6 +67,10 @@ rc_joins: per_second: 9999 burst_count: 9999 +rc_joins_per_room: + per_second: 9999 + burst_count: 9999 + rc_3pid_validation: per_second: 1000 burst_count: 1000 diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 58a74ace48d3..ed874cd8531d 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -1380,6 +1380,22 @@ rc_joins: burst_count: 12 ``` --- +### `rc_joins_per_room` + +This option allows for ratelimiting joins to a room based on the number of recent +joins (local or remote) to that room. It is intended to mitigate mass-join spam +waves which target multiple homeservers. + +Sensible values for this option are provided by default; most server admins +won't need to adjust this setting. + +Example configuration: +```yaml +rc_joins_per_room: + per_second: 1 + burst_count: 10 +``` +--- ### `rc_3pid_validation` This option ratelimits how often a user or IP can attempt to validate a 3PID. diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py index d4090a1f9ad5..f99e35cafad3 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py @@ -112,6 +112,13 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: defaults={"per_second": 0.01, "burst_count": 10}, ) + # Track the rate of joins to a given room. If there are too many, temporarily + # prevent local joins and remote joins via this server. + self.rc_joins_per_room = RateLimitConfig( + config.get("rc_joins_per_room", {}), + defaults={"per_second": 1, "burst_count": 10}, + ) + # Ratelimit cross-user key requests: # * For local requests this is keyed by the sending device. # * For requests received over federation this is keyed by the origin. 
diff --git a/tests/utils.py b/tests/utils.py index cabb2c0decc7..4f097a97c236 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -151,6 +151,7 @@ def default_config(name, parse=False): "local": {"per_second": 10000, "burst_count": 10000}, "remote": {"per_second": 10000, "burst_count": 10000}, }, + "rc_joins_per_room": {"per_second": 10000, "burst_count": 10000}, "rc_invites": { "per_room": {"per_second": 10000, "burst_count": 10000}, "per_user": {"per_second": 10000, "burst_count": 10000}, From 453f621d23725d035677d4d728e63104a072264f Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 28 Jun 2022 14:27:16 +0100 Subject: [PATCH 02/23] Rate limiter: describe leaky bucket --- synapse/api/ratelimiting.py | 38 +++++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py index 54d13026c9e5..965032e3af6a 100644 --- a/synapse/api/ratelimiting.py +++ b/synapse/api/ratelimiting.py @@ -27,6 +27,33 @@ class Ratelimiter: """ Ratelimit actions marked by arbitrary keys. + This is a "leaky bucket as a meter". For each key to be tracked there is a bucket + containing some number 0 <= T <= `burst_count` of tokens corresponding to previously + permitted requests for that key. Each bucket starts empty, and gradually leaks + tokens at a rate of `rate_hz`. + + Upon an incoming request, we must determine: + - the key that this request falls under (which bucket to inspect), and + - the cost C of this request in tokens. + Then, if there is room in the bucket for C tokens (T + C <= `burst_count`), + the request is permitted and `cost` tokens are added to the bucket. + Otherwise the request is denied, and the bucket continues to hold T tokens. + + This means that the limiter enforces an average request frequency of `rate_hz`, + while accumulating a buffer of up to `burst_count` requests which can be consumed + instantaneously. + + The tricky bit is the leaking. We do not want to have a periodic process which + leaks every bucket! Instead, we track + - the time point when the bucket was last completely empty, and + - how many tokens have added to the bucket permitted since then. + Then for each incoming request, we can calculate how many tokens have leaked + since this time point, and use that to decide if we should accept or reject the + request. + + Note that the source code speaks of "actions" and "burst_count" rather than "tokens" + and a "bucket_size". + Args: clock: A homeserver clock, for retrieving the current time rate_hz: The long term number of actions that can be performed in a second. @@ -41,12 +68,11 @@ def __init__( self.burst_count = burst_count self.store = store - # A ordered dictionary keeping track of actions, when they were last - # performed and how often. Each entry is a mapping from a key of arbitrary type - # to a tuple representing: - # * How many times an action has occurred since a point in time - # * The point in time - # * The rate_hz of this particular entry. This can vary per request + # An ordered dictionary representing the token buckets tracked by this rate + # limiter. Each entry maps a key of arbitrary type to a tuple representing: + # * The number of tokens currently in the bucket, + # * The time point when the bucket was last completely empty, and + # * The rate_hz (leak rate) of this particular bucket. 
self.actions: OrderedDict[Hashable, Tuple[float, float, float]] = OrderedDict() async def can_do_action( From c2e3025b3385d8770b22a5350db8e5b23011818f Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 28 Jun 2022 15:50:18 +0100 Subject: [PATCH 03/23] Rate limiter: Pull out some small methods --- synapse/api/ratelimiting.py | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py index 965032e3af6a..ef6f2377cf32 100644 --- a/synapse/api/ratelimiting.py +++ b/synapse/api/ratelimiting.py @@ -75,6 +75,29 @@ def __init__( # * The rate_hz (leak rate) of this particular bucket. self.actions: OrderedDict[Hashable, Tuple[float, float, float]] = OrderedDict() + def _get_key( + self, requester: Optional[Requester], key: Optional[Hashable] + ) -> Hashable: + """Use the requester's MXID as a fallback key if no key is provided. + + Pulled out so that `can_do_action` and `record_action` are consistent. + """ + if key is None: + if not requester: + raise ValueError("Must supply at least one of `requester` or `key`") + + key = requester.user.to_string() + return key + + def _get_action_counts( + self, key: Hashable, time_now_s: float + ) -> Tuple[float, float, float]: + """Retrieve the action counts, with a fallback representing an empty bucket. + + Pulled out so that `can_do_action` and `record_action` are consistent. + """ + return self.actions.get(key, (0.0, time_now_s, 0.0)) + async def can_do_action( self, requester: Optional[Requester], @@ -114,11 +137,7 @@ async def can_do_action( * The reactor timestamp for when the action can be performed next. -1 if rate_hz is less than or equal to zero """ - if key is None: - if not requester: - raise ValueError("Must supply at least one of `requester` or `key`") - - key = requester.user.to_string() + key = self._get_key(requester, key) if requester: # Disable rate limiting of users belonging to any AS that is configured @@ -147,7 +166,7 @@ async def can_do_action( self._prune_message_counts(time_now_s) # Check if there is an existing count entry for this key - action_count, time_start, _ = self.actions.get(key, (0.0, time_now_s, 0.0)) + action_count, time_start, _ = self._get_action_counts(key, time_now_s) # Check whether performing another action is allowed time_delta = time_now_s - time_start From c594ab774b325cac582aee511f2d7d5f1b76649f Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 28 Jun 2022 15:50:32 +0100 Subject: [PATCH 04/23] Rate limiter: Introduce `record_action` --- synapse/api/ratelimiting.py | 31 ++++++++++++++ tests/api/test_ratelimiting.py | 74 ++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+) diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py index ef6f2377cf32..92c2f3b42a78 100644 --- a/synapse/api/ratelimiting.py +++ b/synapse/api/ratelimiting.py @@ -209,6 +209,37 @@ async def can_do_action( return allowed, time_allowed + def record_action( + self, + requester: Optional[Requester], + key: Optional[Hashable] = None, + n_actions: int = 1, + _time_now_s: Optional[float] = None, + ) -> None: + """Record that an action(s) took place, even if they violate the rate limit. + + This is useful for tracking the frequency of events that happen across + federation which we still want to impose local rate limits on. For instance, if + we are alice.com monitoring a particular room, we cannot prevent bob.com + from joining users to that room. 
However, we can track the number of recent + joins in the room and refuse to serve new joins ourselves if there have been too + many in the room across both homeservers. + + Args: + requester: The requester that is doing the action, if any. + key: An arbitrary key used to classify an action. Defaults to the + requester's user ID. + n_actions: The number of times the user wants to do this action. If the user + cannot do all of the actions, the user's action count is not incremented + at all. + _time_now_s: The current time. Optional, defaults to the current time according + to self.clock. Only used by tests. + """ + key = self._get_key(requester, key) + time_now_s = _time_now_s if _time_now_s is not None else self.clock.time() + action_count, time_start, rate_hz = self._get_action_counts(key, time_now_s) + self.actions[key] = (action_count + n_actions, time_start, rate_hz) + def _prune_message_counts(self, time_now_s: float) -> None: """Remove message count entries that have not exceeded their defined rate_hz limit diff --git a/tests/api/test_ratelimiting.py b/tests/api/test_ratelimiting.py index 18649c2c05dc..c86f783c5bd4 100644 --- a/tests/api/test_ratelimiting.py +++ b/tests/api/test_ratelimiting.py @@ -314,3 +314,77 @@ def consume_at(time: float) -> bool: # Check that we get rate limited after using that token. self.assertFalse(consume_at(11.1)) + + def test_record_action_which_doesnt_fill_bucket(self) -> None: + limiter = Ratelimiter( + store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=3 + ) + + # Observe two actions, leaving room in the bucket for one more. + limiter.record_action(requester=None, key="a", n_actions=2, _time_now_s=0.0) + + # We should be able to take a new action now. + success, _ = self.get_success_or_raise( + limiter.can_do_action(requester=None, key="a", _time_now_s=0.0) + ) + self.assertTrue(success) + + # ... but not two. + success, _ = self.get_success_or_raise( + limiter.can_do_action(requester=None, key="a", _time_now_s=0.0) + ) + self.assertFalse(success) + + def test_record_action_which_fills_bucket(self) -> None: + limiter = Ratelimiter( + store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=3 + ) + + # Observe three actions, filling up the bucket. + limiter.record_action(requester=None, key="a", n_actions=3, _time_now_s=0.0) + + # We should be unable to take a new action now. + success, _ = self.get_success_or_raise( + limiter.can_do_action(requester=None, key="a", _time_now_s=0.0) + ) + self.assertFalse(success) + + # If we wait 10 seconds to leak a token, we should be able to take one action... + success, _ = self.get_success_or_raise( + limiter.can_do_action(requester=None, key="a", _time_now_s=10.0) + ) + self.assertTrue(success) + + # ... but not two. + success, _ = self.get_success_or_raise( + limiter.can_do_action(requester=None, key="a", _time_now_s=10.0) + ) + self.assertFalse(success) + + def test_record_action_which_overfills_bucket(self) -> None: + limiter = Ratelimiter( + store=self.hs.get_datastores().main, clock=None, rate_hz=0.1, burst_count=3 + ) + + # Observe four actions, exceeding the bucket. + limiter.record_action(requester=None, key="a", n_actions=4, _time_now_s=0.0) + + # We should be prevented from taking a new action now. + success, _ = self.get_success_or_raise( + limiter.can_do_action(requester=None, key="a", _time_now_s=0.0) + ) + self.assertFalse(success) + + # If we wait 10 seconds to leak a token, we should be unable to take an action + # because the bucket is still full. 
+ success, _ = self.get_success_or_raise( + limiter.can_do_action(requester=None, key="a", _time_now_s=10.0) + ) + self.assertFalse(success) + + # But after another 10 seconds we leak a second token, giving us room for + # action. + success, _ = self.get_success_or_raise( + limiter.can_do_action(requester=None, key="a", _time_now_s=20.0) + ) + self.assertTrue(success) From 9d4cdae33ae6de8b6b98fe1446add4fc101aa078 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 4 Jul 2022 13:53:58 +0100 Subject: [PATCH 05/23] Notifier: accept callbacks to fire on room joins --- synapse/notifier.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/synapse/notifier.py b/synapse/notifier.py index 54b0ec4b97b4..c42bb8266add 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -228,6 +228,7 @@ def __init__(self, hs: "HomeServer"): # Called when there are new things to stream over replication self.replication_callbacks: List[Callable[[], None]] = [] + self._new_join_in_room_callbacks: List[Callable[[str, str], None]] = [] self._federation_client = hs.get_federation_http_client() @@ -280,6 +281,19 @@ def add_replication_callback(self, cb: Callable[[], None]) -> None: """ self.replication_callbacks.append(cb) + def add_new_join_in_room_callback(self, cb: Callable[[str, str], None]) -> None: + """Add a callback that will be called when a user joins a room. + + This only fires on genuine membership changes, e.g. "invite" -> "join". + Membership transitions like "join" -> "join" (for e.g. displayname changes) do + not trigger the callback. + + When called, the callback receives two arguments: the event ID and the room ID. + It should *not* return a Deferred - if it needs to do any asynchronous work, a + background thread should be started and wrapped with run_as_background_process. + """ + self._new_join_in_room_callbacks.append(cb) + async def on_new_room_event( self, event: EventBase, @@ -723,6 +737,10 @@ def notify_replication(self) -> None: for cb in self.replication_callbacks: cb() + def notify_user_joined_room(self, event_id: str, room_id: str) -> None: + for cb in self._new_join_in_room_callbacks: + cb(event_id, room_id) + def notify_remote_server_up(self, server: str) -> None: """Notify any replication that a remote server has come back up""" # We call federation_sender directly rather than registering as a From 77de15927acd8343fd54309e3e9c109cc71fcf39 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 28 Jun 2022 17:15:16 +0100 Subject: [PATCH 06/23] Room member: drive-by-comment --- synapse/handlers/room_member.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index bf6bae123273..de1f1c1d4543 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -94,6 +94,9 @@ def __init__(self, hs: "HomeServer"): rate_hz=hs.config.ratelimiting.rc_joins_local.per_second, burst_count=hs.config.ratelimiting.rc_joins_local.burst_count, ) + # Tracks joins from local users to rooms this server isn't a member of. + # I.e. joins this server makes by requesting /make_join /send_join from + # another server. 
self._join_rate_limiter_remote = Ratelimiter( store=self.store, clock=self.clock, From ae788ca796f208b2b2a7999e1082707b7816a3d4 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 29 Jun 2022 18:25:54 +0100 Subject: [PATCH 07/23] Replication: include `outlier` in event rows Warn about replication problem in upgrade notes --- docs/upgrade.md | 12 ++++++++++ synapse/replication/tcp/streams/events.py | 1 + .../storage/databases/main/events_worker.py | 22 ++++++++++++------- 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/docs/upgrade.md b/docs/upgrade.md index 312f0b87fedc..d5753ae6f61f 100644 --- a/docs/upgrade.md +++ b/docs/upgrade.md @@ -89,6 +89,18 @@ process, for example: dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb ``` +# Upgrading to v1.63.0 + +## Changes to the event replication streams + +Synapse now includes a flag indicating if an event is an outlier when +replicating it to other workers. This is a forwards- and backwards-incompatible +change: v1.62 and workers cannot process events replicated by v1.63 workers, and +vice versa. + +Once all workers are upgraded to v1.63 (or downgraded to v1.62), event +replication will resume as normal. + # Upgrading to v1.62.0 ## New signatures for spam checker callbacks diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 26f4fa7cfd16..14b6705862ac 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -98,6 +98,7 @@ class EventsStreamEventRow(BaseEventsStreamRow): relates_to: Optional[str] membership: Optional[str] rejected: bool + outlier: bool @attr.s(slots=True, frozen=True, auto_attribs=True) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index b99b10778490..fc874c1e6cd0 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1465,7 +1465,7 @@ async def get_room_complexity(self, room_id: str) -> Dict[str, float]: async def get_all_new_forward_event_rows( self, instance_name: str, last_id: int, current_id: int, limit: int - ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]: + ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]: """Returns new events, for the Events replication stream Args: @@ -1481,10 +1481,11 @@ async def get_all_new_forward_event_rows( def get_all_new_forward_event_rows( txn: LoggingTransaction, - ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]: + ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]: sql = ( "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," - " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" + " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL," + " e.outlier" " FROM events AS e" " LEFT JOIN redactions USING (event_id)" " LEFT JOIN state_events AS se USING (event_id)" @@ -1498,7 +1499,8 @@ def get_all_new_forward_event_rows( ) txn.execute(sql, (last_id, current_id, instance_name, limit)) return cast( - List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall() + List[Tuple[int, str, str, str, str, str, str, str, bool, bool]], + txn.fetchall(), ) return await self.db_pool.runInteraction( @@ -1507,7 +1509,7 @@ def get_all_new_forward_event_rows( async def get_ex_outlier_stream_rows( self, instance_name: str, last_id: int, current_id: int - ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]: + ) -> 
List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]: """Returns de-outliered events, for the Events replication stream Args: @@ -1522,11 +1524,14 @@ async def get_ex_outlier_stream_rows( def get_ex_outlier_stream_rows_txn( txn: LoggingTransaction, - ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]: + ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]: sql = ( "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," - " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" + " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL," + " e.outlier" " FROM events AS e" + # NB: the next line (inner join) is what makes this query different from + # get_all_new_forward_event_rows. " INNER JOIN ex_outlier_stream AS out USING (event_id)" " LEFT JOIN redactions USING (event_id)" " LEFT JOIN state_events AS se USING (event_id)" @@ -1541,7 +1546,8 @@ def get_ex_outlier_stream_rows_txn( txn.execute(sql, (last_id, current_id, instance_name)) return cast( - List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall() + List[Tuple[int, str, str, str, str, str, str, str, bool, bool]], + txn.fetchall(), ) return await self.db_pool.runInteraction( From 42301125269edaf82125d27625f3116b85e3ec1c Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 4 Jul 2022 14:11:04 +0100 Subject: [PATCH 08/23] Add helper to determine if we persist event or not --- synapse/server.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/synapse/server.py b/synapse/server.py index 181984a1a491..4487c92fae7c 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -827,3 +827,12 @@ def get_request_ratelimiter(self) -> RequestRatelimiter: self.config.ratelimiting.rc_message, self.config.ratelimiting.rc_admin_redaction, ) + + def persists_events_for_room(self, room_id: str) -> bool: + """Is this worker responsible for persisting events in the given room? + + Or does it ask another worker to do that for us?""" + return ( + self.get_instance_name() + == self.config.worker.events_shard_config.get_instance(room_id) + ) From 6b47e82ca2e4b3709bb55e1ba98079aaf4e96589 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 29 Jun 2022 12:28:53 +0100 Subject: [PATCH 09/23] Track per-room join rates actioned by this worker and consult it when actioning joins. Only bump rate limit if we will persist the event; otherwise we'll see it over replication --- synapse/federation/federation_server.py | 16 +++++++++++ synapse/handlers/room_member.py | 35 +++++++++++++++++++++++++ synapse/replication/http/send_event.py | 12 +++++++++ 3 files changed, 63 insertions(+) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 3e1518f1f60e..41c226792549 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -117,6 +117,7 @@ def __init__(self, hs: "HomeServer"): self._federation_event_handler = hs.get_federation_event_handler() self.state = hs.get_state_handler() self._event_auth_handler = hs.get_event_auth_handler() + self._room_member_handler = hs.get_room_member_handler() self._state_storage_controller = hs.get_storage_controllers().state @@ -620,6 +621,15 @@ async def on_make_join_request( ) raise IncompatibleRoomVersionError(room_version=room_version) + # Refuse the request if that room has seen too many joins recently. + # This is in addition to the HS-level rate limiting applied by + # BaseFederationServlet. 
+ # type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?) + await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type] + requester=None, + key=room_id, + update=False, + ) pdu = await self.handler.on_make_join_request(origin, room_id, user_id) return {"event": pdu.get_templated_pdu_json(), "room_version": room_version} @@ -654,6 +664,12 @@ async def on_send_join_request( room_id: str, caller_supports_partial_state: bool = False, ) -> Dict[str, Any]: + await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type] + requester=None, + key=room_id, + update=self.hs.persists_events_for_room(room_id), + ) + event, context = await self._on_send_membership_event( origin, content, Membership.JOIN, room_id ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index de1f1c1d4543..319c7a14e7d5 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -103,6 +103,19 @@ def __init__(self, hs: "HomeServer"): rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second, burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count, ) + # TODO: find a better place to keep this Ratelimiter. + # It needs to be + # - written to by something which can snoop on replication streams + # - read by the RoomMemberHandler to rate limit joins from local users + # - read by the FederationServer to rate limit make_joins and send_joins from + # other homeservers + # I wonder if a homeserver-wide collection of rate limiters might be cleaner? + self._join_rate_per_room_limiter = Ratelimiter( + store=self.store, + clock=self.clock, + rate_hz=hs.config.ratelimiting.rc_joins_per_room.per_second, + burst_count=hs.config.ratelimiting.rc_joins_per_room.burst_count, + ) self._invites_per_room_limiter = Ratelimiter( store=self.store, @@ -125,6 +138,18 @@ def __init__(self, hs: "HomeServer"): ) self.request_ratelimiter = hs.get_request_ratelimiter() + hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room) + + def _on_user_joined_room(self, event_id: str, room_id: str) -> None: + """Notify the rate limiter that a room join has occurred. + + Use this to inform the RoomMemberHandler about joins that have either + - taken place on another homeserver, or + - on another worker in this homeserver. + Joins actioned by this worker should use the usual `ratelimit` method, which + checks the limit and increments the counter in one go. + """ + self._join_rate_per_room_limiter.record_action(requester=None, key=room_id) @abc.abstractmethod async def _remote_join( @@ -378,6 +403,11 @@ async def _local_membership_update( # up blocking profile updates. 
if newly_joined and ratelimit: await self._join_rate_limiter_local.ratelimit(requester) + await self._join_rate_per_room_limiter.ratelimit( + requester, + key=room_id, + update=self.hs.persists_events_for_room(room_id), + ) result_event = await self.event_creation_handler.handle_new_client_event( requester, @@ -826,6 +856,11 @@ async def update_membership_locked( await self._join_rate_limiter_remote.ratelimit( requester, ) + await self._join_rate_per_room_limiter.ratelimit( + requester, + key=room_id, + update=self.hs.persists_events_for_room(room_id), + ) inviter = await self._get_inviter(target.to_string(), room_id) if inviter and not self.hs.is_mine(inviter): diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index c2b2588ea548..6af013cd0a5e 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -17,6 +17,7 @@ from twisted.web.server import Request +from synapse.api.constants import EventTypes, Membership from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase, make_event_from_dict from synapse.events.snapshot import EventContext @@ -72,6 +73,7 @@ def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self.clock = hs.get_clock() + self._notifier = hs.get_notifier() @staticmethod async def _serialize_payload( # type: ignore[override] @@ -138,6 +140,16 @@ async def _handle_request( # type: ignore[override] "Got event to send with ID: %s into room: %s", event.event_id, event.room_id ) + if event.type == EventTypes.Member and event.membership == Membership.JOIN: + ( + current_membership, + _, + ) = await self.store.get_local_current_membership_for_user_in_room( + event.state_key, event.room_id + ) + if current_membership != Membership.JOIN: + self._notifier.notify_user_joined_room(event.event_id, event.room_id) + event = await self.event_creation_handler.persist_and_notify_client_event( requester, event, context, ratelimit=ratelimit, extra_users=extra_users ) From 0bb4122726588d36ffa5f569a7f3c797640e2b12 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 29 Jun 2022 12:17:49 +0100 Subject: [PATCH 10/23] Snoop on replication to learn about joins on other workers --- synapse/replication/tcp/client.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 2f59245058e7..e4f2201c922f 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -21,7 +21,7 @@ from twisted.internet.protocol import ReconnectingClientFactory from twisted.python.failure import Failure -from synapse.api.constants import EventTypes, ReceiptTypes +from synapse.api.constants import EventTypes, Membership, ReceiptTypes from synapse.federation import send_queue from synapse.federation.sender import FederationSender from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable @@ -219,6 +219,21 @@ async def on_rdata( membership=row.data.membership, ) + # If this event is a join, make a note of it so we have an accurate + # cross-worker room rate limit. + # TODO: Erik said we should exclude rows that came from ex_outliers + # here, but I don't see how we can determine that. I guess we could + # add a flag to row.data? 
+ if ( + row.data.type == EventTypes.Member + and row.data.membership == Membership.JOIN + and not row.data.outlier + ): + # TODO retrieve the previous state, and exclude join -> join transitions + self.notifier.notify_user_joined_room( + row.data.event_id, row.data.room_id + ) + await self._presence_handler.process_replication_rows( stream_name, instance_name, token, rows ) From 7a14b94698124ce9713e98f27454c84fa0da2478 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 29 Jun 2022 19:49:57 +0100 Subject: [PATCH 11/23] Test cases --- tests/federation/test_federation_server.py | 66 ++++++- tests/handlers/test_room_member.py | 199 +++++++++++++++++++++ 2 files changed, 264 insertions(+), 1 deletion(-) create mode 100644 tests/handlers/test_room_member.py diff --git a/tests/federation/test_federation_server.py b/tests/federation/test_federation_server.py index 413b3c9426f4..afe67621fd95 100644 --- a/tests/federation/test_federation_server.py +++ b/tests/federation/test_federation_server.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from http import HTTPStatus from parameterized import parameterized @@ -148,7 +149,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer): tok2 = self.login("fozzie", "bear") self.helper.join(self._room_id, second_member_user_id, tok=tok2) - def _make_join(self, user_id) -> JsonDict: + def _make_join(self, user_id: str) -> JsonDict: channel = self.make_signed_federation_request( "GET", f"/_matrix/federation/v1/make_join/{self._room_id}/{user_id}" @@ -264,6 +265,69 @@ def test_send_join_partial_state(self): ) self.assertEqual(r[("m.room.member", joining_user)].membership, "join") + @override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 3}}) + def test_make_join_respects_room_join_rate_limit(self) -> None: + # In the test setup, two users join the room. Since the rate limiter burst + # count is 3, a new make_join request to the room should be accepted. + + joining_user = "@ronniecorbett:" + self.OTHER_SERVER_NAME + self._make_join(joining_user) + + # Now have a new local user join the room. This saturates the rate limiter + # bucket, so the next make_join should be denied. + new_local_user = self.register_user("animal", "animal") + token = self.login("animal", "animal") + self.helper.join(self._room_id, new_local_user, tok=token) + + joining_user = "@ronniebarker:" + self.OTHER_SERVER_NAME + channel = self.make_signed_federation_request( + "GET", + f"/_matrix/federation/v1/make_join/{self._room_id}/{joining_user}" + f"?ver={DEFAULT_ROOM_VERSION}", + ) + self.assertEqual(channel.code, HTTPStatus.TOO_MANY_REQUESTS, channel.json_body) + + @override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 3}}) + def test_send_join_contributes_to_room_join_rate_limit_and_is_limited(self) -> None: + # Make two make_join requests up front. (These are rate limited, but do not + # contribute to the rate limit.) + join_event_dicts = [] + for i in range(2): + joining_user = f"@misspiggy{i}:{self.OTHER_SERVER_NAME}" + join_result = self._make_join(joining_user) + join_event_dict = join_result["event"] + add_hashes_and_signatures( + KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION], + join_event_dict, + signature_name=self.OTHER_SERVER_NAME, + signing_key=self.OTHER_SERVER_SIGNATURE_KEY, + ) + join_event_dicts.append(join_event_dict) + + # In the test setup, two users join the room. 
Since the rate limiter burst + # count is 3, the first send_join should be accepted... + channel = self.make_signed_federation_request( + "PUT", + f"/_matrix/federation/v2/send_join/{self._room_id}/join0", + content=join_event_dicts[0], + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # ... but the second should be denied. + channel = self.make_signed_federation_request( + "PUT", + f"/_matrix/federation/v2/send_join/{self._room_id}/join1", + content=join_event_dicts[1], + ) + self.assertEqual(channel.code, HTTPStatus.TOO_MANY_REQUESTS, channel.json_body) + + # NB: we could write a test which checks that the send_join event is seen + # by other workers over replication, and that they update their rate limit + # buckets accordingly. I'm going to assume that the join event gets sent over + # replication, at which point the tests.handlers.room_member test + # test_local_users_joining_on_another_worker_contribute_to_rate_limit + # is probably sufficient to reassure that the bucket is updated. + def _create_acl_event(content): return make_event_from_dict( diff --git a/tests/handlers/test_room_member.py b/tests/handlers/test_room_member.py new file mode 100644 index 000000000000..5c927f0c232e --- /dev/null +++ b/tests/handlers/test_room_member.py @@ -0,0 +1,199 @@ +from http import HTTPStatus +from unittest.mock import patch + +from twisted.test.proto_helpers import MemoryReactor + +import synapse.rest.admin +import synapse.rest.client.login +import synapse.rest.client.room +from synapse.api.constants import Membership +from synapse.api.errors import LimitExceededError +from synapse.server import HomeServer +from synapse.types import UserID, create_requester +from synapse.util import Clock + +from tests.replication._base import RedisMultiWorkerStreamTestCase +from tests.server import make_request +from tests.test_utils import make_awaitable +from tests.unittest import HomeserverTestCase, override_config + + +class TestJoinsLimitedByPerRoomRateLimiter(HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + synapse.rest.client.login.register_servlets, + synapse.rest.client.room.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.handler = hs.get_room_member_handler() + + # Create three users. + self.alice = self.register_user("alice", "pass") + self.alice_token = self.login("alice", "pass") + self.bob = self.register_user("bob", "pass") + self.bob_token = self.login("bob", "pass") + self.chris = self.register_user("chris", "pass") + self.chris_token = self.login("chris", "pass") + + # Create a room on this homeserver. + # Note that this counts as a + self.room_id = self.helper.create_room_as(self.alice, tok=self.alice_token) + self.intially_unjoined_room_id = "!example:otherhs" + + @override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}}) + def test_local_user_local_joins_contribute_to_limit_and_are_limited(self) -> None: + # The rate limiter has accumulated one token from Alice's join after the create + # event. + # Try joining the room as Bob. + self.get_success( + self.handler.update_membership( + requester=create_requester(self.bob), + target=UserID.from_string(self.bob), + room_id=self.room_id, + action=Membership.JOIN, + ) + ) + + # The rate limiter bucket is full. A second join should be denied. 
+ self.get_failure( + self.handler.update_membership( + requester=create_requester(self.chris), + target=UserID.from_string(self.chris), + room_id=self.room_id, + action=Membership.JOIN, + ), + LimitExceededError, + ) + + @override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}}) + def test_local_user_profile_edits_dont_contribute_to_limit(self) -> None: + # The rate limiter has accumulated one token from Alice's join after the create + # event. Alice should still be able to change her displayname. + self.get_success( + self.handler.update_membership( + requester=create_requester(self.alice), + target=UserID.from_string(self.alice), + room_id=self.room_id, + action=Membership.JOIN, + content={"displayname": "Alice Cooper"}, + ) + ) + + # Still room in the limiter bucket. Chris's join should be accepted. + self.get_success( + self.handler.update_membership( + requester=create_requester(self.chris), + target=UserID.from_string(self.chris), + room_id=self.room_id, + action=Membership.JOIN, + ) + ) + + @override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 1}}) + def test_remote_joins_contribute_to_rate_limit(self) -> None: + # Join once, to fill the rate limiter bucket. Patch out the `_remote_join" call + # because there is no other homeserver for us to join via. + with patch.object( + self.handler, + "_remote_join", + return_value=make_awaitable(("$dummy_event", 1000)), + ): + self.get_success( + self.handler.update_membership( + requester=create_requester(self.bob), + target=UserID.from_string(self.bob), + room_id=self.intially_unjoined_room_id, + action=Membership.JOIN, + ) + ) + + # Try to join as Chris. Should get denied. + self.get_failure( + self.handler.update_membership( + requester=create_requester(self.chris), + target=UserID.from_string(self.chris), + room_id=self.intially_unjoined_room_id, + action=Membership.JOIN, + ), + LimitExceededError, + ) + + # TODO: test that remote joins to a room are rate limited. + # Could do this by setting the burst count to 1, then: + # - remote-joining a room + # - immediately leaving + # - trying to remote-join again. + + +class TestReplicatedJoinsLimitedByPerRoomRateLimiter(RedisMultiWorkerStreamTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + synapse.rest.client.login.register_servlets, + synapse.rest.client.room.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.handler = hs.get_room_member_handler() + + # Create three users. + self.alice = self.register_user("alice", "pass") + self.alice_token = self.login("alice", "pass") + self.bob = self.register_user("bob", "pass") + self.bob_token = self.login("bob", "pass") + self.chris = self.register_user("chris", "pass") + self.chris_token = self.login("chris", "pass") + + # Create a room on this homeserver. + # Note that this counts as a + self.room_id = self.helper.create_room_as(self.alice, tok=self.alice_token) + self.intially_unjoined_room_id = "!example:otherhs" + + @override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}}) + def test_local_users_joining_on_another_worker_contribute_to_rate_limit( + self, + ) -> None: + # The rate limiter has accumulated one token from Alice's join after the create + # event. + self.replicate() + + # Spawn another worker and have bob join via it. 
+ worker_app = self.make_worker_hs( + "synapse.app.generic_worker", extra_config={"worker_name": "other worker"} + ) + worker_site = self._hs_to_site[worker_app] + channel = make_request( + self.reactor, + worker_site, + "POST", + f"/_matrix/client/v3/rooms/{self.room_id}/join", + access_token=self.bob_token, + ) + self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body) + + # wait for join to arrive over replication + self.replicate() + + # Try to join as Chris on the worker. Should get denied because Alice + # and Bob have both joined the room. + self.get_failure( + worker_app.get_room_member_handler().update_membership( + requester=create_requester(self.chris), + target=UserID.from_string(self.chris), + room_id=self.room_id, + action=Membership.JOIN, + ), + LimitExceededError, + ) + + # Try to join as Chris on the original worker. Should get denied because Alice + # and Bob have both joined the room. + self.get_failure( + self.handler.update_membership( + requester=create_requester(self.chris), + target=UserID.from_string(self.chris), + room_id=self.room_id, + action=Membership.JOIN, + ), + LimitExceededError, + ) From 240e32f264acdca2954c9689429d7dd6ba0ee83b Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 4 Jul 2022 11:37:51 +0100 Subject: [PATCH 12/23] Fixes to unit tests - FederatingHomeserverTestCase: keys last 10x longer I am guessing that this is the first example of a test which calls `make_signed_federation_request` after the reactor has advanced by >= 1 second until now. - Increase max_request_body_size to 4KB in tests The previous limit prevented the master from accepting some replication requests which were 1.3kB in size. --- tests/test_server.py | 2 +- tests/unittest.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_server.py b/tests/test_server.py index 847432f7915f..038a7b8a0b6a 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -225,7 +225,7 @@ def _make_request(self, method, path): parse_listener_def({"type": "http", "port": 0}), self.resource, "1.0", - max_request_body_size=1234, + max_request_body_size=4096, reactor=self.reactor, ) diff --git a/tests/unittest.py b/tests/unittest.py index c645dd35630a..b6c0b83a0cbe 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -285,7 +285,7 @@ def setUp(self): config=self.hs.config.server.listeners[0], resource=self.resource, server_version_string="1", - max_request_body_size=1234, + max_request_body_size=4096, reactor=self.reactor, ) @@ -780,7 +780,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer): verify_key_id, FetchKeyResult( verify_key=verify_key, - valid_until_ts=clock.time_msec() + 1000, + valid_until_ts=clock.time_msec() + 10000, ), ) ], From 121590a0c98ab8c703850285e40884d53a655794 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 4 Jul 2022 13:07:56 +0100 Subject: [PATCH 13/23] Changelog --- changelog.d/13169.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13169.feature diff --git a/changelog.d/13169.feature b/changelog.d/13169.feature new file mode 100644 index 000000000000..068d158ed5d8 --- /dev/null +++ b/changelog.d/13169.feature @@ -0,0 +1 @@ +Add per-room rate limiting for room joins. For each room, Synapse now monitors the rate of join events in that room, and throttle additional joins if that rate grows too large. 
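For reference, the "leaky bucket as a meter" behaviour described in the `Ratelimiter` docstring (patch 02/23) and exercised by the `record_action` tests (patch 04/23) can be sketched as a standalone toy. This is a minimal illustration, not the real `synapse.api.ratelimiting.Ratelimiter`: the real class also handles requesters, a homeserver clock, per-key rate overrides and pruning, and tracks the time each bucket was last empty rather than a plain (level, last-update) pair. The class name, helper names, and room ID below are illustrative only; the observable behaviour is intended to match the semantics the docstring describes.

```python
import time
from typing import Dict, Optional, Tuple


class LeakyBucketMeter:
    """Per-key leaky bucket used as a meter: each permitted action adds tokens,
    and tokens drain away at rate_hz."""

    def __init__(self, rate_hz: float, burst_count: float) -> None:
        self.rate_hz = rate_hz          # tokens leaked per second
        self.burst_count = burst_count  # bucket capacity
        # key -> (token level at time `last`, time `last` of the most recent update)
        self._buckets: Dict[str, Tuple[float, float]] = {}

    def _level(self, key: str, now: float) -> float:
        """Current number of tokens in the bucket for `key`, after leaking."""
        tokens, last = self._buckets.get(key, (0.0, now))
        return max(0.0, tokens - (now - last) * self.rate_hz)

    def can_do_action(
        self, key: str, cost: float = 1.0, now: Optional[float] = None
    ) -> bool:
        """Permit the action and add `cost` tokens, or deny and leave the bucket alone."""
        now = time.time() if now is None else now
        level = self._level(key, now)
        if level + cost > self.burst_count:
            return False  # denied: the bucket keeps its current level
        self._buckets[key] = (level + cost, now)
        return True

    def record_action(
        self, key: str, n_actions: float = 1.0, now: Optional[float] = None
    ) -> None:
        """Unconditionally add tokens, even if that overfills the bucket."""
        now = time.time() if now is None else now
        self._buckets[key] = (self._level(key, now) + n_actions, now)


# Mirrors test_record_action_which_fills_bucket above (rate_hz=0.1, burst_count=3):
limiter = LeakyBucketMeter(rate_hz=0.1, burst_count=3)
limiter.record_action("!room:example.org", n_actions=3, now=0.0)
assert not limiter.can_do_action("!room:example.org", now=0.0)   # bucket is full
assert limiter.can_do_action("!room:example.org", now=10.0)      # one token has leaked
assert not limiter.can_do_action("!room:example.org", now=10.0)  # ...but only one
```

Denying a request without touching the bucket matches the docstring's "the bucket continues to hold T tokens"; that is what lets `record_action` (used for joins observed over federation or replication) overfill the bucket so that local joins stay blocked until enough tokens have leaked away.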
From dcb16831e87e519a1b37e904a0393a29bc1b8dcb Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 8 Jul 2022 17:23:41 +0100 Subject: [PATCH 14/23] Move comment translating between bucket terminology --- synapse/api/ratelimiting.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py index 92c2f3b42a78..930e96369d92 100644 --- a/synapse/api/ratelimiting.py +++ b/synapse/api/ratelimiting.py @@ -27,6 +27,9 @@ class Ratelimiter: """ Ratelimit actions marked by arbitrary keys. + (Note that the source code speaks of "actions" and "burst_count" rather than + "tokens" and a "bucket_size".) + This is a "leaky bucket as a meter". For each key to be tracked there is a bucket containing some number 0 <= T <= `burst_count` of tokens corresponding to previously permitted requests for that key. Each bucket starts empty, and gradually leaks @@ -51,9 +54,6 @@ class Ratelimiter: since this time point, and use that to decide if we should accept or reject the request. - Note that the source code speaks of "actions" and "burst_count" rather than "tokens" - and a "bucket_size". - Args: clock: A homeserver clock, for retrieving the current time rate_hz: The long term number of actions that can be performed in a second. From 4da8f29ff686e8f3405c5e3df59558f45ab0b6b0 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 8 Jul 2022 17:24:18 +0100 Subject: [PATCH 15/23] Tweak phrasing; note the version adding the option --- docs/usage/configuration/config_documentation.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index ed874cd8531d..ad0009f1f226 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -1382,7 +1382,7 @@ rc_joins: --- ### `rc_joins_per_room` -This option allows for ratelimiting joins to a room based on the number of recent +This option allows admins to ratelimit joins to a room based on the number of recent joins (local or remote) to that room. It is intended to mitigate mass-join spam waves which target multiple homeservers. @@ -1395,6 +1395,9 @@ rc_joins_per_room: per_second: 1 burst_count: 10 ``` + +_Added in Synapse 1.63.0._ + --- ### `rc_3pid_validation` From 9d9253109bf41607ae742beec3b278762a68f2c5 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 8 Jul 2022 17:28:51 +0100 Subject: [PATCH 16/23] Announce new config option in the changelog --- changelog.d/13169.config | 1 + changelog.d/13169.feature | 2 +- pyproject.toml | 5 +++++ 3 files changed, 7 insertions(+), 1 deletion(-) create mode 100644 changelog.d/13169.config diff --git a/changelog.d/13169.config b/changelog.d/13169.config new file mode 100644 index 000000000000..560592d1c812 --- /dev/null +++ b/changelog.d/13169.config @@ -0,0 +1 @@ +[`rc_joins_per_room`](https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#rc_joins_per_room): parameters for the per-room join rate limit. \ No newline at end of file diff --git a/changelog.d/13169.feature b/changelog.d/13169.feature index 068d158ed5d8..22a6aa140da7 100644 --- a/changelog.d/13169.feature +++ b/changelog.d/13169.feature @@ -1 +1 @@ -Add per-room rate limiting for room joins. For each room, Synapse now monitors the rate of join events in that room, and throttle additional joins if that rate grows too large. +Add per-room rate limiting for room joins. 
For each room, Synapse now monitors the rate of join events in that room and throttles additional joins if that rate grows too large. diff --git a/pyproject.toml b/pyproject.toml index 3a56c42c0b1f..475973deac4f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,11 @@ name = "Features" showcontent = true + [[tool.towncrier.type]] + directory = "config" + name = "Config options" + showcontent = true + [[tool.towncrier.type]] directory = "bugfix" name = "Bugfixes" From e16294ef3b6d51186c8329e603660542423fbb93 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 8 Jul 2022 17:31:59 +0100 Subject: [PATCH 17/23] Be more explicit about the default values --- docs/usage/configuration/config_documentation.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index ad0009f1f226..a7962dc03942 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -1386,10 +1386,10 @@ This option allows admins to ratelimit joins to a room based on the number of re joins (local or remote) to that room. It is intended to mitigate mass-join spam waves which target multiple homeservers. -Sensible values for this option are provided by default; most server admins -won't need to adjust this setting. +By default, one join is permitted to a room every second, with an accumulating +buffer of up to ten instantaneous joins. -Example configuration: +Example configuration (default values): ```yaml rc_joins_per_room: per_second: 1 From 81eb4ab86af6bfa9f67946b66e91b9964cb19c8a Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 8 Jul 2022 18:06:19 +0100 Subject: [PATCH 18/23] Update rate limiter in the event persister logic Simpler, cleaner, faster, stronger. 
--- synapse/federation/federation_server.py | 2 +- synapse/handlers/message.py | 13 +++++++++++++ synapse/handlers/room_member.py | 7 +++---- synapse/replication/http/send_event.py | 12 ------------ synapse/server.py | 9 --------- 5 files changed, 17 insertions(+), 26 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 41c226792549..73a6de048a0e 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -667,7 +667,7 @@ async def on_send_join_request( await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type] requester=None, key=room_id, - update=self.hs.persists_events_for_room(room_id), + update=False, ) event, context = await self._on_send_membership_event( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 189f52fe5a7a..acef60075ef7 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -461,6 +461,7 @@ def __init__(self, hs: "HomeServer"): ) self._events_shard_config = self.config.worker.events_shard_config self._instance_name = hs.get_instance_name() + self._notifier = hs.get_notifier() self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state @@ -1511,6 +1512,18 @@ async def persist_and_notify_client_event( original_event and event.sender != original_event.sender ) + if event.type == EventTypes.Member and event.membership == Membership.JOIN: + ( + current_membership, + _, + ) = await self.store.get_local_current_membership_for_user_in_room( + event.state_key, event.room_id + ) + if current_membership != Membership.JOIN: + self._notifier.notify_user_joined_room( + event.event_id, event.room_id + ) + await self.request_ratelimiter.ratelimit( requester, is_admin_redaction=is_admin_redaction ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 319c7a14e7d5..3ff40c32648f 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -105,6 +105,7 @@ def __init__(self, hs: "HomeServer"): ) # TODO: find a better place to keep this Ratelimiter. 
# It needs to be + # - written to by event persistence code # - written to by something which can snoop on replication streams # - read by the RoomMemberHandler to rate limit joins from local users # - read by the FederationServer to rate limit make_joins and send_joins from @@ -404,9 +405,7 @@ async def _local_membership_update( if newly_joined and ratelimit: await self._join_rate_limiter_local.ratelimit(requester) await self._join_rate_per_room_limiter.ratelimit( - requester, - key=room_id, - update=self.hs.persists_events_for_room(room_id), + requester, key=room_id, update=False ) result_event = await self.event_creation_handler.handle_new_client_event( @@ -859,7 +858,7 @@ async def update_membership_locked( await self._join_rate_per_room_limiter.ratelimit( requester, key=room_id, - update=self.hs.persists_events_for_room(room_id), + update=False, ) inviter = await self._get_inviter(target.to_string(), room_id) diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 6af013cd0a5e..c2b2588ea548 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -17,7 +17,6 @@ from twisted.web.server import Request -from synapse.api.constants import EventTypes, Membership from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase, make_event_from_dict from synapse.events.snapshot import EventContext @@ -73,7 +72,6 @@ def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self.clock = hs.get_clock() - self._notifier = hs.get_notifier() @staticmethod async def _serialize_payload( # type: ignore[override] @@ -140,16 +138,6 @@ async def _handle_request( # type: ignore[override] "Got event to send with ID: %s into room: %s", event.event_id, event.room_id ) - if event.type == EventTypes.Member and event.membership == Membership.JOIN: - ( - current_membership, - _, - ) = await self.store.get_local_current_membership_for_user_in_room( - event.state_key, event.room_id - ) - if current_membership != Membership.JOIN: - self._notifier.notify_user_joined_room(event.event_id, event.room_id) - event = await self.event_creation_handler.persist_and_notify_client_event( requester, event, context, ratelimit=ratelimit, extra_users=extra_users ) diff --git a/synapse/server.py b/synapse/server.py index 4487c92fae7c..181984a1a491 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -827,12 +827,3 @@ def get_request_ratelimiter(self) -> RequestRatelimiter: self.config.ratelimiting.rc_message, self.config.ratelimiting.rc_admin_redaction, ) - - def persists_events_for_room(self, room_id: str) -> bool: - """Is this worker responsible for persisting events in the given room? 
- - Or does it ask another worker to do that for us?""" - return ( - self.get_instance_name() - == self.config.worker.events_shard_config.get_instance(room_id) - ) From 8377172c4cc7d9764a4dca98e96660261613f8ee Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 11 Jul 2022 15:44:51 +0100 Subject: [PATCH 19/23] Use `add_hashes_and_signatures_from_other_server` - rename the method to distinguish it from `add_hashes_and_signatures` - use it in a few other places where it makes sense --- tests/federation/test_federation_client.py | 6 +++--- tests/federation/test_federation_server.py | 19 ++++++------------- tests/handlers/test_federation.py | 2 +- tests/handlers/test_federation_event.py | 6 +++--- tests/test_visibility.py | 2 +- tests/unittest.py | 2 +- 6 files changed, 15 insertions(+), 22 deletions(-) diff --git a/tests/federation/test_federation_client.py b/tests/federation/test_federation_client.py index 268a48d7ba5f..d2bda0719872 100644 --- a/tests/federation/test_federation_client.py +++ b/tests/federation/test_federation_client.py @@ -45,7 +45,7 @@ def test_get_room_state(self): # mock up some events to use in the response. # In real life, these would have things in `prev_events` and `auth_events`, but that's # a bit annoying to mock up, and the code under test doesn't care, so we don't bother. - create_event_dict = self.add_hashes_and_signatures( + create_event_dict = self.add_hashes_and_signatures_from_other_server( { "room_id": test_room_id, "type": "m.room.create", @@ -57,7 +57,7 @@ def test_get_room_state(self): "origin_server_ts": 500, } ) - member_event_dict = self.add_hashes_and_signatures( + member_event_dict = self.add_hashes_and_signatures_from_other_server( { "room_id": test_room_id, "type": "m.room.member", @@ -69,7 +69,7 @@ def test_get_room_state(self): "origin_server_ts": 600, } ) - pl_event_dict = self.add_hashes_and_signatures( + pl_event_dict = self.add_hashes_and_signatures_from_other_server( { "room_id": test_room_id, "type": "m.room.power_levels", diff --git a/tests/federation/test_federation_server.py b/tests/federation/test_federation_server.py index afe67621fd95..7be50dea6713 100644 --- a/tests/federation/test_federation_server.py +++ b/tests/federation/test_federation_server.py @@ -21,7 +21,6 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.config.server import DEFAULT_ROOM_VERSION -from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events import make_event_from_dict from synapse.federation.federation_server import server_matches_acl_event from synapse.rest import admin @@ -164,11 +163,9 @@ def test_send_join(self): join_result = self._make_join(joining_user) join_event_dict = join_result["event"] - add_hashes_and_signatures( - KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION], + self.add_hashes_and_signatures_from_other_server( join_event_dict, - signature_name=self.OTHER_SERVER_NAME, - signing_key=self.OTHER_SERVER_SIGNATURE_KEY, + KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION], ) channel = self.make_signed_federation_request( "PUT", @@ -221,11 +218,9 @@ def test_send_join_partial_state(self): join_result = self._make_join(joining_user) join_event_dict = join_result["event"] - add_hashes_and_signatures( - KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION], + self.add_hashes_and_signatures_from_other_server( join_event_dict, - signature_name=self.OTHER_SERVER_NAME, - signing_key=self.OTHER_SERVER_SIGNATURE_KEY, + KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION], ) channel = self.make_signed_federation_request( "PUT", @@ -296,11 
+291,9 @@ def test_send_join_contributes_to_room_join_rate_limit_and_is_limited(self) -> N joining_user = f"@misspiggy{i}:{self.OTHER_SERVER_NAME}" join_result = self._make_join(joining_user) join_event_dict = join_result["event"] - add_hashes_and_signatures( - KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION], + self.add_hashes_and_signatures_from_other_server( join_event_dict, - signature_name=self.OTHER_SERVER_NAME, - signing_key=self.OTHER_SERVER_SIGNATURE_KEY, + KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION], ) join_event_dicts.append(join_event_dict) diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 9b9c11fab73d..712933f9cac8 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -256,7 +256,7 @@ def test_backfill_with_many_backward_extremities(self) -> None: ] for _ in range(0, 8): event = make_event_from_dict( - self.add_hashes_and_signatures( + self.add_hashes_and_signatures_from_other_server( { "origin_server_ts": 1, "type": "m.room.message", diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py index 4b1a8f04dbde..51c8dd649822 100644 --- a/tests/handlers/test_federation_event.py +++ b/tests/handlers/test_federation_event.py @@ -104,7 +104,7 @@ def _test_process_pulled_event_with_missing_state( # mock up a load of state events which we are missing state_events = [ make_event_from_dict( - self.add_hashes_and_signatures( + self.add_hashes_and_signatures_from_other_server( { "type": "test_state_type", "state_key": f"state_{i}", @@ -131,7 +131,7 @@ def _test_process_pulled_event_with_missing_state( # Depending on the test, we either persist this upfront (as an outlier), # or let the server request it. prev_event = make_event_from_dict( - self.add_hashes_and_signatures( + self.add_hashes_and_signatures_from_other_server( { "type": "test_regular_type", "room_id": room_id, @@ -165,7 +165,7 @@ async def get_event(destination: str, event_id: str, timeout=None): # mock up a regular event to pass into _process_pulled_event pulled_event = make_event_from_dict( - self.add_hashes_and_signatures( + self.add_hashes_and_signatures_from_other_server( { "type": "test_regular_type", "room_id": room_id, diff --git a/tests/test_visibility.py b/tests/test_visibility.py index f338af6c36d0..c385b2f8d479 100644 --- a/tests/test_visibility.py +++ b/tests/test_visibility.py @@ -272,7 +272,7 @@ def test_out_of_band_invite_rejection(self): "state_key": "@user:test", "content": {"membership": "invite"}, } - self.add_hashes_and_signatures(invite_pdu) + self.add_hashes_and_signatures_from_other_server(invite_pdu) invite_event_id = make_event_from_dict(invite_pdu, RoomVersions.V9).event_id self.get_success( diff --git a/tests/unittest.py b/tests/unittest.py index b6c0b83a0cbe..a58bf814a81a 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -838,7 +838,7 @@ def make_signed_federation_request( client_ip=client_ip, ) - def add_hashes_and_signatures( + def add_hashes_and_signatures_from_other_server( self, event_dict: JsonDict, room_version: RoomVersion = KNOWN_ROOM_VERSIONS[DEFAULT_ROOM_VERSION], From 47acf465efe2c60d1cc8fb5fcc1be5cccfc0a7d6 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 11 Jul 2022 20:58:12 +0100 Subject: [PATCH 20/23] Don't require `ratelimit` to notify for joins --- synapse/handlers/message.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index acef60075ef7..59f99ae8b515 100644 
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1512,22 +1512,20 @@ async def persist_and_notify_client_event(
             original_event and event.sender != original_event.sender
         )
 
-        if event.type == EventTypes.Member and event.membership == Membership.JOIN:
-            (
-                current_membership,
-                _,
-            ) = await self.store.get_local_current_membership_for_user_in_room(
-                event.state_key, event.room_id
-            )
-            if current_membership != Membership.JOIN:
-                self._notifier.notify_user_joined_room(
-                    event.event_id, event.room_id
-                )
-
         await self.request_ratelimiter.ratelimit(
             requester, is_admin_redaction=is_admin_redaction
         )
 
+        if event.type == EventTypes.Member and event.membership == Membership.JOIN:
+            (
+                current_membership,
+                _,
+            ) = await self.store.get_local_current_membership_for_user_in_room(
+                event.state_key, event.room_id
+            )
+            if current_membership != Membership.JOIN:
+                self._notifier.notify_user_joined_room(event.event_id, event.room_id)
+
         await self._maybe_kick_guest_users(event, context)
 
         if event.type == EventTypes.CanonicalAlias:

From fda6252233db6fe78e6774ff4f43857ab90fde16 Mon Sep 17 00:00:00 2001
From: David Robertson
Date: Mon, 11 Jul 2022 21:02:57 +0100
Subject: [PATCH 21/23] Also notify of join events via another path

---
 synapse/handlers/federation_event.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index b7c54e642fc9..7245d3697850 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -2063,6 +2063,10 @@ async def _notify_persisted_event(
             event, event_pos, max_stream_token, extra_users=extra_users
         )
 
+        if event.type == EventTypes.Member and event.membership == Membership.JOIN:
+            # TODO retrieve the previous state, and exclude join -> join transitions
+            self._notifier.notify_user_joined_room(event.event_id, event.room_id)
+
     def _sanity_check_event(self, ev: EventBase) -> None:
         """
         Do some early sanity checks of a received event

From 5cf6700c2e91a9dddd00b7f0186885e4138b6911 Mon Sep 17 00:00:00 2001
From: David Robertson
Date: Mon, 11 Jul 2022 21:09:52 +0100
Subject: [PATCH 22/23] Monstrous test update of doom

---
 tests/handlers/test_room_member.py | 135 ++++++++++++++++++++++++-----
 1 file changed, 113 insertions(+), 22 deletions(-)

diff --git a/tests/handlers/test_room_member.py b/tests/handlers/test_room_member.py
index 5c927f0c232e..254e7e4b80ac 100644
--- a/tests/handlers/test_room_member.py
+++ b/tests/handlers/test_room_member.py
@@ -1,13 +1,16 @@
 from http import HTTPStatus
-from unittest.mock import patch
+from unittest.mock import Mock, patch
 
 from twisted.test.proto_helpers import MemoryReactor
 
 import synapse.rest.admin
 import synapse.rest.client.login
 import synapse.rest.client.room
-from synapse.api.constants import Membership
+from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import LimitExceededError
+from synapse.crypto.event_signing import add_hashes_and_signatures
+from synapse.events import FrozenEventV3
+from synapse.federation.federation_client import SendJoinResult
 from synapse.server import HomeServer
 from synapse.types import UserID, create_requester
 from synapse.util import Clock
@@ -15,10 +18,10 @@
 from tests.replication._base import RedisMultiWorkerStreamTestCase
 from tests.server import make_request
 from tests.test_utils import make_awaitable
-from tests.unittest import HomeserverTestCase, override_config
+from tests.unittest import FederatingHomeserverTestCase, override_config
 
 
-class TestJoinsLimitedByPerRoomRateLimiter(HomeserverTestCase):
+class TestJoinsLimitedByPerRoomRateLimiter(FederatingHomeserverTestCase):
     servlets = [
         synapse.rest.admin.register_servlets,
         synapse.rest.client.login.register_servlets,
@@ -36,10 +39,11 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.chris = self.register_user("chris", "pass")
         self.chris_token = self.login("chris", "pass")
 
-        # Create a room on this homeserver.
-        # Note that this counts as a
+        # Create a room on this homeserver. Note that this counts as a join: it
+        # contributes to the rate limiter's count of actions
         self.room_id = self.helper.create_room_as(self.alice, tok=self.alice_token)
-        self.intially_unjoined_room_id = "!example:otherhs"
+
+        self.intially_unjoined_room_id = f"!example:{self.OTHER_SERVER_NAME}"
 
     @override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}})
     def test_local_user_local_joins_contribute_to_limit_and_are_limited(self) -> None:
@@ -92,12 +96,97 @@ def test_local_user_profile_edits_dont_contribute_to_limit(self) -> None:
 
     @override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 1}})
     def test_remote_joins_contribute_to_rate_limit(self) -> None:
-        # Join once, to fill the rate limiter bucket. Patch out the `_remote_join" call
-        # because there is no other homeserver for us to join via.
+        # Join once, to fill the rate limiter bucket.
+        #
+        # To do this we have to mock the responses from the remote homeserver.
+        # We also patch out a bunch of event checks on our end. All we're really
+        # trying to check here is that remote joins will bump the rate limiter when
+        # they are persisted.
+        create_event_source = {
+            "auth_events": [],
+            "content": {
+                "creator": f"@creator:{self.OTHER_SERVER_NAME}",
+                "room_version": self.hs.config.server.default_room_version.identifier,
+            },
+            "depth": 0,
+            "origin_server_ts": 0,
+            "prev_events": [],
+            "room_id": self.intially_unjoined_room_id,
+            "sender": f"@creator:{self.OTHER_SERVER_NAME}",
+            "state_key": "",
+            "type": EventTypes.Create,
+        }
+        self.add_hashes_and_signatures_from_other_server(
+            create_event_source,
+            self.hs.config.server.default_room_version,
+        )
+        create_event = FrozenEventV3(
+            create_event_source,
+            self.hs.config.server.default_room_version,
+            {},
+            None,
+        )
+
+        join_event_source = {
+            "auth_events": [create_event.event_id],
+            "content": {"membership": "join"},
+            "depth": 1,
+            "origin_server_ts": 100,
+            "prev_events": [create_event.event_id],
+            "sender": self.bob,
+            "state_key": self.bob,
+            "room_id": self.intially_unjoined_room_id,
+            "type": EventTypes.Member,
+        }
+        add_hashes_and_signatures(
+            self.hs.config.server.default_room_version,
+            join_event_source,
+            self.hs.hostname,
+            self.hs.signing_key,
+        )
+        join_event = FrozenEventV3(
+            join_event_source,
+            self.hs.config.server.default_room_version,
+            {},
+            None,
+        )
+
+        mock_make_membership_event = Mock(
+            return_value=make_awaitable(
+                (
+                    self.OTHER_SERVER_NAME,
+                    join_event,
+                    self.hs.config.server.default_room_version,
+                )
+            )
+        )
+        mock_send_join = Mock(
+            return_value=make_awaitable(
+                SendJoinResult(
+                    join_event,
+                    self.OTHER_SERVER_NAME,
+                    state=[create_event],
+                    auth_chain=[create_event],
+                    partial_state=False,
+                    servers_in_room=[],
+                )
+            )
+        )
+
         with patch.object(
-            self.handler,
-            "_remote_join",
-            return_value=make_awaitable(("$dummy_event", 1000)),
+            self.handler.federation_handler.federation_client,
+            "make_membership_event",
+            mock_make_membership_event,
+        ), patch.object(
+            self.handler.federation_handler.federation_client,
+            "send_join",
+            mock_send_join,
+        ), patch(
+            "synapse.event_auth._is_membership_change_allowed",
+            return_value=None,
+        ), patch(
+            "synapse.handlers.federation_event.check_state_dependent_auth_rules",
+            return_value=None,
         ):
             self.get_success(
                 self.handler.update_membership(
@@ -105,19 +194,21 @@ def test_remote_joins_contribute_to_rate_limit(self) -> None:
                     target=UserID.from_string(self.bob),
                     room_id=self.intially_unjoined_room_id,
                     action=Membership.JOIN,
+                    remote_room_hosts=[self.OTHER_SERVER_NAME],
                 )
             )
 
-        # Try to join as Chris. Should get denied.
-        self.get_failure(
-            self.handler.update_membership(
-                requester=create_requester(self.chris),
-                target=UserID.from_string(self.chris),
-                room_id=self.intially_unjoined_room_id,
-                action=Membership.JOIN,
-            ),
-            LimitExceededError,
-        )
+            # Try to join as Chris. Should get denied.
+            self.get_failure(
+                self.handler.update_membership(
+                    requester=create_requester(self.chris),
+                    target=UserID.from_string(self.chris),
+                    room_id=self.intially_unjoined_room_id,
+                    action=Membership.JOIN,
+                    remote_room_hosts=[self.OTHER_SERVER_NAME],
+                ),
+                LimitExceededError,
+            )
 
         # TODO: test that remote joins to a room are rate limited.
         # Could do this by setting the burst count to 1, then:

From 80e2e0b4f2af1a547574805bf79ee254dc74e83c Mon Sep 17 00:00:00 2001
From: David Robertson
Date: Mon, 11 Jul 2022 21:12:28 +0100
Subject: [PATCH 23/23] Don't use newsfiles for config changes per discussion today

---
 changelog.d/13169.config | 1 -
 pyproject.toml           | 5 -----
 2 files changed, 6 deletions(-)
 delete mode 100644 changelog.d/13169.config

diff --git a/changelog.d/13169.config b/changelog.d/13169.config
deleted file mode 100644
index 560592d1c812..000000000000
--- a/changelog.d/13169.config
+++ /dev/null
@@ -1 +0,0 @@
-[`rc_joins_per_room`](https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#rc_joins_per_room): parameters for the per-room join rate limit.
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 475973deac4f..3a56c42c0b1f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -9,11 +9,6 @@
   name = "Features"
   showcontent = true
 
- [[tool.towncrier.type]]
-  directory = "config"
-  name = "Config options"
-  showcontent = true
-
 [[tool.towncrier.type]]
  directory = "bugfix"
  name = "Bugfixes"