From 4726c83bc8c15823f512c214620031bd1a6d30ed Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 29 Oct 2021 07:58:56 +0100 Subject: [PATCH 01/13] Add a linearizer on (appservice, stream) when handling ephemeral events. Excludes typing events which do not have their position tracked for performance reasons. --- synapse/handlers/appservice.py | 46 ++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 36c206dae6a0..e5670e352931 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -34,6 +34,7 @@ ) from synapse.storage.databases.main.directory import RoomAliasMapping from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID +from synapse.util.async_helpers import Linearizer from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -58,6 +59,8 @@ def __init__(self, hs: "HomeServer"): self.current_max = 0 self.is_processing = False + self._ephemeral_events_linearizer = Linearizer(name="ephemeral_events") + def notify_interested_services(self, max_token: RoomStreamToken) -> None: """Notifies (pushes) all application services interested in this event. @@ -248,26 +251,37 @@ async def _notify_interested_services_ephemeral( events = await self._handle_typing(service, new_token) if events: self.scheduler.submit_ephemeral_events_for_as(service, events) + continue - elif stream_key == "receipt_key": - events = await self._handle_receipts(service) - if events: - self.scheduler.submit_ephemeral_events_for_as(service, events) - - # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( - service, "read_receipt", new_token + # Since we read/update the stream position for this AS/stream + with ( + await self._ephemeral_events_linearizer.queue( + (service.id, stream_key) ) + ): + if stream_key == "receipt_key": + events = await self._handle_receipts(service) + if events: + self.scheduler.submit_ephemeral_events_for_as( + service, events + ) + + # Persist the latest handled stream token for this appservice + await self.store.set_type_stream_id_for_appservice( + service, "read_receipt", new_token + ) - elif stream_key == "presence_key": - events = await self._handle_presence(service, users) - if events: - self.scheduler.submit_ephemeral_events_for_as(service, events) + elif stream_key == "presence_key": + events = await self._handle_presence(service, users) + if events: + self.scheduler.submit_ephemeral_events_for_as( + service, events + ) - # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( - service, "presence", new_token - ) + # Persist the latest handled stream token for this appservice + await self.store.set_type_stream_id_for_appservice( + service, "presence", new_token + ) async def _handle_typing( self, service: ApplicationService, new_token: int From c25bc9920f46ca727eb7ad0596b1f97f562f2a03 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 29 Oct 2021 08:04:44 +0100 Subject: [PATCH 02/13] Add changelog entry. --- changelog.d/11207.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/11207.bugfix diff --git a/changelog.d/11207.bugfix b/changelog.d/11207.bugfix new file mode 100644 index 000000000000..ed20ba440e05 --- /dev/null +++ b/changelog.d/11207.bugfix @@ -0,0 +1 @@ +Linearize appservice ephemeral event handling, fixing serialization errors. Contributed by @Fizzadar at Beeper. From 7959d0e80de15dac2936a10ae1fd6a1dad33885a Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 29 Oct 2021 13:08:30 +0100 Subject: [PATCH 03/13] Update name of linearizer for as ephemeral events. Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- synapse/handlers/appservice.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index e5670e352931..7bd78bf24387 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -59,7 +59,7 @@ def __init__(self, hs: "HomeServer"): self.current_max = 0 self.is_processing = False - self._ephemeral_events_linearizer = Linearizer(name="ephemeral_events") + self._ephemeral_events_linearizer = Linearizer(name="appservice_ephemeral_events") def notify_interested_services(self, max_token: RoomStreamToken) -> None: """Notifies (pushes) all application services interested in this event. From 1d9ea275f524fe0f81b5dba818bd2b29335cce76 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 29 Oct 2021 13:09:29 +0100 Subject: [PATCH 04/13] Update changelog language. Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- changelog.d/11207.bugfix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/11207.bugfix b/changelog.d/11207.bugfix index ed20ba440e05..7e98d565a14f 100644 --- a/changelog.d/11207.bugfix +++ b/changelog.d/11207.bugfix @@ -1 +1 @@ -Linearize appservice ephemeral event handling, fixing serialization errors. Contributed by @Fizzadar at Beeper. +Fix a long-standing bug which could result in serialization errors and potentially duplicate transaction data when sending ephemeral events to application services. Contributed by @Fizzadar at Beeper. From f8f3c4ad1b5efb696a33287f650f5a4450e393dd Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 29 Oct 2021 13:27:12 +0100 Subject: [PATCH 05/13] Reject handling tokens lower than the currently stored position. --- synapse/handlers/appservice.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 7bd78bf24387..e24d5c5cd369 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -59,7 +59,9 @@ def __init__(self, hs: "HomeServer"): self.current_max = 0 self.is_processing = False - self._ephemeral_events_linearizer = Linearizer(name="appservice_ephemeral_events") + self._ephemeral_events_linearizer = Linearizer( + name="appservice_ephemeral_events" + ) def notify_interested_services(self, max_token: RoomStreamToken) -> None: """Notifies (pushes) all application services interested in this event. @@ -260,7 +262,7 @@ async def _notify_interested_services_ephemeral( ) ): if stream_key == "receipt_key": - events = await self._handle_receipts(service) + events = await self._handle_receipts(service, new_token) if events: self.scheduler.submit_ephemeral_events_for_as( service, events @@ -272,7 +274,7 @@ async def _notify_interested_services_ephemeral( ) elif stream_key == "presence_key": - events = await self._handle_presence(service, users) + events = await self._handle_presence(service, users, new_token) if events: self.scheduler.submit_ephemeral_events_for_as( service, events @@ -318,7 +320,9 @@ async def _handle_typing( ) return typing - async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]: + async def _handle_receipts( + self, service: ApplicationService, new_token: Optional[int] + ) -> List[JsonDict]: """ Return the latest read receipts that the given application service should receive. @@ -337,6 +341,9 @@ async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]: from_key = await self.store.get_type_stream_id_for_appservice( service, "read_receipt" ) + if new_token is not None and new_token <= from_key: + raise Exception("Rejecting token lower than stored: %s" % (new_token,)) + receipts_source = self.event_sources.sources.receipt receipts, _ = await receipts_source.get_new_events_as( service=service, from_key=from_key @@ -344,7 +351,10 @@ async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]: return receipts async def _handle_presence( - self, service: ApplicationService, users: Collection[Union[str, UserID]] + self, + service: ApplicationService, + users: Collection[Union[str, UserID]], + new_token: Optional[int], ) -> List[JsonDict]: """ Return the latest presence updates that the given application service should receive. @@ -367,6 +377,9 @@ async def _handle_presence( from_key = await self.store.get_type_stream_id_for_appservice( service, "presence" ) + if new_token is not None and new_token <= from_key: + raise Exception("Rejecting token lower than stored: %s" % (new_token,)) + for user in users: if isinstance(user, str): user = UserID.from_string(user) From 1f2b14c9e83d0f6252e15882db6b2f254b0c0f0b Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 29 Oct 2021 15:32:57 +0100 Subject: [PATCH 06/13] Add tests for appservice ephemeral notifications. --- tests/handlers/test_appservice.py | 33 +++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 43998020b2eb..093a46c2219a 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -40,6 +40,7 @@ def setUp(self): hs.get_application_service_scheduler.return_value = self.mock_scheduler hs.get_clock.return_value = MockClock() self.handler = ApplicationServicesHandler(hs) + self.event_source = hs.get_event_sources() def test_notify_interested_services(self): interested_service = self._mkservice(is_interested=True) @@ -252,6 +253,38 @@ async def get_3pe_protocol(service, unusedProtocol): }, ) + def test_notify_interested_services_ephemeral(self): + interested_service = self._mkservice(is_interested=True) + services = [interested_service] + + self.mock_store.get_app_services.return_value = services + self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(579) + + event = Mock(event_id="event_1") + self.event_source.sources.receipt.get_new_events_as.return_value = make_awaitable(([event], None)) + + self.handler.notify_interested_services_ephemeral("receipt_key", 580) + self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with( + interested_service, [event] + ) + self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with( + interested_service, "read_receipt", 580, + ) + + def test_notify_interested_services_ephemeral_out_of_order(self): + interested_service = self._mkservice(is_interested=True) + services = [interested_service] + + self.mock_store.get_app_services.return_value = services + self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(580) + + event = Mock(event_id="event_1") + self.event_source.sources.receipt.get_new_events_as.return_value = make_awaitable(([event], None)) + + self.handler.notify_interested_services_ephemeral("receipt_key", 579) + self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called() + self.mock_store.set_type_stream_id_for_appservice.assert_not_called() + def _mkservice(self, is_interested, protocols=None): service = Mock() service.is_interested.return_value = make_awaitable(is_interested) From 94458831f69e3c68dc575501c52c238812f26174 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 29 Oct 2021 15:40:10 +0100 Subject: [PATCH 07/13] Lint `test_appservice.py`. --- tests/handlers/test_appservice.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 093a46c2219a..731267d0db60 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -258,17 +258,23 @@ def test_notify_interested_services_ephemeral(self): services = [interested_service] self.mock_store.get_app_services.return_value = services - self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(579) + self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable( + 579 + ) event = Mock(event_id="event_1") - self.event_source.sources.receipt.get_new_events_as.return_value = make_awaitable(([event], None)) + self.event_source.sources.receipt.get_new_events_as.return_value = ( + make_awaitable(([event], None)) + ) self.handler.notify_interested_services_ephemeral("receipt_key", 580) self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with( interested_service, [event] ) self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with( - interested_service, "read_receipt", 580, + interested_service, + "read_receipt", + 580, ) def test_notify_interested_services_ephemeral_out_of_order(self): @@ -276,10 +282,14 @@ def test_notify_interested_services_ephemeral_out_of_order(self): services = [interested_service] self.mock_store.get_app_services.return_value = services - self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(580) + self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable( + 580 + ) event = Mock(event_id="event_1") - self.event_source.sources.receipt.get_new_events_as.return_value = make_awaitable(([event], None)) + self.event_source.sources.receipt.get_new_events_as.return_value = ( + make_awaitable(([event], None)) + ) self.handler.notify_interested_services_ephemeral("receipt_key", 579) self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called() From bc991c2c049d2525c3f6f032f767fdd9404138c2 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 2 Nov 2021 12:55:24 +0000 Subject: [PATCH 08/13] Replace exception with log & empty return when stream tokens invalid. --- synapse/handlers/appservice.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index e24d5c5cd369..99ca0a3f55b7 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -268,10 +268,10 @@ async def _notify_interested_services_ephemeral( service, events ) - # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( - service, "read_receipt", new_token - ) + # Persist the latest handled stream token for this appservice + await self.store.set_type_stream_id_for_appservice( + service, "read_receipt", new_token + ) elif stream_key == "presence_key": events = await self._handle_presence(service, users, new_token) @@ -280,10 +280,10 @@ async def _notify_interested_services_ephemeral( service, events ) - # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( - service, "presence", new_token - ) + # Persist the latest handled stream token for this appservice + await self.store.set_type_stream_id_for_appservice( + service, "presence", new_token + ) async def _handle_typing( self, service: ApplicationService, new_token: int @@ -342,7 +342,8 @@ async def _handle_receipts( service, "read_receipt" ) if new_token is not None and new_token <= from_key: - raise Exception("Rejecting token lower than stored: %s" % (new_token,)) + logger.debug("Rejecting token lower than stored: %s" % (new_token,)) + return [] receipts_source = self.event_sources.sources.receipt receipts, _ = await receipts_source.get_new_events_as( @@ -378,7 +379,8 @@ async def _handle_presence( service, "presence" ) if new_token is not None and new_token <= from_key: - raise Exception("Rejecting token lower than stored: %s" % (new_token,)) + logger.debug("Rejecting token lower than stored: %s" % (new_token,)) + return [] for user in users: if isinstance(user, str): From 451c31f67a5e37da3d5afdfabe618f50e08edcab Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 2 Nov 2021 13:03:19 +0000 Subject: [PATCH 09/13] Add docstrings to ephemeral appservice tests. --- tests/handlers/test_appservice.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 731267d0db60..65bd813f2ab0 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -254,6 +254,11 @@ async def get_3pe_protocol(service, unusedProtocol): ) def test_notify_interested_services_ephemeral(self): + """ + Test sending ephemeral events to the appservice handler are scheduled + to be pushed out to interested appservices, and that the stream ID is + updated accordingly. + """ interested_service = self._mkservice(is_interested=True) services = [interested_service] @@ -278,6 +283,10 @@ def test_notify_interested_services_ephemeral(self): ) def test_notify_interested_services_ephemeral_out_of_order(self): + """ + Test sending out of order ephemeral events to the appservice handler + are ignored. + """ interested_service = self._mkservice(is_interested=True) services = [interested_service] From 88d2d5c0e490e597aa0cbde9e7c2a36dd068ffaf Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Wed, 3 Nov 2021 15:14:01 +0000 Subject: [PATCH 10/13] Always set the stream position, even when there are no events. --- synapse/handlers/appservice.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 99ca0a3f55b7..4c50af6bc322 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -268,10 +268,10 @@ async def _notify_interested_services_ephemeral( service, events ) - # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( - service, "read_receipt", new_token - ) + # Persist the latest handled stream token for this appservice + await self.store.set_type_stream_id_for_appservice( + service, "read_receipt", new_token + ) elif stream_key == "presence_key": events = await self._handle_presence(service, users, new_token) @@ -280,10 +280,10 @@ async def _notify_interested_services_ephemeral( service, events ) - # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( - service, "presence", new_token - ) + # Persist the latest handled stream token for this appservice + await self.store.set_type_stream_id_for_appservice( + service, "presence", new_token + ) async def _handle_typing( self, service: ApplicationService, new_token: int From 2d924c57c588d9fabd039efaed744052efd4f044 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Wed, 3 Nov 2021 15:30:29 +0000 Subject: [PATCH 11/13] Remove check that set stream has not been called. --- tests/handlers/test_appservice.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 65bd813f2ab0..1f6a924452ad 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -302,7 +302,6 @@ def test_notify_interested_services_ephemeral_out_of_order(self): self.handler.notify_interested_services_ephemeral("receipt_key", 579) self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called() - self.mock_store.set_type_stream_id_for_appservice.assert_not_called() def _mkservice(self, is_interested, protocols=None): service = Mock() From e76e59ccf59eda761ee488e30857592dab1be16f Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Wed, 3 Nov 2021 16:09:26 +0000 Subject: [PATCH 12/13] Update debug log line --- synapse/handlers/appservice.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 4c50af6bc322..ce5d4657cf96 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -342,7 +342,7 @@ async def _handle_receipts( service, "read_receipt" ) if new_token is not None and new_token <= from_key: - logger.debug("Rejecting token lower than stored: %s" % (new_token,)) + logger.debug("Rejecting token lower than or equal to stored: %s" % (new_token,)) return [] receipts_source = self.event_sources.sources.receipt @@ -379,7 +379,7 @@ async def _handle_presence( service, "presence" ) if new_token is not None and new_token <= from_key: - logger.debug("Rejecting token lower than stored: %s" % (new_token,)) + logger.debug("Rejecting token lower than or equal to stored: %s" % (new_token,)) return [] for user in users: From 36ecd707d8f931666af720dd39548433d3bcd462 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 3 Nov 2021 16:23:10 +0000 Subject: [PATCH 13/13] lint --- synapse/handlers/appservice.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index ce5d4657cf96..4308e5e5cfcd 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -342,7 +342,9 @@ async def _handle_receipts( service, "read_receipt" ) if new_token is not None and new_token <= from_key: - logger.debug("Rejecting token lower than or equal to stored: %s" % (new_token,)) + logger.debug( + "Rejecting token lower than or equal to stored: %s" % (new_token,) + ) return [] receipts_source = self.event_sources.sources.receipt @@ -379,7 +381,9 @@ async def _handle_presence( service, "presence" ) if new_token is not None and new_token <= from_key: - logger.debug("Rejecting token lower than or equal to stored: %s" % (new_token,)) + logger.debug( + "Rejecting token lower than or equal to stored: %s" % (new_token,) + ) return [] for user in users: