Skip to content

Commit 39cd77c

Browse files
committed
DatastoreStorage.read_blocks_by_seq: resume and continue if we lose ndb context
should fix https://console.cloud.google.com/errors/detail/CO2g4eLG_tOkZg;time=P1D;locations=global?project=bridgy-federated . I was hoping it'd also fix snarfed/bridgy-fed#1367 , but probably not, due to snarfed/bridgy-fed#1367 (comment)
1 parent 396016e commit 39cd77c

File tree

2 files changed

+41
-26
lines changed

2 files changed

+41
-26
lines changed

arroba/datastore_storage.py

+38-21
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import dag_cbor
1212
import dag_json
1313
from google.cloud import ndb
14-
from google.cloud.ndb.context import get_context
14+
from google.cloud.ndb import context
1515
from google.cloud.ndb.exceptions import ContextError
1616
from lexrpc import ValidationError
1717
from multiformats import CID, multicodec, multihash
@@ -410,9 +410,9 @@ def __init__(self, *, ndb_client=None):
410410
def ndb_context(fn):
411411
@wraps(fn)
412412
def decorated(self, *args, **kwargs):
413-
context = get_context(raise_context_error=False)
413+
ctx = context.get_context(raise_context_error=False)
414414

415-
with context.use() if context else self.ndb_client.context():
415+
with ctx.use() if ctx else self.ndb_client.context():
416416
ret = fn(self, *args, **kwargs)
417417

418418
return ret
@@ -514,24 +514,41 @@ def read_many(self, cids):
514514
def read_blocks_by_seq(self, start=0, repo=None):
515515
assert start >= 0
516516

517-
context = get_context(raise_context_error=False)
518-
519-
with context.use() if context else self.ndb_client.context() as cm:
520-
# lexrpc event subscription handlers like subscribeRepos call this
521-
# on a different thread, so if we're there, we need to create a new
522-
# ndb context
523-
try:
524-
query = AtpBlock.query(AtpBlock.seq >= start).order(AtpBlock.seq)
525-
if repo:
526-
query = query.filter(AtpBlock.repo == AtpRepo(id=repo).key)
527-
# unproven hypothesis: need strong consistency to make sure we
528-
# get all blocks for a given seq, including commit
529-
# https://console.cloud.google.com/errors/detail/CO2g4eLG_tOkZg;service=atproto-hub;time=P1D;refresh=true;locations=global?project=bridgy-federated
530-
for atp_block in query.iter(read_consistency=ndb.STRONG):
531-
yield atp_block.to_block()
532-
except ContextError as e:
533-
logging.warning(f'lost ndb context! client may have disconnected? "{e}"')
534-
return
517+
cur_seq = start
518+
cur_seq_cids = []
519+
520+
while True:
521+
ctx = context.get_context(raise_context_error=False)
522+
with ctx.use() if ctx else self.ndb_client.context():
523+
# lexrpc event subscription handlers like subscribeRepos call this
524+
# on a different thread, so if we're there, we need to create a new
525+
# ndb context
526+
try:
527+
query = AtpBlock.query(AtpBlock.seq >= cur_seq).order(AtpBlock.seq)
528+
if repo:
529+
query = query.filter(AtpBlock.repo == AtpRepo(id=repo).key)
530+
# unproven hypothesis: need strong consistency to make sure we
531+
# get all blocks for a given seq, including commit
532+
# https://console.cloud.google.com/errors/detail/CO2g4eLG_tOkZg;service=atproto-hub;time=P1D;refresh=true;locations=global?project=bridgy-federated
533+
for atp_block in query.iter(read_consistency=ndb.STRONG):
534+
if atp_block.seq != cur_seq:
535+
cur_seq = atp_block.seq
536+
cur_seq_cids = []
537+
if atp_block.key.id() not in cur_seq_cids:
538+
cur_seq_cids.append(atp_block.key.id())
539+
yield atp_block.to_block()
540+
541+
# finished cleanly
542+
break
543+
544+
except ContextError as e:
545+
logging.warning(f'lost ndb context! re-querying at {cur_seq}. {e}')
546+
# continue loop, restart query
547+
548+
# Context.use() resets this to the previous context when it exits,
549+
# but that context is bad now, so make sure we get a new one at the
550+
# top of the loop
551+
context._state.context = None
535552

536553
@ndb_context
537554
def has(self, cid):

arroba/tests/test_datastore_storage.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,7 @@ def test_read_blocks_by_seq_no_ndb_context(self):
223223
block = self.storage.write(repo_did='did:plc:123', obj={'foo': 2})
224224

225225
self.ndb_context.__exit__(None, None, None)
226-
self.assertEqual([block.cid],
227-
[b.cid for b in self.storage.read_blocks_by_seq()])
226+
self.assertEqual([block], list(self.storage.read_blocks_by_seq()))
228227

229228
def test_read_blocks_by_seq_ndb_context_closes_while_running(self):
230229
AtpSequence.allocate(SUBSCRIBE_REPOS_NSID)
@@ -234,10 +233,9 @@ def test_read_blocks_by_seq_ndb_context_closes_while_running(self):
234233
]
235234

236235
call = self.storage.read_blocks_by_seq()
237-
self.assertEqual(blocks[0].cid, next(call).cid)
238-
236+
self.assertEqual(blocks[0], next(call))
239237
self.ndb_context.__exit__(None, None, None)
240-
self.assertEqual([], list(call))
238+
self.assertEqual([blocks[1]], list(call))
241239

242240
def assert_same_seq(self, cids):
243241
"""

0 commit comments

Comments
 (0)