Skip to content

Commit 4097fe7

Browse files
committed
use ndb in-memory context cache for DID doc Objects
for #1149
1 parent 0f12c75 commit 4097fe7

8 files changed

+66
-28
lines changed

atproto_firehose.py

+6-7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from atproto import ATProto, Cursor
2525
from common import (
2626
add,
27+
cache_policy,
2728
create_task,
2829
global_cache,
2930
global_cache_timeout_policy,
@@ -67,7 +68,7 @@ def load_dids():
6768
def _load_dids():
6869
global atproto_dids, atproto_loaded_at, bridged_dids, bridged_loaded_at
6970

70-
with ndb_client.context(global_cache=global_cache,
71+
with ndb_client.context(cache_policy=cache_policy, global_cache=global_cache,
7172
global_cache_timeout_policy=global_cache_timeout_policy):
7273
if not DEBUG:
7374
Timer(STORE_CURSOR_FREQ.total_seconds(), _load_dids).start()
@@ -97,9 +98,8 @@ def subscriber():
9798
while True:
9899
try:
99100
with ndb_client.context(
100-
global_cache=global_cache,
101-
global_cache_timeout_policy=global_cache_timeout_policy,
102-
cache_policy=lambda key: False):
101+
cache_policy=cache_policy, global_cache=global_cache,
102+
global_cache_timeout_policy=global_cache_timeout_policy):
103103
subscribe()
104104

105105
logger.info(f'disconnected! waiting {RECONNECT_DELAY} and then reconnecting')
@@ -260,9 +260,8 @@ def handler():
260260
while True:
261261
try:
262262
with ndb_client.context(
263-
global_cache=global_cache,
264-
global_cache_timeout_policy=global_cache_timeout_policy,
265-
cache_policy=lambda key: False):
263+
cache_policy=cache_policy, global_cache=global_cache,
264+
global_cache_timeout_policy=global_cache_timeout_policy):
266265
handle()
267266

268267
# if we return cleanly, that means we hit the limit

atproto_hub.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
# all protocols
2121
import activitypub, atproto, web
2222
import atproto_firehose
23-
from common import global_cache, global_cache_timeout_policy, USER_AGENT
23+
from common import cache_policy, global_cache, global_cache_timeout_policy, USER_AGENT
2424
import models
2525

2626
# as of 2024-07-10
@@ -42,11 +42,11 @@
4242

4343
app.wsgi_app = flask_util.ndb_context_middleware(
4444
app.wsgi_app, client=appengine_config.ndb_client,
45+
# limited context-local cache. avoid full one due to this bug:
46+
# https://github.com/googleapis/python-ndb/issues/888
47+
cache_policy=cache_policy,
4548
global_cache=global_cache,
4649
global_cache_timeout_policy=global_cache_timeout_policy,
47-
# disable context-local cache due to this bug:
48-
# https://github.com/googleapis/python-ndb/issues/888
49-
cache_policy=lambda key: False)
5050

5151

5252
@app.get('/liveness_check')

common.py

+26-1
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,22 @@ def report_error(msg, *, exception=False, **kwargs):
386386
logger.warning(f'Failed to report error! {kwargs}', exc_info=exception)
387387

388388

389+
def cache_policy(key):
390+
"""In memory ndb cache, only DID docs right now.
391+
392+
https://github.com/snarfed/bridgy-fed/issues/1149#issuecomment-2261383697
393+
394+
Args:
395+
key (google.cloud.datastore.key.Key): not ``google.cloud.ndb.key.Key``!
396+
see https://github.com/googleapis/python-ndb/issues/987
397+
398+
Returns:
399+
bool: whether to cache this object
400+
"""
401+
logger.info(f'ndb-cache-key {key.__class__}')
402+
return key and key.kind == 'Object' and key.name.startswith('did:')
403+
404+
389405
PROFILE_ID_RE = re.compile(
390406
fr"""
391407
/users?/[^/]+$ |
@@ -395,7 +411,16 @@ def report_error(msg, *, exception=False, **kwargs):
395411
""", re.VERBOSE)
396412

397413
def global_cache_timeout_policy(key):
398-
"""Cache users and profile objects longer than other objects."""
414+
"""Cache users and profile objects longer than other objects.
415+
416+
Args:
417+
key (google.cloud.datastore.key.Key): not ``google.cloud.ndb.key.Key``!
418+
see https://github.com/googleapis/python-ndb/issues/987
419+
420+
Returns:
421+
int: cache expiration for this object, in seconds
422+
"""
423+
logger.info(f'ndb-cache-key {key.__class__}')
399424
if (key and
400425
(key.kind in ('ActivityPub', 'ATProto', 'Follower', 'MagicKey')
401426
or key.kind == 'Object' and PROFILE_ID_RE.search(key.name))):

flask_app.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
flask_util,
1717
)
1818

19-
from common import global_cache, global_cache_timeout_policy
19+
from common import cache_policy, global_cache, global_cache_timeout_policy
2020

2121
logger = logging.getLogger(__name__)
2222
# logging.getLogger('lexrpc').setLevel(logging.INFO)
@@ -43,11 +43,11 @@
4343

4444
app.wsgi_app = flask_util.ndb_context_middleware(
4545
app.wsgi_app, client=appengine_config.ndb_client,
46-
global_cache=global_cache,
47-
global_cache_timeout_policy=global_cache_timeout_policy,
48-
# disable context-local cache due to this bug:
46+
# limited context-local cache. avoid full one due to this bug:
4947
# https://github.com/googleapis/python-ndb/issues/888
50-
cache_policy=lambda key: False)
48+
cache_policy=cache_policy,
49+
global_cache=global_cache,
50+
global_cache_timeout_policy=global_cache_timeout_policy)
5151

5252
# deregister XRPC methods we don't support
5353
for nsid in (

router.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
# all protocols
1313
import activitypub, atproto, web
14-
from common import global_cache, global_cache_timeout_policy
14+
from common import cache_policy, global_cache, global_cache_timeout_policy
1515
import models
1616
import protocol
1717

@@ -24,11 +24,11 @@
2424

2525
app.wsgi_app = flask_util.ndb_context_middleware(
2626
app.wsgi_app, client=appengine_config.ndb_client,
27-
global_cache=global_cache,
28-
global_cache_timeout_policy=global_cache_timeout_policy,
29-
# disable context-local cache due to this bug:
27+
# limited context-local cache. avoid full one due to this bug:
3028
# https://github.com/googleapis/python-ndb/issues/888
31-
cache_policy=lambda key: False)
29+
cache_policy=cache_policy,
30+
global_cache=global_cache,
31+
global_cache_timeout_policy=global_cache_timeout_policy)
3232

3333
app.add_url_rule('/queue/poll-feed', view_func=web.poll_feed_task, methods=['POST'])
3434
app.add_url_rule('/queue/receive', view_func=protocol.receive_task, methods=['POST'])

tests/test_common.py

+15-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,21 @@ def test_host_url(self):
111111
with app.test_request_context(base_url='https://bsky.brid.gy', path='/foo'):
112112
self.assertEqual('https://bsky.brid.gy/asdf', common.host_url('asdf'))
113113

114-
def test_global_cache_policy(self):
114+
def test_cache_policy(self):
115+
for id in 'did:plc:foo', 'did:web:foo':
116+
self.assertTrue(common.cache_policy(Object(id=id).key._key))
117+
118+
for obj in (
119+
ATProto(id='alice'),
120+
ActivityPub(id='alice'),
121+
Web(id='alice'),
122+
Object(id='https://mastodon.social/users/alice'),
123+
Object(id='at://did:plc:user/app.bsky.actor.profile/self'),
124+
Follower(id='abc'),
125+
):
126+
self.assertFalse(common.cache_policy(obj.key._key))
127+
128+
def test_global_cache_timeout_policy(self):
115129
for good in (
116130
ATProto(id='alice'),
117131
ActivityPub(id='alice'),

tests/test_protocol.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -2604,9 +2604,9 @@ def send(*args, **kwargs):
26042604
def receive():
26052605
with app.test_request_context('/'), \
26062606
ndb_client.context(
2607-
global_cache=_InProcessGlobalCache(),
2608-
global_cache_timeout_policy=common.global_cache_timeout_policy,
2609-
cache_policy=lambda key: False):
2607+
cache_policy=common.cache_policy,
2608+
global_cache=_InProcessGlobalCache(),
2609+
global_cache_timeout_policy=common.global_cache_timeout_policy):
26102610
try:
26112611
Fake.receive_as1(note)
26122612
except NoContent: # raised by the second thread

tests/testutil.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -285,9 +285,9 @@ def setUp(self):
285285
# clear datastore
286286
requests.post(f'http://{ndb_client.host}/reset')
287287
self.ndb_context = ndb_client.context(
288+
cache_policy=common.cache_policy,
288289
global_cache=_InProcessGlobalCache(),
289-
global_cache_timeout_policy=global_cache_timeout_policy,
290-
cache_policy=lambda key: False)
290+
global_cache_timeout_policy=global_cache_timeout_policy)
291291
self.ndb_context.__enter__()
292292

293293
util.now = lambda **kwargs: testutil.NOW

0 commit comments

Comments
 (0)