Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 1f42697

Browse files
committed
Push some deferred wrangling down into DeferredCache
1 parent 7b71695 commit 1f42697

File tree

5 files changed

+67
-46
lines changed

5 files changed

+67
-46
lines changed

changelog.d/8572.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Modify `DeferredCache.get()` to return `Deferred`s instead of `ObservableDeferred`s.

synapse/util/caches/deferred_cache.py

+48-9
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class DeferredCache(Generic[KT, VT]):
5757
"""Wraps an LruCache, adding support for Deferred results.
5858
5959
It expects that each entry added with set() will be a Deferred; likewise get()
60-
may return an ObservableDeferred.
60+
will return a Deferred.
6161
"""
6262

6363
__slots__ = (
@@ -130,16 +130,22 @@ def get(
130130
key: KT,
131131
callback: Optional[Callable[[], None]] = None,
132132
update_metrics: bool = True,
133-
) -> Union[ObservableDeferred, VT]:
133+
) -> defer.Deferred:
134134
"""Looks the key up in the caches.
135135
136+
For symmetry with set(), this method does *not* follow the synapse logcontext
137+
rules: the logcontext will not be cleared on return, and the Deferred will run
138+
its callbacks in the sentinel context. In other words: wrap the result with
139+
make_deferred_yieldable() before `await`ing it.
140+
136141
Args:
137-
key(tuple)
138-
callback(fn): Gets called when the entry in the cache is invalidated
142+
key:
143+
callback: Gets called when the entry in the cache is invalidated
139144
update_metrics (bool): whether to update the cache hit rate metrics
140145
141146
Returns:
142-
Either an ObservableDeferred or the result itself
147+
A Deferred which completes with the result. Note that this may later fail
148+
if there is an ongoing set() operation which later completes with a failure.
143149
144150
Raises:
145151
KeyError if the key is not found in the cache
@@ -152,15 +158,15 @@ def get(
152158
m = self.cache.metrics
153159
assert m # we always have a name, so should always have metrics
154160
m.inc_hits()
155-
return val.deferred
161+
return val.deferred.observe()
156162

157163
val2 = self.cache.get(
158164
key, _Sentinel.sentinel, callbacks=callbacks, update_metrics=update_metrics
159165
)
160166
if val2 is _Sentinel.sentinel:
161167
raise KeyError()
162168
else:
163-
return val2
169+
return defer.succeed(val2)
164170

165171
def get_immediate(
166172
self, key: KT, default: T, update_metrics: bool = True
@@ -173,7 +179,36 @@ def set(
173179
key: KT,
174180
value: defer.Deferred,
175181
callback: Optional[Callable[[], None]] = None,
176-
) -> ObservableDeferred:
182+
) -> defer.Deferred:
183+
"""Adds a new entry to the cache (or updates an existing one).
184+
185+
The given `value` *must* be a Deferred.
186+
187+
First any existing entry for the same key is invalidated. Then a new entry
188+
is added to the cache for the given key.
189+
190+
Until the `value` completes, calls to `get()` for the key will also result in an
191+
incomplete Deferred, which will ultimately complete with the same result as
192+
`value`.
193+
194+
If `value` completes successfully, subsequent calls to `get()` will then return
195+
a completed deferred with the same result. If it *fails*, the cache is
196+
invalidated and subequent calls to `get()` will raise a KeyError.
197+
198+
If another call to `set()` happens before `value` completes, then (a) any
199+
invalidation callbacks registered in the interim will be called, (b) any
200+
`get()`s in the interim will continue to complete with the result from the
201+
*original* `value`, (c) any future calls to `get()` will complete with the
202+
result from the *new* `value`.
203+
204+
It is expected that `value` does *not* follow the synapse logcontext rules - ie,
205+
if it is incomplete, it runs its callbacks in the sentinel context.
206+
207+
Args:
208+
key: Key to be set
209+
value: a deferred which will complete with a result to add to the cache
210+
callback: An optional callback to be called when the entry is invalidated
211+
"""
177212
if not isinstance(value, defer.Deferred):
178213
raise TypeError("not a Deferred")
179214

@@ -187,6 +222,8 @@ def set(
187222
if existing_entry:
188223
existing_entry.invalidate()
189224

225+
# XXX: why don't we invalidate the entry in `self.cache` yet?
226+
190227
self._pending_deferred_cache[key] = entry
191228

192229
def compare_and_pop():
@@ -230,7 +267,9 @@ def eb(_fail):
230267
# _pending_deferred_cache to the real cache.
231268
#
232269
observer.addCallbacks(cb, eb)
233-
return observable
270+
271+
# we return a new Deferred which will be called before any subsequent observers.
272+
return observable.observe()
234273

235274
def prefill(self, key: KT, value: VT, callback: Callable[[], None] = None):
236275
callbacks = [callback] if callback else []

synapse/util/caches/descriptors.py

+7-25
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
from synapse.logging.context import make_deferred_yieldable, preserve_fn
2525
from synapse.util import unwrapFirstError
26-
from synapse.util.async_helpers import ObservableDeferred
2726
from synapse.util.caches.deferred_cache import DeferredCache
2827

2928
logger = logging.getLogger(__name__)
@@ -156,7 +155,7 @@ def __get__(self, obj, owner):
156155
keylen=self.num_args,
157156
tree=self.tree,
158157
iterable=self.iterable,
159-
) # type: DeferredCache[Tuple, Any]
158+
) # type: DeferredCache[CacheKey, Any]
160159

161160
def get_cache_key_gen(args, kwargs):
162161
"""Given some args/kwargs return a generator that resolves into
@@ -208,26 +207,12 @@ def _wrapped(*args, **kwargs):
208207
kwargs["cache_context"] = _CacheContext.get_instance(cache, cache_key)
209208

210209
try:
211-
cached_result_d = cache.get(cache_key, callback=invalidate_callback)
212-
213-
if isinstance(cached_result_d, ObservableDeferred):
214-
observer = cached_result_d.observe()
215-
else:
216-
observer = defer.succeed(cached_result_d)
217-
210+
ret = cache.get(cache_key, callback=invalidate_callback)
218211
except KeyError:
219212
ret = defer.maybeDeferred(preserve_fn(self.orig), obj, *args, **kwargs)
213+
ret = cache.set(cache_key, ret, callback=invalidate_callback)
220214

221-
def onErr(f):
222-
cache.invalidate(cache_key)
223-
return f
224-
225-
ret.addErrback(onErr)
226-
227-
result_d = cache.set(cache_key, ret, callback=invalidate_callback)
228-
observer = result_d.observe()
229-
230-
return make_deferred_yieldable(observer)
215+
return make_deferred_yieldable(ret)
231216

232217
wrapped = cast(_CachedFunction, _wrapped)
233218

@@ -286,7 +271,7 @@ def __init__(self, orig, cached_method_name, list_name, num_args=None):
286271

287272
def __get__(self, obj, objtype=None):
288273
cached_method = getattr(obj, self.cached_method_name)
289-
cache = cached_method.cache
274+
cache = cached_method.cache # type: DeferredCache[CacheKey, Any]
290275
num_args = cached_method.num_args
291276

292277
@functools.wraps(self.orig)
@@ -326,14 +311,11 @@ def arg_to_cache_key(arg):
326311
for arg in list_args:
327312
try:
328313
res = cache.get(arg_to_cache_key(arg), callback=invalidate_callback)
329-
if not isinstance(res, ObservableDeferred):
330-
results[arg] = res
331-
elif not res.has_succeeded():
332-
res = res.observe()
314+
if not res.called:
333315
res.addCallback(update_results_dict, arg)
334316
cached_defers.append(res)
335317
else:
336-
results[arg] = res.get_result()
318+
results[arg] = res.result
337319
except KeyError:
338320
missing.add(arg)
339321

tests/util/caches/test_deferred_cache.py

+9-9
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,16 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
import unittest
1716
from functools import partial
1817

1918
from twisted.internet import defer
2019

2120
from synapse.util.caches.deferred_cache import DeferredCache
2221

22+
from tests.unittest import TestCase
2323

24-
class DeferredCacheTestCase(unittest.TestCase):
24+
25+
class DeferredCacheTestCase(TestCase):
2526
def test_empty(self):
2627
cache = DeferredCache("test")
2728
failed = False
@@ -36,7 +37,7 @@ def test_hit(self):
3637
cache = DeferredCache("test")
3738
cache.prefill("foo", 123)
3839

39-
self.assertEquals(cache.get("foo"), 123)
40+
self.assertEquals(self.successResultOf(cache.get("foo")), 123)
4041

4142
def test_get_immediate(self):
4243
cache = DeferredCache("test")
@@ -82,16 +83,15 @@ def record_callback(idx):
8283
d2 = defer.Deferred()
8384
cache.set("key2", d2, partial(record_callback, 1))
8485

85-
# lookup should return observable deferreds
86-
self.assertFalse(cache.get("key1").has_called())
87-
self.assertFalse(cache.get("key2").has_called())
86+
# lookup should return pending deferreds
87+
self.assertFalse(cache.get("key1").called)
88+
self.assertFalse(cache.get("key2").called)
8889

8990
# let one of the lookups complete
9091
d2.callback("result2")
9192

92-
# for now at least, the cache will return real results rather than an
93-
# observabledeferred
94-
self.assertEqual(cache.get("key2"), "result2")
93+
# now the cache will return a completed deferred
94+
self.assertEqual(self.successResultOf(cache.get("key2")), "result2")
9595

9696
# now do the invalidation
9797
cache.invalidate_all()

tests/util/caches/test_descriptors.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
current_context,
2828
make_deferred_yieldable,
2929
)
30-
from synapse.util.async_helpers import ObservableDeferred
3130
from synapse.util.caches import descriptors
3231
from synapse.util.caches.descriptors import cached
3332

@@ -419,9 +418,9 @@ def func(self, key):
419418

420419
a = A()
421420

422-
a.func.prefill(("foo",), ObservableDeferred(d))
421+
a.func.prefill(("foo",), 456)
423422

424-
self.assertEquals(a.func("foo").result, d.result)
423+
self.assertEquals(a.func("foo").result, 456)
425424
self.assertEquals(callcount[0], 0)
426425

427426
@defer.inlineCallbacks

0 commit comments

Comments
 (0)