Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Add a linearizer on (appservice, stream) when handling ephemeral events. #11207

Merged
1 change: 1 addition & 0 deletions changelog.d/11207.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Linearize appservice ephemeral event handling, fixing serialization errors. Contributed by @Fizzadar at Beeper.
46 changes: 30 additions & 16 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
)
from synapse.storage.databases.main.directory import RoomAliasMapping
from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.metrics import Measure

if TYPE_CHECKING:
Expand All @@ -58,6 +59,8 @@ def __init__(self, hs: "HomeServer"):
self.current_max = 0
self.is_processing = False

self._ephemeral_events_linearizer = Linearizer(name="ephemeral_events")

def notify_interested_services(self, max_token: RoomStreamToken) -> None:
"""Notifies (pushes) all application services interested in this event.

Expand Down Expand Up @@ -248,26 +251,37 @@ async def _notify_interested_services_ephemeral(
events = await self._handle_typing(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
continue

elif stream_key == "receipt_key":
events = await self._handle_receipts(service)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
# Since we read/update the stream position for this AS/stream
with (
await self._ephemeral_events_linearizer.queue(
(service.id, stream_key)
)
):
if stream_key == "receipt_key":
events = await self._handle_receipts(service)
if events:
self.scheduler.submit_ephemeral_events_for_as(
service, events
)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
)
Comment on lines +272 to +274
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we're sure that we're processing events in order, we should probably add a conditional that bails out early if new_token is less than or equal to the stored stream token for the appservice/stream ID combo.

This should probably be done right after we call get_type_stream_id_for_appservice in _handle_receipts and _handle_presence.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented in f8f3c4a. I also attempted to de-dupe some of the logic here but not sure it's any better as the handle functions take different arguments; pushed to Fizzadar@8e5e670, I can pull that in if desired.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a very useful refactor, I'd love to pull it in. However, let's include it in a separate PR if you don't mind - as it will make the overall diff a bit messier.


elif stream_key == "presence_key":
events = await self._handle_presence(service, users)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
elif stream_key == "presence_key":
events = await self._handle_presence(service, users)
if events:
self.scheduler.submit_ephemeral_events_for_as(
service, events
)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)
# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)

async def _handle_typing(
self, service: ApplicationService, new_token: int
Expand Down