Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Rate limiti joins per-room, accounting for joins created by other servers #13169

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13169.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add per-room rate limiting for room joins. For each room, Synapse now monitors the rate of join events in that room and throttles additional joins if that rate grows too large.
4 changes: 4 additions & 0 deletions docker/complement/conf/workers-shared-extra.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ rc_joins:
per_second: 9999
burst_count: 9999

rc_joins_per_room:
per_second: 9999
burst_count: 9999

rc_3pid_validation:
per_second: 1000
burst_count: 1000
Expand Down
12 changes: 12 additions & 0 deletions docs/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ process, for example:
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
```

# Upgrading to v1.63.0

## Changes to the event replication streams

Synapse now includes a flag indicating if an event is an outlier when
replicating it to other workers. This is a forwards- and backwards-incompatible
change: v1.62 and workers cannot process events replicated by v1.63 workers, and
vice versa.

Once all workers are upgraded to v1.63 (or downgraded to v1.62), event
replication will resume as normal.

# Upgrading to v1.62.0

## New signatures for spam checker callbacks
Expand Down
19 changes: 19 additions & 0 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -1379,6 +1379,25 @@ rc_joins:
per_second: 0.03
burst_count: 12
```
---
### `rc_joins_per_room`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind adding info here about what release this config option will be added in?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can do.


This option allows admins to ratelimit joins to a room based on the number of recent
joins (local or remote) to that room. It is intended to mitigate mass-join spam
waves which target multiple homeservers.

By default, one join is permitted to a room every second, with an accumulating
buffer of up to ten instantaneous joins.

Example configuration (default values):
```yaml
rc_joins_per_room:
per_second: 1
burst_count: 10
```

_Added in Synapse 1.63.0._

---
### `rc_3pid_validation`

Expand Down
100 changes: 88 additions & 12 deletions synapse/api/ratelimiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,33 @@ class Ratelimiter:
"""
Ratelimit actions marked by arbitrary keys.

(Note that the source code speaks of "actions" and "burst_count" rather than
"tokens" and a "bucket_size".)

This is a "leaky bucket as a meter". For each key to be tracked there is a bucket
containing some number 0 <= T <= `burst_count` of tokens corresponding to previously
permitted requests for that key. Each bucket starts empty, and gradually leaks
tokens at a rate of `rate_hz`.

Upon an incoming request, we must determine:
- the key that this request falls under (which bucket to inspect), and
- the cost C of this request in tokens.
Then, if there is room in the bucket for C tokens (T + C <= `burst_count`),
the request is permitted and `cost` tokens are added to the bucket.
Otherwise the request is denied, and the bucket continues to hold T tokens.

This means that the limiter enforces an average request frequency of `rate_hz`,
while accumulating a buffer of up to `burst_count` requests which can be consumed
instantaneously.

The tricky bit is the leaking. We do not want to have a periodic process which
leaks every bucket! Instead, we track
- the time point when the bucket was last completely empty, and
- how many tokens have added to the bucket permitted since then.
Then for each incoming request, we can calculate how many tokens have leaked
since this time point, and use that to decide if we should accept or reject the
request.

Args:
clock: A homeserver clock, for retrieving the current time
rate_hz: The long term number of actions that can be performed in a second.
Expand All @@ -41,14 +68,36 @@ def __init__(
self.burst_count = burst_count
self.store = store

# A ordered dictionary keeping track of actions, when they were last
# performed and how often. Each entry is a mapping from a key of arbitrary type
# to a tuple representing:
# * How many times an action has occurred since a point in time
# * The point in time
# * The rate_hz of this particular entry. This can vary per request
# An ordered dictionary representing the token buckets tracked by this rate
# limiter. Each entry maps a key of arbitrary type to a tuple representing:
# * The number of tokens currently in the bucket,
# * The time point when the bucket was last completely empty, and
# * The rate_hz (leak rate) of this particular bucket.
self.actions: OrderedDict[Hashable, Tuple[float, float, float]] = OrderedDict()

def _get_key(
self, requester: Optional[Requester], key: Optional[Hashable]
) -> Hashable:
"""Use the requester's MXID as a fallback key if no key is provided.

Pulled out so that `can_do_action` and `record_action` are consistent.
"""
if key is None:
if not requester:
raise ValueError("Must supply at least one of `requester` or `key`")

key = requester.user.to_string()
return key

def _get_action_counts(
self, key: Hashable, time_now_s: float
) -> Tuple[float, float, float]:
"""Retrieve the action counts, with a fallback representing an empty bucket.

Pulled out so that `can_do_action` and `record_action` are consistent.
"""
return self.actions.get(key, (0.0, time_now_s, 0.0))

async def can_do_action(
self,
requester: Optional[Requester],
Expand Down Expand Up @@ -88,11 +137,7 @@ async def can_do_action(
* The reactor timestamp for when the action can be performed next.
-1 if rate_hz is less than or equal to zero
"""
if key is None:
if not requester:
raise ValueError("Must supply at least one of `requester` or `key`")

key = requester.user.to_string()
key = self._get_key(requester, key)

if requester:
# Disable rate limiting of users belonging to any AS that is configured
Expand Down Expand Up @@ -121,7 +166,7 @@ async def can_do_action(
self._prune_message_counts(time_now_s)

# Check if there is an existing count entry for this key
action_count, time_start, _ = self.actions.get(key, (0.0, time_now_s, 0.0))
action_count, time_start, _ = self._get_action_counts(key, time_now_s)

# Check whether performing another action is allowed
time_delta = time_now_s - time_start
Expand Down Expand Up @@ -164,6 +209,37 @@ async def can_do_action(

return allowed, time_allowed

def record_action(
self,
requester: Optional[Requester],
key: Optional[Hashable] = None,
n_actions: int = 1,
_time_now_s: Optional[float] = None,
) -> None:
"""Record that an action(s) took place, even if they violate the rate limit.

This is useful for tracking the frequency of events that happen across
federation which we still want to impose local rate limits on. For instance, if
we are alice.com monitoring a particular room, we cannot prevent bob.com
from joining users to that room. However, we can track the number of recent
joins in the room and refuse to serve new joins ourselves if there have been too
many in the room across both homeservers.

Args:
requester: The requester that is doing the action, if any.
key: An arbitrary key used to classify an action. Defaults to the
requester's user ID.
n_actions: The number of times the user wants to do this action. If the user
cannot do all of the actions, the user's action count is not incremented
at all.
_time_now_s: The current time. Optional, defaults to the current time according
to self.clock. Only used by tests.
"""
key = self._get_key(requester, key)
time_now_s = _time_now_s if _time_now_s is not None else self.clock.time()
action_count, time_start, rate_hz = self._get_action_counts(key, time_now_s)
self.actions[key] = (action_count + n_actions, time_start, rate_hz)

def _prune_message_counts(self, time_now_s: float) -> None:
"""Remove message count entries that have not exceeded their defined
rate_hz limit
Expand Down
7 changes: 7 additions & 0 deletions synapse/config/ratelimiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
defaults={"per_second": 0.01, "burst_count": 10},
)

# Track the rate of joins to a given room. If there are too many, temporarily
# prevent local joins and remote joins via this server.
self.rc_joins_per_room = RateLimitConfig(
config.get("rc_joins_per_room", {}),
defaults={"per_second": 1, "burst_count": 10},
)

# Ratelimit cross-user key requests:
# * For local requests this is keyed by the sending device.
# * For requests received over federation this is keyed by the origin.
Expand Down
16 changes: 16 additions & 0 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def __init__(self, hs: "HomeServer"):
self._federation_event_handler = hs.get_federation_event_handler()
self.state = hs.get_state_handler()
self._event_auth_handler = hs.get_event_auth_handler()
self._room_member_handler = hs.get_room_member_handler()

self._state_storage_controller = hs.get_storage_controllers().state

Expand Down Expand Up @@ -620,6 +621,15 @@ async def on_make_join_request(
)
raise IncompatibleRoomVersionError(room_version=room_version)

# Refuse the request if that room has seen too many joins recently.
# This is in addition to the HS-level rate limiting applied by
# BaseFederationServlet.
# type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?)
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
requester=None,
key=room_id,
update=False,
)
pdu = await self.handler.on_make_join_request(origin, room_id, user_id)
return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}

Expand Down Expand Up @@ -654,6 +664,12 @@ async def on_send_join_request(
room_id: str,
caller_supports_partial_state: bool = False,
) -> Dict[str, Any]:
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
requester=None,
key=room_id,
update=False,
)

event, context = await self._on_send_membership_event(
origin, content, Membership.JOIN, room_id
)
Expand Down
4 changes: 4 additions & 0 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -2063,6 +2063,10 @@ async def _notify_persisted_event(
event, event_pos, max_stream_token, extra_users=extra_users
)

if event.type == EventTypes.Member and event.membership == Membership.JOIN:
# TODO retrieve the previous state, and exclude join -> join transitions
self._notifier.notify_user_joined_room(event.event_id, event.room_id)

def _sanity_check_event(self, ev: EventBase) -> None:
"""
Do some early sanity checks of a received event
Expand Down
11 changes: 11 additions & 0 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ def __init__(self, hs: "HomeServer"):
)
self._events_shard_config = self.config.worker.events_shard_config
self._instance_name = hs.get_instance_name()
self._notifier = hs.get_notifier()

self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state

Expand Down Expand Up @@ -1515,6 +1516,16 @@ async def persist_and_notify_client_event(
requester, is_admin_redaction=is_admin_redaction
)

if event.type == EventTypes.Member and event.membership == Membership.JOIN:
(
current_membership,
_,
) = await self.store.get_local_current_membership_for_user_in_room(
event.state_key, event.room_id
)
if current_membership != Membership.JOIN:
self._notifier.notify_user_joined_room(event.event_id, event.room_id)

await self._maybe_kick_guest_users(event, context)

if event.type == EventTypes.CanonicalAlias:
Expand Down
37 changes: 37 additions & 0 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,29 @@ def __init__(self, hs: "HomeServer"):
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
burst_count=hs.config.ratelimiting.rc_joins_local.burst_count,
)
# Tracks joins from local users to rooms this server isn't a member of.
# I.e. joins this server makes by requesting /make_join /send_join from
# another server.
self._join_rate_limiter_remote = Ratelimiter(
store=self.store,
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second,
burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count,
)
# TODO: find a better place to keep this Ratelimiter.
# It needs to be
# - written to by event persistence code
# - written to by something which can snoop on replication streams
# - read by the RoomMemberHandler to rate limit joins from local users
# - read by the FederationServer to rate limit make_joins and send_joins from
# other homeservers
# I wonder if a homeserver-wide collection of rate limiters might be cleaner?
self._join_rate_per_room_limiter = Ratelimiter(
store=self.store,
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_per_room.per_second,
burst_count=hs.config.ratelimiting.rc_joins_per_room.burst_count,
)

self._invites_per_room_limiter = Ratelimiter(
store=self.store,
Expand All @@ -122,6 +139,18 @@ def __init__(self, hs: "HomeServer"):
)

self.request_ratelimiter = hs.get_request_ratelimiter()
hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room)

def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
"""Notify the rate limiter that a room join has occurred.

Use this to inform the RoomMemberHandler about joins that have either
- taken place on another homeserver, or
- on another worker in this homeserver.
Joins actioned by this worker should use the usual `ratelimit` method, which
checks the limit and increments the counter in one go.
"""
self._join_rate_per_room_limiter.record_action(requester=None, key=room_id)

@abc.abstractmethod
async def _remote_join(
Expand Down Expand Up @@ -375,6 +404,9 @@ async def _local_membership_update(
# up blocking profile updates.
if newly_joined and ratelimit:
await self._join_rate_limiter_local.ratelimit(requester)
await self._join_rate_per_room_limiter.ratelimit(
requester, key=room_id, update=False
)

result_event = await self.event_creation_handler.handle_new_client_event(
requester,
Expand Down Expand Up @@ -823,6 +855,11 @@ async def update_membership_locked(
await self._join_rate_limiter_remote.ratelimit(
requester,
)
await self._join_rate_per_room_limiter.ratelimit(
requester,
key=room_id,
update=False,
)

inviter = await self._get_inviter(target.to_string(), room_id)
if inviter and not self.hs.is_mine(inviter):
Expand Down
18 changes: 18 additions & 0 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ def __init__(self, hs: "HomeServer"):

# Called when there are new things to stream over replication
self.replication_callbacks: List[Callable[[], None]] = []
self._new_join_in_room_callbacks: List[Callable[[str, str], None]] = []

self._federation_client = hs.get_federation_http_client()

Expand Down Expand Up @@ -280,6 +281,19 @@ def add_replication_callback(self, cb: Callable[[], None]) -> None:
"""
self.replication_callbacks.append(cb)

def add_new_join_in_room_callback(self, cb: Callable[[str, str], None]) -> None:
"""Add a callback that will be called when a user joins a room.

This only fires on genuine membership changes, e.g. "invite" -> "join".
Membership transitions like "join" -> "join" (for e.g. displayname changes) do
not trigger the callback.

When called, the callback receives two arguments: the event ID and the room ID.
It should *not* return a Deferred - if it needs to do any asynchronous work, a
background thread should be started and wrapped with run_as_background_process.
"""
self._new_join_in_room_callbacks.append(cb)

async def on_new_room_event(
self,
event: EventBase,
Expand Down Expand Up @@ -723,6 +737,10 @@ def notify_replication(self) -> None:
for cb in self.replication_callbacks:
cb()

def notify_user_joined_room(self, event_id: str, room_id: str) -> None:
for cb in self._new_join_in_room_callbacks:
cb(event_id, room_id)

def notify_remote_server_up(self, server: str) -> None:
"""Notify any replication that a remote server has come back up"""
# We call federation_sender directly rather than registering as a
Expand Down
Loading