@@ -413,6 +413,14 @@ def parse_stream_token(cls, string: str) -> "RoomStreamToken":
413
413
pass
414
414
raise SynapseError (400 , "Invalid token %r" % (string ,))
415
415
416
+ def copy_and_advance (self , other : "RoomStreamToken" ) -> "RoomStreamToken" :
417
+ if self .topological or other .topological :
418
+ raise Exception ("Can't advance topological tokens" )
419
+
420
+ max_stream = max (self .stream , other .stream )
421
+
422
+ return RoomStreamToken (None , max_stream )
423
+
416
424
def as_tuple (self ) -> Tuple [Optional [int ], int ]:
417
425
return (self .topological , self .stream )
418
426
@@ -462,13 +470,16 @@ def copy_and_advance(self, key, new_value) -> "StreamToken":
462
470
"""Advance the given key in the token to a new value if and only if the
463
471
new value is after the old value.
464
472
"""
465
- new_token = self .copy_and_replace (key , new_value )
466
473
if key == "room_key" :
467
- new_id = new_token .room_stream_id
468
- old_id = self .room_stream_id
469
- else :
470
- new_id = int (getattr (new_token , key ))
471
- old_id = int (getattr (self , key ))
474
+ new_token = self .copy_and_replace (
475
+ "room_key" , self .room_key .copy_and_advance (new_value )
476
+ )
477
+ return new_token
478
+
479
+ new_token = self .copy_and_replace (key , new_value )
480
+ new_id = int (getattr (new_token , key ))
481
+ old_id = int (getattr (self , key ))
482
+
472
483
if old_id < new_id :
473
484
return new_token
474
485
else :
0 commit comments