Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 7941372

Browse files
authored
Make token serializing/deserializing async (#8427)
The idea is that in future tokens will encode a mapping of instance to position. However, we don't want to include the full instance name in the string representation, so instead we'll have a mapping between instance name and an immutable integer ID in the DB that we can use instead. We'll then do the lookup when we serialize/deserialize the token (we could alternatively pass around an `Instance` type that includes both the name and ID, but that turns out to be a lot more invasive).
1 parent a0a1ba6 commit 7941372

File tree

17 files changed

+115
-59
lines changed

17 files changed

+115
-59
lines changed

changelog.d/8427.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Make stream token serializing/deserializing async.

synapse/handlers/events.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,8 @@ async def get_stream(
133133

134134
chunk = {
135135
"chunk": chunks,
136-
"start": tokens[0].to_string(),
137-
"end": tokens[1].to_string(),
136+
"start": await tokens[0].to_string(self.store),
137+
"end": await tokens[1].to_string(self.store),
138138
}
139139

140140
return chunk

synapse/handlers/initial_sync.py

+7-7
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,8 @@ async def handle_room(event: RoomsForUser):
203203
messages, time_now=time_now, as_client_event=as_client_event
204204
)
205205
),
206-
"start": start_token.to_string(),
207-
"end": end_token.to_string(),
206+
"start": await start_token.to_string(self.store),
207+
"end": await end_token.to_string(self.store),
208208
}
209209

210210
d["state"] = await self._event_serializer.serialize_events(
@@ -249,7 +249,7 @@ async def handle_room(event: RoomsForUser):
249249
],
250250
"account_data": account_data_events,
251251
"receipts": receipt,
252-
"end": now_token.to_string(),
252+
"end": await now_token.to_string(self.store),
253253
}
254254

255255
return ret
@@ -348,8 +348,8 @@ async def _room_initial_sync_parted(
348348
"chunk": (
349349
await self._event_serializer.serialize_events(messages, time_now)
350350
),
351-
"start": start_token.to_string(),
352-
"end": end_token.to_string(),
351+
"start": await start_token.to_string(self.store),
352+
"end": await end_token.to_string(self.store),
353353
},
354354
"state": (
355355
await self._event_serializer.serialize_events(
@@ -447,8 +447,8 @@ async def get_receipts():
447447
"chunk": (
448448
await self._event_serializer.serialize_events(messages, time_now)
449449
),
450-
"start": start_token.to_string(),
451-
"end": end_token.to_string(),
450+
"start": await start_token.to_string(self.store),
451+
"end": await end_token.to_string(self.store),
452452
},
453453
"state": state,
454454
"presence": presence,

synapse/handlers/pagination.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -413,8 +413,8 @@ async def get_messages(
413413
if not events:
414414
return {
415415
"chunk": [],
416-
"start": from_token.to_string(),
417-
"end": next_token.to_string(),
416+
"start": await from_token.to_string(self.store),
417+
"end": await next_token.to_string(self.store),
418418
}
419419

420420
state = None
@@ -442,8 +442,8 @@ async def get_messages(
442442
events, time_now, as_client_event=as_client_event
443443
)
444444
),
445-
"start": from_token.to_string(),
446-
"end": next_token.to_string(),
445+
"start": await from_token.to_string(self.store),
446+
"end": await next_token.to_string(self.store),
447447
}
448448

449449
if state:

synapse/handlers/room.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -1077,11 +1077,13 @@ def filter_evts(events):
10771077
# the token, which we replace.
10781078
token = StreamToken.START
10791079

1080-
results["start"] = token.copy_and_replace(
1080+
results["start"] = await token.copy_and_replace(
10811081
"room_key", results["start"]
1082-
).to_string()
1082+
).to_string(self.store)
10831083

1084-
results["end"] = token.copy_and_replace("room_key", results["end"]).to_string()
1084+
results["end"] = await token.copy_and_replace(
1085+
"room_key", results["end"]
1086+
).to_string(self.store)
10851087

10861088
return results
10871089

synapse/handlers/search.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -362,13 +362,13 @@ async def search(self, user, content, batch=None):
362362
self.storage, user.to_string(), res["events_after"]
363363
)
364364

365-
res["start"] = now_token.copy_and_replace(
365+
res["start"] = await now_token.copy_and_replace(
366366
"room_key", res["start"]
367-
).to_string()
367+
).to_string(self.store)
368368

369-
res["end"] = now_token.copy_and_replace(
369+
res["end"] = await now_token.copy_and_replace(
370370
"room_key", res["end"]
371-
).to_string()
371+
).to_string(self.store)
372372

373373
if include_profile:
374374
senders = {

synapse/rest/admin/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ async def on_POST(self, request, room_id, event_id):
110110
raise SynapseError(400, "Event is for wrong room.")
111111

112112
room_token = await self.store.get_topological_token_for_event(event_id)
113-
token = str(room_token)
113+
token = await room_token.to_string(self.store)
114114

115115
logger.info("[purge] purging up to token %s (event_id %s)", token, event_id)
116116
elif "purge_up_to_ts" in body:

synapse/rest/client/v1/events.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ def __init__(self, hs):
3333
super().__init__()
3434
self.event_stream_handler = hs.get_event_stream_handler()
3535
self.auth = hs.get_auth()
36+
self.store = hs.get_datastore()
3637

3738
async def on_GET(self, request):
3839
requester = await self.auth.get_user_by_req(request, allow_guest=True)
@@ -44,7 +45,7 @@ async def on_GET(self, request):
4445
if b"room_id" in request.args:
4546
room_id = request.args[b"room_id"][0].decode("ascii")
4647

47-
pagin_config = PaginationConfig.from_request(request)
48+
pagin_config = await PaginationConfig.from_request(self.store, request)
4849
timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS
4950
if b"timeout" in request.args:
5051
try:

synapse/rest/client/v1/initial_sync.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@ def __init__(self, hs):
2727
super().__init__()
2828
self.initial_sync_handler = hs.get_initial_sync_handler()
2929
self.auth = hs.get_auth()
30+
self.store = hs.get_datastore()
3031

3132
async def on_GET(self, request):
3233
requester = await self.auth.get_user_by_req(request)
3334
as_client_event = b"raw" not in request.args
34-
pagination_config = PaginationConfig.from_request(request)
35+
pagination_config = await PaginationConfig.from_request(self.store, request)
3536
include_archived = parse_boolean(request, "archived", default=False)
3637
content = await self.initial_sync_handler.snapshot_all_rooms(
3738
user_id=requester.user.to_string(),

synapse/rest/client/v1/room.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,7 @@ def __init__(self, hs):
451451
super().__init__()
452452
self.message_handler = hs.get_message_handler()
453453
self.auth = hs.get_auth()
454+
self.store = hs.get_datastore()
454455

455456
async def on_GET(self, request, room_id):
456457
# TODO support Pagination stream API (limit/tokens)
@@ -465,7 +466,7 @@ async def on_GET(self, request, room_id):
465466
if at_token_string is None:
466467
at_token = None
467468
else:
468-
at_token = StreamToken.from_string(at_token_string)
469+
at_token = await StreamToken.from_string(self.store, at_token_string)
469470

470471
# let you filter down on particular memberships.
471472
# XXX: this may not be the best shape for this API - we could pass in a filter
@@ -521,10 +522,13 @@ def __init__(self, hs):
521522
super().__init__()
522523
self.pagination_handler = hs.get_pagination_handler()
523524
self.auth = hs.get_auth()
525+
self.store = hs.get_datastore()
524526

525527
async def on_GET(self, request, room_id):
526528
requester = await self.auth.get_user_by_req(request, allow_guest=True)
527-
pagination_config = PaginationConfig.from_request(request, default_limit=10)
529+
pagination_config = await PaginationConfig.from_request(
530+
self.store, request, default_limit=10
531+
)
528532
as_client_event = b"raw" not in request.args
529533
filter_str = parse_string(request, b"filter", encoding="utf-8")
530534
if filter_str:
@@ -580,10 +584,11 @@ def __init__(self, hs):
580584
super().__init__()
581585
self.initial_sync_handler = hs.get_initial_sync_handler()
582586
self.auth = hs.get_auth()
587+
self.store = hs.get_datastore()
583588

584589
async def on_GET(self, request, room_id):
585590
requester = await self.auth.get_user_by_req(request, allow_guest=True)
586-
pagination_config = PaginationConfig.from_request(request)
591+
pagination_config = await PaginationConfig.from_request(self.store, request)
587592
content = await self.initial_sync_handler.room_initial_sync(
588593
room_id=room_id, requester=requester, pagin_config=pagination_config
589594
)

synapse/rest/client/v2_alpha/keys.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ def __init__(self, hs):
180180
super().__init__()
181181
self.auth = hs.get_auth()
182182
self.device_handler = hs.get_device_handler()
183+
self.store = hs.get_datastore()
183184

184185
async def on_GET(self, request):
185186
requester = await self.auth.get_user_by_req(request, allow_guest=True)
@@ -191,7 +192,7 @@ async def on_GET(self, request):
191192
# changes after the "to" as well as before.
192193
set_tag("to", parse_string(request, "to"))
193194

194-
from_token = StreamToken.from_string(from_token_string)
195+
from_token = await StreamToken.from_string(self.store, from_token_string)
195196

196197
user_id = requester.user.to_string()
197198

synapse/rest/client/v2_alpha/sync.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ def __init__(self, hs):
7777
super().__init__()
7878
self.hs = hs
7979
self.auth = hs.get_auth()
80+
self.store = hs.get_datastore()
8081
self.sync_handler = hs.get_sync_handler()
8182
self.clock = hs.get_clock()
8283
self.filtering = hs.get_filtering()
@@ -151,10 +152,9 @@ async def on_GET(self, request):
151152
device_id=device_id,
152153
)
153154

155+
since_token = None
154156
if since is not None:
155-
since_token = StreamToken.from_string(since)
156-
else:
157-
since_token = None
157+
since_token = await StreamToken.from_string(self.store, since)
158158

159159
# send any outstanding server notices to the user.
160160
await self._server_notices_sender.on_user_syncing(user.to_string())
@@ -236,7 +236,7 @@ async def encode_response(self, time_now, sync_result, access_token_id, filter):
236236
"leave": sync_result.groups.leave,
237237
},
238238
"device_one_time_keys_count": sync_result.device_one_time_keys_count,
239-
"next_batch": sync_result.next_batch.to_string(),
239+
"next_batch": await sync_result.next_batch.to_string(self.store),
240240
}
241241

242242
@staticmethod
@@ -413,7 +413,7 @@ def serialize(events):
413413
result = {
414414
"timeline": {
415415
"events": serialized_timeline,
416-
"prev_batch": room.timeline.prev_batch.to_string(),
416+
"prev_batch": await room.timeline.prev_batch.to_string(self.store),
417417
"limited": room.timeline.limited,
418418
},
419419
"state": {"events": serialized_state},

synapse/storage/databases/main/purge_events.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,17 @@ async def purge_history(
4242
The set of state groups that are referenced by deleted events.
4343
"""
4444

45+
parsed_token = await RoomStreamToken.parse(self, token)
46+
4547
return await self.db_pool.runInteraction(
4648
"purge_history",
4749
self._purge_history_txn,
4850
room_id,
49-
token,
51+
parsed_token,
5052
delete_local_events,
5153
)
5254

53-
def _purge_history_txn(self, txn, room_id, token_str, delete_local_events):
54-
token = RoomStreamToken.parse(token_str)
55-
55+
def _purge_history_txn(self, txn, room_id, token, delete_local_events):
5656
# Tables that should be pruned:
5757
# event_auth
5858
# event_backward_extremities

synapse/streams/config.py

+5-4
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
15-
1615
import logging
1716
from typing import Optional
1817

@@ -21,6 +20,7 @@
2120
from synapse.api.errors import SynapseError
2221
from synapse.http.servlet import parse_integer, parse_string
2322
from synapse.http.site import SynapseRequest
23+
from synapse.storage.databases.main import DataStore
2424
from synapse.types import StreamToken
2525

2626
logger = logging.getLogger(__name__)
@@ -39,8 +39,9 @@ class PaginationConfig:
3939
limit = attr.ib(type=Optional[int])
4040

4141
@classmethod
42-
def from_request(
42+
async def from_request(
4343
cls,
44+
store: "DataStore",
4445
request: SynapseRequest,
4546
raise_invalid_params: bool = True,
4647
default_limit: Optional[int] = None,
@@ -54,13 +55,13 @@ def from_request(
5455
if from_tok == "END":
5556
from_tok = None # For backwards compat.
5657
elif from_tok:
57-
from_tok = StreamToken.from_string(from_tok)
58+
from_tok = await StreamToken.from_string(store, from_tok)
5859
except Exception:
5960
raise SynapseError(400, "'from' parameter is invalid")
6061

6162
try:
6263
if to_tok:
63-
to_tok = StreamToken.from_string(to_tok)
64+
to_tok = await StreamToken.from_string(store, to_tok)
6465
except Exception:
6566
raise SynapseError(400, "'to' parameter is invalid")
6667

0 commit comments

Comments
 (0)