From f389358afded630b4061e7c4ee731e2e24791db8 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 29 Mar 2023 16:41:32 +0100 Subject: [PATCH 1/7] Add retry interval info to `FederationPullAttemptBackoffError` Signed-off-by: Sean Quah --- synapse/api/errors.py | 17 ++++++--- synapse/handlers/federation_event.py | 20 ++++++++--- .../databases/main/event_federation.py | 35 +++++++++++-------- tests/storage/test_event_federation.py | 13 ++++--- 4 files changed, 58 insertions(+), 27 deletions(-) diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 8c6822f3c6ea..fa9ed04842c6 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -19,7 +19,7 @@ import typing from enum import Enum from http import HTTPStatus -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, Iterable, List, Optional, Union from twisted.web import http @@ -682,18 +682,27 @@ class FederationPullAttemptBackoffError(RuntimeError): Attributes: event_id: The event_id which we are refusing to pull message: A custom error message that gives more context + retry_after_ms: The remaining backoff interval, in milliseconds """ - def __init__(self, event_ids: List[str], message: Optional[str]): - self.event_ids = event_ids + def __init__( + self, event_ids: Iterable[str], message: Optional[str], retry_after_ms: int + ): + event_ids = list(event_ids) if message: error_message = message else: - error_message = f"Not attempting to pull event_ids={self.event_ids} because we already tried to pull them recently (backing off)." + error_message = ( + f"Not attempting to pull event_ids={event_ids} because we already " + "tried to pull them recently (backing off)." + ) super().__init__(error_message) + self.event_ids = event_ids + self.retry_after_ms = retry_after_ms + class HttpResponseException(CodeMessageException): """ diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 648843cdbe9b..a5482a880948 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -140,6 +140,7 @@ class FederationEventHandler: """ def __init__(self, hs: "HomeServer"): + self._clock = hs.get_clock() self._store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self._state_storage_controller = self._storage_controllers.state @@ -1053,13 +1054,22 @@ async def _compute_event_context_with_maybe_missing_prevs( # If we've already recently attempted to pull this missing event, don't # try it again so soon. Since we have to fetch all of the prev_events, we can # bail early here if we find any to ignore. - prevs_to_ignore = await self._store.get_event_ids_to_not_pull_from_backoff( - room_id, missing_prevs + prevs_with_pull_backoff = ( + await self._store.get_event_ids_to_not_pull_from_backoff( + room_id, missing_prevs + ) ) - if len(prevs_to_ignore) > 0: + if len(prevs_with_pull_backoff) > 0: raise FederationPullAttemptBackoffError( - event_ids=prevs_to_ignore, - message=f"While computing context for event={event_id}, not attempting to pull missing prev_event={prevs_to_ignore[0]} because we already tried to pull recently (backing off).", + event_ids=prevs_with_pull_backoff.keys(), + message=( + f"While computing context for event={event_id}, not attempting to " + f"pull missing prev_events={list(prevs_with_pull_backoff.keys())} " + "because we already tried to pull recently (backing off)." 
+ ), + retry_after_ms=( + max(prevs_with_pull_backoff.values()) - self._clock.time_msec() + ), ) if not missing_prevs: diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index ff3edeb7160b..a19ba88bf8af 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1544,7 +1544,7 @@ async def get_event_ids_to_not_pull_from_backoff( self, room_id: str, event_ids: Collection[str], - ) -> List[str]: + ) -> Dict[str, int]: """ Filter down the events to ones that we've failed to pull before recently. Uses exponential backoff. @@ -1554,7 +1554,8 @@ async def get_event_ids_to_not_pull_from_backoff( event_ids: A list of events to filter down Returns: - List of event_ids that should not be attempted to be pulled + A dictionary of event_ids that should not be attempted to be pulled and the + next timestamp at which we may try pulling them again. """ event_failed_pull_attempts = await self.db_pool.simple_select_many_batch( table="event_failed_pull_attempts", @@ -1570,22 +1571,28 @@ async def get_event_ids_to_not_pull_from_backoff( ) current_time = self._clock.time_msec() - return [ - event_failed_pull_attempt["event_id"] - for event_failed_pull_attempt in event_failed_pull_attempts + + event_ids_with_backoff = {} + for event_failed_pull_attempt in event_failed_pull_attempts: + event_id = event_failed_pull_attempt["event_id"] # Exponential back-off (up to the upper bound) so we don't try to # pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. - if current_time - < event_failed_pull_attempt["last_attempt_ts"] - + ( - 2 - ** min( - event_failed_pull_attempt["num_attempts"], - BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, + backoff_end_time = ( + event_failed_pull_attempt["last_attempt_ts"] + + ( + 2 + ** min( + event_failed_pull_attempt["num_attempts"], + BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, + ) ) + * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS ) - * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS - ] + + if current_time < backoff_end_time: # `backoff_end_time` is exclusive + event_ids_with_backoff[event_id] = backoff_end_time + + return event_ids_with_backoff async def get_missing_events( self, diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index 3e1984c15cf0..81e50bdd5523 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -1143,19 +1143,24 @@ def test_get_event_ids_to_not_pull_from_backoff(self) -> None: tok = self.login("alice", "test") room_id = self.helper.create_room_as(room_creator=user_id, tok=tok) + failure_time = self.clock.time_msec() self.get_success( self.store.record_event_failed_pull_attempt( room_id, "$failed_event_id", "fake cause" ) ) - event_ids_to_backoff = self.get_success( + event_ids_with_backoff = self.get_success( self.store.get_event_ids_to_not_pull_from_backoff( room_id=room_id, event_ids=["$failed_event_id", "$normal_event_id"] ) ) - self.assertEqual(event_ids_to_backoff, ["$failed_event_id"]) + self.assertEqual( + event_ids_with_backoff, + # We expect a 2^1 hour backoff after a single failed attempt. + {"$failed_event_id": failure_time + 2 * 60 * 60 * 1000}, + ) def test_get_event_ids_to_not_pull_from_backoff_retry_after_backoff_duration( self, @@ -1179,14 +1184,14 @@ def test_get_event_ids_to_not_pull_from_backoff_retry_after_backoff_duration( # attempt (2^1 hours). 
self.reactor.advance(datetime.timedelta(hours=2).total_seconds()) - event_ids_to_backoff = self.get_success( + event_ids_with_backoff = self.get_success( self.store.get_event_ids_to_not_pull_from_backoff( room_id=room_id, event_ids=["$failed_event_id", "$normal_event_id"] ) ) # Since this function only returns events we should backoff from, time has # elapsed past the backoff range so there is no events to backoff from. - self.assertEqual(event_ids_to_backoff, []) + self.assertEqual(event_ids_with_backoff, {}) @attr.s(auto_attribs=True) From af0a3be4b09a20ed9cb58d89b7fdac7ec09d7893 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 29 Mar 2023 16:43:08 +0100 Subject: [PATCH 2/7] Fix comment placement --- synapse/handlers/federation.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 80156ef343aa..ff6cbc27257f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1967,9 +1967,6 @@ async def _sync_partial_state_room( # This avoids a cascade of backoff for all events in the DAG downstream from # one event backoff upstream. except FederationError as e: - # TODO: We should `record_event_failed_pull_attempt` here, - # see https://github.com/matrix-org/synapse/issues/13700 - if attempt == len(destinations) - 1: # We have tried every remote server for this event. Give up. # TODO(faster_joins) giving up isn't the right thing to do @@ -1986,6 +1983,8 @@ async def _sync_partial_state_room( destination, e, ) + # TODO: We should `record_event_failed_pull_attempt` here, + # see https://github.com/matrix-org/synapse/issues/13700 raise # Try the next remote server. From 0a85a43ab44ee72e96b5f3d45fc4d19448c3598f Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 29 Mar 2023 16:43:18 +0100 Subject: [PATCH 3/7] Fix docstring --- synapse/handlers/federation_event.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index a5482a880948..982c8d3b2ff1 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -1039,8 +1039,8 @@ async def _compute_event_context_with_maybe_missing_prevs( Raises: FederationPullAttemptBackoffError if we are are deliberately not attempting - to pull the given event over federation because we've already done so - recently and are backing off. + to pull one of the given event's `prev_event`s over federation because + we've already done so recently and are backing off. FederationError if we fail to get the state from the remote server after any missing `prev_event`s. """ From 412b09e9ca8e0e204f9de730c80151dc43183402 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 29 Mar 2023 16:43:27 +0100 Subject: [PATCH 4/7] Fix spinloop during partial state sync when a prev event is in backoff Signed-off-by: Sean Quah --- synapse/handlers/federation.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ff6cbc27257f..70cf507fbe58 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1949,23 +1949,22 @@ async def _sync_partial_state_room( ) for event in events: for attempt in itertools.count(): + # We try a new destination on every iteration. 
try: - await self._federation_event_handler.update_state_for_partial_state_event( - destination, event - ) + while True: + try: + await self._federation_event_handler.update_state_for_partial_state_event( + destination, event + ) + break + except FederationPullAttemptBackoffError as e: + # We are in the backoff period for one of the event's + # prev_events. Wait it out and try again after. + logger.warning("%s; waiting for %d ms...", e, e.retry_after_ms) + await self.clock.sleep(e.retry_after_ms / 1000) + + # Success, no need to try the rest of the destinations. break - except FederationPullAttemptBackoffError as exc: - # Log a warning about why we failed to process the event (the error message - # for `FederationPullAttemptBackoffError` is pretty good) - logger.warning("_sync_partial_state_room: %s", exc) - # We do not record a failed pull attempt when we backoff fetching a missing - # `prev_event` because not being able to fetch the `prev_events` just means - # we won't be able to de-outlier the pulled event. But we can still use an - # `outlier` in the state/auth chain for another event. So we shouldn't stop - # a downstream event from trying to pull it. - # - # This avoids a cascade of backoff for all events in the DAG downstream from - # one event backoff upstream. except FederationError as e: if attempt == len(destinations) - 1: # We have tried every remote server for this event. Give up. From ba1d144a74db7bc5439e2ce8b17bd9d710ff4b80 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 29 Mar 2023 16:46:26 +0100 Subject: [PATCH 5/7] Add newsfile --- changelog.d/15351.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15351.bugfix diff --git a/changelog.d/15351.bugfix b/changelog.d/15351.bugfix new file mode 100644 index 000000000000..e68023c6716f --- /dev/null +++ b/changelog.d/15351.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in Synapse 1.70.0 where the background sync from a faster join could spin for hours when one of the events involved had been marked for backoff. From 0a9df6ff9920b41955ef50a01606cd89ee62393e Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 29 Mar 2023 16:58:17 +0100 Subject: [PATCH 6/7] fixup: run formatter --- synapse/handlers/federation.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 70cf507fbe58..65461a078723 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1960,7 +1960,9 @@ async def _sync_partial_state_room( except FederationPullAttemptBackoffError as e: # We are in the backoff period for one of the event's # prev_events. Wait it out and try again after. - logger.warning("%s; waiting for %d ms...", e, e.retry_after_ms) + logger.warning( + "%s; waiting for %d ms...", e, e.retry_after_ms + ) await self.clock.sleep(e.retry_after_ms / 1000) # Success, no need to try the rest of the destinations. 
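
The control flow that patches 4 and 6 give `_sync_partial_state_room` is easier to follow outside the diff context. The following is a minimal, self-contained sketch of the same idea — wait out `retry_after_ms` and retry the same destination instead of spinning. The stub `update_state_for_partial_state_event`, the `_calls` counter, the example destinations, and the plain `Exception` standing in for `FederationError` are all invented for the demo rather than Synapse's actual code.

import asyncio
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)


class FederationPullAttemptBackoffError(RuntimeError):
    # Stand-in for the Synapse error; carries the remaining backoff interval.
    def __init__(self, message: str, retry_after_ms: int) -> None:
        super().__init__(message)
        self.retry_after_ms = retry_after_ms


_calls = {"n": 0}  # state for the stub: fail once, then succeed


async def update_state_for_partial_state_event(destination: str, event: str) -> None:
    # Placeholder for the real federation call, not Synapse's implementation.
    _calls["n"] += 1
    if _calls["n"] == 1:
        raise FederationPullAttemptBackoffError(
            f"prev_event of {event} is in backoff", retry_after_ms=50
        )


async def sync_event(event: str, destinations: list) -> None:
    for attempt, destination in enumerate(destinations):
        try:
            # If a prev_event is inside its backoff window, sleep out the
            # remaining interval and retry the *same* destination, rather than
            # looping straight back around (the old behaviour spun here).
            while True:
                try:
                    await update_state_for_partial_state_event(destination, event)
                    break
                except FederationPullAttemptBackoffError as e:
                    logger.warning("%s; waiting for %d ms...", e, e.retry_after_ms)
                    await asyncio.sleep(e.retry_after_ms / 1000)

            # Success, no need to try the rest of the destinations.
            break
        except Exception:  # stands in for Synapse's FederationError handling
            if attempt == len(destinations) - 1:
                raise  # every destination failed; give up on this event
            # otherwise fall through and try the next destination


asyncio.run(sync_event("$event_id", ["remote1.example.com", "remote2.example.com"]))
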
From 45c56a889093b878ea141ce1dcc17f73bb2ab38b Mon Sep 17 00:00:00 2001
From: Sean Quah
Date: Wed, 29 Mar 2023 16:58:27 +0100
Subject: [PATCH 7/7] fixup: use StrCollection

---
 synapse/api/errors.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index fa9ed04842c6..f2d6f9ab2d9e 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -19,7 +19,7 @@
 import typing
 from enum import Enum
 from http import HTTPStatus
-from typing import Any, Dict, Iterable, List, Optional, Union
+from typing import Any, Dict, List, Optional, Union

 from twisted.web import http

@@ -27,7 +27,7 @@
 if typing.TYPE_CHECKING:
     from synapse.config.homeserver import HomeServerConfig
-    from synapse.types import JsonDict
+    from synapse.types import JsonDict, StrCollection

 logger = logging.getLogger(__name__)

@@ -686,7 +686,7 @@ class FederationPullAttemptBackoffError(RuntimeError):
     """

     def __init__(
-        self, event_ids: Iterable[str], message: Optional[str], retry_after_ms: int
+        self, event_ids: "StrCollection", message: Optional[str], retry_after_ms: int
     ):
         event_ids = list(event_ids)
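
For reference, the backoff arithmetic behind the new `Dict[str, int]` return value of `get_event_ids_to_not_pull_from_backoff`, and the `retry_after_ms` the handler derives from it, can be sketched standalone as below. The constant names mirror the diff; the one-hour step is implied by the test's 2^1-hour expectation, while the eight-step doubling cap is an assumed value, not taken from the patch.

from typing import Dict, List, Tuple

# Constant names mirror the diff; the values here are assumptions for the example.
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS = 60 * 60 * 1000  # 1 hour
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS = 8  # assumed cap


def backoff_end_time(last_attempt_ts: int, num_attempts: int) -> int:
    # Exclusive timestamp at which the backoff window for an event ends:
    # 2^min(num_attempts, cap) steps after the last failed pull attempt.
    return last_attempt_ts + (
        2 ** min(num_attempts, BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS)
    ) * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS


def events_to_not_pull(
    failed_attempts: List[Tuple[str, int, int]], current_time: int
) -> Dict[str, int]:
    # Maps each event_id still in backoff to the end of its backoff window,
    # analogous to the patched get_event_ids_to_not_pull_from_backoff().
    result: Dict[str, int] = {}
    for event_id, last_attempt_ts, num_attempts in failed_attempts:
        end = backoff_end_time(last_attempt_ts, num_attempts)
        if current_time < end:  # `end` is exclusive
            result[event_id] = end
    return result


# One failed attempt at t=0 keeps the event in backoff for 2^1 hours, matching
# the expectation in the updated test.
now = 90 * 60 * 1000  # 90 minutes after the failure
backoff = events_to_not_pull([("$failed_event_id", 0, 1)], now)
# The handler turns the latest window end into the error's retry_after_ms.
retry_after_ms = max(backoff.values()) - now if backoff else 0
print(backoff, retry_after_ms)  # {'$failed_event_id': 7200000} 1800000
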