Skip to content

Commit c6da148

Browse files
committed
xrpc_sync.subscribeRepos: wait up to 60s for out-of-order sequence numbers
for #34
1 parent 73e36fb commit c6da148

File tree

3 files changed

+121
-27
lines changed

3 files changed

+121
-27
lines changed

arroba/tests/test_xrpc_sync.py

+68-14
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
"""Unit tests for xrpc_sync.py."""
2+
from datetime import timedelta
23
from io import BytesIO
34
from threading import Semaphore, Thread
5+
import time
46
from unittest import skip
57
from unittest.mock import patch
68

@@ -18,7 +20,7 @@
1820
from .. import server
1921
from ..storage import Action, Storage, SUBSCRIBE_REPOS_NSID
2022
from .. import util
21-
from ..util import dag_cbor_cid, int_to_tid, next_tid
23+
from ..util import dag_cbor_cid, int_to_tid, next_tid, tid_to_int
2224
from .. import xrpc_sync
2325

2426
from . import testutil
@@ -522,7 +524,7 @@ def test_get_blocks_repo_tombstoned(self):
522524
# with self.assertRaises(ValueError):
523525
# sync.loadCheckout(syncStorage, checkoutCar, repoDid, keypair.did())
524526

525-
# based atproto/packages/pds/tests/sync/list.test.ts
527+
# based on atproto/packages/pds/tests/sync/list.test.ts
526528
# def test_paginates_listed_hosted_repos(self):
527529
# full = xrpc_sync.list_repos({})
528530
# pt1 = xrpc_sync.list_repos({}, limit=2)
@@ -562,7 +564,8 @@ def subscribe(self, received, delivered=None, limit=None, cursor=None):
562564
return
563565

564566
def assertCommitMessage(self, commit_msg, record=None, write=None,
565-
repo=None, cur=None, prev=None, seq=None):
567+
repo=None, cur=None, prev=None, seq=None,
568+
check_commit=True):
566569
if not repo:
567570
repo = self.repo
568571
if not cur:
@@ -597,7 +600,7 @@ def assertCommitMessage(self, commit_msg, record=None, write=None,
597600
record_cid = dag_cbor_cid(record)
598601
mst_entry = {
599602
'e': [{
600-
'k': f'co.ll/{int_to_tid(util._tid_ts_last)}'.encode(),
603+
'k': f'co.ll/{write.rkey}'.encode(),
601604
'v': record_cid,
602605
'p': 0,
603606
't': None,
@@ -613,20 +616,21 @@ def assertCommitMessage(self, commit_msg, record=None, write=None,
613616
'l': None,
614617
}
615618

616-
commit_record = {
617-
'version': 3,
618-
'did': repo.did,
619-
'data': dag_cbor_cid(mst_entry),
620-
'rev': int_to_tid(seq, clock_id=0),
621-
'prev': prev,
622-
}
623-
624619
msg_records = [b.decoded for b in msg_blocks]
625620
# TODO: if I util.sign(commit_record), the sig doesn't match. why?
626621
for msg_record in msg_records:
627622
msg_record.pop('sig', None)
628623

629-
self.assertIn(commit_record, msg_records)
624+
if check_commit:
625+
commit_record = {
626+
'version': 3,
627+
'did': repo.did,
628+
'data': dag_cbor_cid(mst_entry),
629+
'rev': int_to_tid(seq, clock_id=0),
630+
'prev': prev,
631+
}
632+
self.assertIn(commit_record, msg_records)
633+
630634
if record:
631635
self.assertIn(record, msg_records)
632636

@@ -766,7 +770,7 @@ def test_include_preexisting_record_block(self, *_):
766770

767771
subscriber.join()
768772

769-
def test_tombstone(self, *_):
773+
def test_tombstoned(self, *_):
770774
# second repo: bob
771775
bob_repo = Repo.create(server.storage, 'did:bob',
772776
handle='bo.bb', signing_key=self.key)
@@ -826,6 +830,56 @@ def test_tombstone(self, *_):
826830
'time': testutil.NOW.isoformat(),
827831
}, payload)
828832

833+
@patch('arroba.xrpc_sync.NEW_EVENTS_TIMEOUT', timedelta(seconds=2))
834+
def test_subscribe_repos_skipped_seq(self, *_):
835+
# https://github.com/snarfed/arroba/issues/34
836+
received = []
837+
delivered = Semaphore(value=0)
838+
subscriber = Thread(target=self.subscribe,
839+
args=[received, delivered, 2])
840+
subscriber.start()
841+
842+
# prepare two writes with seqs 4 and 5
843+
write_4 = Write(Action.CREATE, 'co.ll', next_tid(), {'a': 'b'})
844+
commit_4 = Repo.format_commit(repo=self.repo, writes=[write_4])
845+
self.assertEqual(4, tid_to_int(commit_4.commit.decoded['rev']))
846+
847+
write_5 = Write(Action.CREATE, 'co.ll', next_tid(), {'x': 'y'})
848+
commit_5 = Repo.format_commit(repo=self.repo, writes=[write_5])
849+
self.assertEqual(5, tid_to_int(commit_5.commit.decoded['rev']))
850+
851+
prev = self.repo.head.cid
852+
853+
with self.assertLogs() as logs:
854+
# first write, skip seq 4, write with seq 5 instead
855+
self.repo.apply_commit(commit_5)
856+
head_5 = self.repo.head.cid
857+
858+
# there's a small chance that this could be flaky, if >.2s elapses
859+
# between starting the subscriber above and receiving the second
860+
# write below
861+
time.sleep(.1)
862+
863+
# shouldn't receive the event yet
864+
self.assertEqual(0, len(received))
865+
866+
# second write, use seq 4 that we skipped above
867+
self.repo.apply_commit(commit_4)
868+
869+
delivered.acquire()
870+
delivered.acquire()
871+
872+
self.assertIn('WARNING:arroba.xrpc_sync:Waiting for seq 4', logs.output)
873+
874+
# should receive both commits
875+
self.assertEqual(2, len(received))
876+
self.assertCommitMessage(received[0], {'a': 'b'}, write=write_4,
877+
cur=self.repo.head.cid, prev=prev, seq=4)
878+
self.assertCommitMessage(received[1], {'x': 'y'}, write=write_5,
879+
cur=head_5, prev=prev, seq=5, check_commit=False)
880+
881+
subscriber.join()
882+
829883

830884
class DatastoreXrpcSyncTest(XrpcSyncTest, testutil.DatastoreTest):
831885
STORAGE_CLS = DatastoreStorage

arroba/tests/testutil.py

+21
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
"""Common test utility code."""
2+
import contextlib
23
from datetime import datetime, timezone
34
import json
5+
import logging
46
import random
57
import os
68
import unittest
@@ -137,6 +139,25 @@ def random_keys_and_cids(num):
137139
def random_objects(num):
138140
return {next_tid(): {'foo': random.randint(1, 999999999)} for i in range(num)}
139141

142+
@contextlib.contextmanager
143+
def assertLogs(self):
144+
"""Wraps :meth:`unittest.TestCase.assertLogs` and enables/disables logs.
145+
146+
Copied from bridgy-fed/tests/testutil.py
147+
"""
148+
orig_disable_level = logging.root.manager.disable
149+
logging.disable(logging.NOTSET)
150+
151+
try:
152+
with super().assertLogs() as logs:
153+
yield logs
154+
finally:
155+
# emit logs that were captured
156+
for record in logs.records:
157+
if record.levelno >= orig_disable_level:
158+
logging.root.handle(record)
159+
logging.disable(orig_disable_level)
160+
140161

141162
class DatastoreTest(TestCase):
142163
ndb_client = ndb.Client(project='app', credentials=AnonymousCredentials())

arroba/xrpc_sync.py

+32-13
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55
import os
66
from threading import Condition
7+
import time
78

89
from carbox import car
910
import dag_cbor
@@ -127,13 +128,13 @@ def subscribe_repos(cursor=None):
127128
Returns:
128129
(dict, dict) tuple: (header, payload)
129130
"""
130-
last_seq = server.storage.last_seq(SUBSCRIBE_REPOS_NSID)
131+
cur_seq = server.storage.last_seq(SUBSCRIBE_REPOS_NSID)
131132

132133
def handle(event):
133-
nonlocal last_seq
134+
nonlocal cur_seq
134135

135136
if isinstance(event, dict): # non-commit event
136-
last_seq = event['seq']
137+
cur_seq = event['seq']
137138
type = event.pop('$type')
138139
type_fragment = type.removeprefix('com.atproto.sync.subscribeRepos')
139140
assert type_fragment != type, type
@@ -142,14 +143,14 @@ def handle(event):
142143
assert isinstance(event, CommitData), \
143144
f'unexpected event type {event.__class__} {event}'
144145

145-
last_seq = event.commit.seq
146+
cur_seq = event.commit.seq
146147
commit = event.commit.decoded
147148
car_blocks = [car.Block(cid=block.cid, data=block.encoded,
148149
decoded=block.decoded)
149150
for block in event.blocks.values()]
150151
return ({ # header
151-
'op': 1,
152-
't': '#commit',
152+
'op': 1,
153+
't': '#commit',
153154
}, { # payload
154155
'repo': commit['did'],
155156
'ops': [{
@@ -173,14 +174,14 @@ def handle(event):
173174

174175
if cursor is not None:
175176
# validate cursor
176-
if cursor > last_seq:
177-
msg = f'Cursor {cursor} is past our current sequence number {last_seq}'
177+
if cursor > cur_seq:
178+
msg = f'Cursor {cursor} is past our current sequence number {cur_seq}'
178179
logger.warning(msg)
179180
yield ({'op': -1}, {'error': 'FutureCursor', 'message': msg})
180181
return
181182

182183
if ROLLBACK_WINDOW is not None:
183-
rollback_start = max(last_seq - ROLLBACK_WINDOW - 1, 0)
184+
rollback_start = max(cur_seq - ROLLBACK_WINDOW - 1, 0)
184185
if cursor < rollback_start:
185186
logger.warning(f'Cursor {cursor} is before our rollback window; starting at {rollback_start}')
186187
yield ({'op': 1, 't': '#info'}, {'name': 'OutdatedCursor'})
@@ -190,14 +191,32 @@ def handle(event):
190191
for event in server.storage.read_events_by_seq(start=cursor):
191192
yield handle(event)
192193

193-
# serve new events as they happen
194+
# serve new events as they happen. if we see a sequence number skipped, wait
195+
# for it up to NEW_EVENTS_TIMEOUT before giving up on it and moving on
194196
logger.info(f'serving new events')
197+
timeout_s = NEW_EVENTS_TIMEOUT.total_seconds()
198+
last_yield = time.time()
199+
195200
while True:
196201
with new_events:
197-
new_events.wait(NEW_EVENTS_TIMEOUT.total_seconds())
202+
new_events.wait(timeout_s)
203+
204+
for commit_data in server.storage.read_events_by_seq(start=cur_seq + 1):
205+
last_seq = cur_seq
206+
event = handle(commit_data)
207+
208+
waited_enough = time.time() - last_yield > timeout_s
209+
if cur_seq == last_seq + 1 or waited_enough:
210+
if cur_seq > last_seq + 1:
211+
logger.warning(f'Gave up waiting for seqs {last_seq + 1} to {cur_seq - 1}!')
212+
213+
last_yield = time.time()
214+
yield event
215+
else:
216+
logger.warning(f'Waiting for seq {last_seq + 1}')
217+
cur_seq = last_seq
218+
break
198219

199-
for commit_data in server.storage.read_events_by_seq(start=last_seq + 1):
200-
yield handle(commit_data)
201220

202221

203222
@server.server.method('com.atproto.sync.getBlocks')

0 commit comments

Comments
 (0)