@@ -224,6 +224,10 @@ def __init__(
224
224
# should be less than the minimum of this set (if not empty).
225
225
self ._unfinished_ids = set () # type: Set[int]
226
226
227
+ # Set of local IDs that we've processed that are larger than the current
228
+ # position, due to there being smaller unpersisted IDs.
229
+ self ._finished_ids = set () # type: Set[int]
230
+
227
231
# We track the max position where we know everything before has been
228
232
# persisted. This is done by a) looking at the min across all instances
229
233
# and b) noting that if we have seen a run of persisted positions
@@ -348,17 +352,44 @@ def get_next_txn(self, txn: LoggingTransaction):
348
352
349
353
def _mark_id_as_finished (self , next_id : int ):
350
354
"""The ID has finished being processed so we should advance the
351
- current poistion if possible.
355
+ current position if possible.
352
356
"""
353
357
354
358
with self ._lock :
355
359
self ._unfinished_ids .discard (next_id )
360
+ self ._finished_ids .add (next_id )
361
+
362
+ new_cur = None
363
+
364
+ if self ._unfinished_ids :
365
+ # If there are unfinished IDs then the new position will be the
366
+ # largest finished ID less than the minimum unfinished ID.
367
+
368
+ finished = set ()
369
+
370
+ min_unfinshed = min (self ._unfinished_ids )
371
+ for s in self ._finished_ids :
372
+ if s < min_unfinshed :
373
+ if new_cur is None or new_cur < s :
374
+ new_cur = s
375
+ else :
376
+ finished .add (s )
377
+
378
+ # We clear these out since they're now all less than the new
379
+ # position.
380
+ self ._finished_ids = finished
381
+ else :
382
+ # There are no unfinished IDs so the new position is simply the
383
+ # largest finished one.
384
+ new_cur = max (self ._finished_ids )
385
+
386
+ # We clear these out since they're now all less than the new
387
+ # position.
388
+ self ._finished_ids .clear ()
356
389
357
- # Figure out if its safe to advance the position by checking there
358
- # aren't any lower allocated IDs that are yet to finish.
359
- if all (c > next_id for c in self ._unfinished_ids ):
390
+ if new_cur :
360
391
curr = self ._current_positions .get (self ._instance_name , 0 )
361
- self ._current_positions [self ._instance_name ] = max (curr , next_id )
392
+ self ._current_positions [self ._instance_name ] = max (curr , new_cur )
362
393
363
394
self ._add_persisted_position (next_id )
364
395
@@ -428,7 +459,7 @@ def _add_persisted_position(self, new_id: int):
428
459
# We move the current min position up if the minimum current positions
429
460
# of all instances is higher (since by definition all positions less
430
461
# than that have been persisted).
431
- min_curr = min (self ._current_positions .values ())
462
+ min_curr = min (self ._current_positions .values (), default = 0 )
432
463
self ._persisted_upto_position = max (min_curr , self ._persisted_upto_position )
433
464
434
465
# We now iterate through the seen positions, discarding those that are
0 commit comments