Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Fix-up assertions about last stream token in push #9020

Merged
merged 4 commits into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9020.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add type hints to push module.
2 changes: 1 addition & 1 deletion synapse/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class PusherConfig:
ts = attr.ib(type=int)
lang = attr.ib(type=Optional[str])
data = attr.ib(type=Optional[JsonDict])
last_stream_ordering = attr.ib(type=Optional[int])
last_stream_ordering = attr.ib(type=int)
last_success = attr.ib(type=Optional[int])
failing_since = attr.ib(type=Optional[int])

Expand Down
7 changes: 1 addition & 6 deletions synapse/push/emailpusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ async def _unsafe_process(self) -> None:
being run.
"""
start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
assert start is not None
unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email(
self.user_id, start, self.max_stream_ordering
)
Expand Down Expand Up @@ -220,12 +219,8 @@ async def _unsafe_process(self) -> None:
)

async def save_last_stream_ordering_and_success(
self, last_stream_ordering: Optional[int]
self, last_stream_ordering: int
) -> None:
if last_stream_ordering is None:
# This happens if we haven't yet processed anything
return

self.last_stream_ordering = last_stream_ordering
pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
Expand Down
2 changes: 0 additions & 2 deletions synapse/push/httppusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ async def _unsafe_process(self) -> None:
Never call this directly: use _process which will only allow this to
run once per pusher.
"""
assert self.last_stream_ordering is not None
unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http(
self.user_id, self.last_stream_ordering, self.max_stream_ordering
)
Expand Down Expand Up @@ -205,7 +204,6 @@ async def _unsafe_process(self) -> None:
http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"]
assert self.last_stream_ordering is not None
pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.pushkey,
Expand Down
10 changes: 5 additions & 5 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ async def add_pusher(

time_now_msec = self.clock.time_msec()

# create the pusher setting last_stream_ordering to the current maximum
# stream ordering, so it will process pushes from this point onwards.
last_stream_ordering = self.store.get_room_max_stream_ordering()

# we try to create the pusher just to validate the config: it
# will then get pulled out of the database,
# recreated, added and started: this means we have only one
Expand All @@ -124,16 +128,12 @@ async def add_pusher(
ts=time_now_msec,
lang=lang,
data=data,
last_stream_ordering=None,
last_stream_ordering=last_stream_ordering,
last_success=None,
failing_since=None,
)
)

# create the pusher setting last_stream_ordering to the current maximum
# stream ordering, so it will process pushes from this point onwards.
last_stream_ordering = self.store.get_room_max_stream_ordering()

await self.store.add_pusher(
user_id=user_id,
access_token=access_token,
Expand Down