Commit 715cd50

Return refresh future and ignore redundant refreshes
A mapping of in-flight refreshes is now maintained and lazily initialized on first use. This allows the cache to ignore redundant requests for reloads, as Guava does. It also removes the disabling of expiration during a refresh and resolves an ABA problem if the entry is modified in a previously undetectable way. The refresh future can now be obtained from LoadingCache to chain operations against.

fixes #143 fixes #193 fixes #236 fixes #282 fixes #322 fixes #373 fixes #467
1 parent 98ce8af commit 715cd50
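
To make the new contract concrete, a minimal caller-side sketch follows. It is illustrative only and assumes a build that includes this change (where refresh returns the reload's future); the class name, key, and loader are made up for the example.

    import java.time.Duration;
    import java.util.concurrent.CompletableFuture;

    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.LoadingCache;

    public class RefreshChainingExample {
      public static void main(String[] args) {
        // The loader stands in for an expensive lookup (database, RPC, ...).
        LoadingCache<String, String> cache = Caffeine.newBuilder()
            .refreshAfterWrite(Duration.ofMinutes(1))
            .build(key -> key.toUpperCase());

        cache.put("k", "stale");

        // refresh now returns the reload's future, so follow-up work can be
        // chained instead of polling the cache for the new value.
        CompletableFuture<String> refresh = cache.refresh("k");

        // A second refresh for the same key while one is in flight should not
        // start another reload; the pending future is handed back instead.
        CompletableFuture<String> redundant = cache.refresh("k");

        // Wait on the chained stage so the example prints before exiting.
        refresh.thenAccept(value -> System.out.println("reloaded: " + value)).join();
        System.out.println("redundant call saw: " + redundant.join());
      }
    }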

17 files changed: +585 −134 lines

caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java

+91 −40
@@ -60,6 +60,7 @@
 import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
@@ -220,10 +221,10 @@ abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef<K, V>
   final Executor executor;
   final boolean isAsync;
 
-  // The collection views
-  @Nullable transient Set<K> keySet;
-  @Nullable transient Collection<V> values;
-  @Nullable transient Set<Entry<K, V>> entrySet;
+  @Nullable Set<K> keySet;
+  @Nullable Collection<V> values;
+  @Nullable Set<Entry<K, V>> entrySet;
+  AtomicReference<ConcurrentMap<Object, CompletableFuture<?>>> refreshes;
 
   /** Creates an instance based on the builder's configuration. */
   protected BoundedLocalCache(Caffeine<K, V> builder,
@@ -233,6 +234,7 @@ protected BoundedLocalCache(Caffeine<K, V> builder,
     executor = builder.getExecutor();
     writer = builder.getCacheWriter();
     evictionLock = new ReentrantLock();
+    refreshes = new AtomicReference<>();
     weigher = builder.getWeigher(isAsync);
     drainBuffersTask = new PerformCleanupTask(this);
     nodeFactory = NodeFactory.newFactory(builder, isAsync);
@@ -288,11 +290,29 @@ public final Executor executor() {
     return executor;
   }
 
+  @Override
+  @SuppressWarnings("NullAway")
+  public ConcurrentMap<Object, CompletableFuture<?>> refreshes() {
+    var pending = refreshes.get();
+    if (pending == null) {
+      pending = new ConcurrentHashMap<>();
+      if (!refreshes.compareAndSet(null, pending)) {
+        pending = refreshes.get();
+      }
+    }
+    return pending;
+  }
+
   /** Returns whether this cache notifies a writer when an entry is modified. */
   protected boolean hasWriter() {
     return (writer != CacheWriter.disabledWriter());
   }
 
+  @Override
+  public Object referenceKey(K key) {
+    return nodeFactory.newLookupKey(key);
+  }
+
   /* --------------- Stats Support --------------- */
 
   @Override
@@ -899,8 +919,9 @@ boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {
     boolean[] removed = new boolean[1];
     boolean[] resurrect = new boolean[1];
     RemovalCause[] actualCause = new RemovalCause[1];
+    Object keyReference = node.getKeyReference();
 
-    data.computeIfPresent(node.getKeyReference(), (k, n) -> {
+    data.computeIfPresent(keyReference, (k, n) -> {
       if (n != node) {
         return n;
       }
@@ -965,6 +986,12 @@ boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {
 
     if (removed[0]) {
       statsCounter().recordEviction(node.getWeight(), actualCause[0]);
+
+      var pending = refreshes.get();
+      if (pending != null) {
+        pending.remove(keyReference);
+      }
+
       if (hasRemovalListener()) {
         // Notify the listener only if the entry was evicted. This must be performed as the last
         // step during eviction to safe guard against the executor rejecting the notification task.
@@ -1172,51 +1199,60 @@ void refreshIfNeeded(Node<K, V> node, long now) {
     if (!refreshAfterWrite()) {
       return;
     }
+
     K key;
     V oldValue;
-    long oldWriteTime = node.getWriteTime();
-    long refreshWriteTime = (now + ASYNC_EXPIRY);
-    if (((now - oldWriteTime) > refreshAfterWriteNanos())
+    long writeTime = node.getWriteTime();
+    Object keyReference = node.getKeyReference();
+    if (((now - writeTime) > refreshAfterWriteNanos()) && (keyReference != null)
         && ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null)
-        && node.casWriteTime(oldWriteTime, refreshWriteTime)) {
-      try {
-        CompletableFuture<V> refreshFuture;
-        long startTime = statsTicker().read();
+        && !refreshes().containsKey(keyReference)) {
+      long[] startTime = new long[1];
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      CompletableFuture<V>[] refreshFuture = new CompletableFuture[1];
+      refreshes().computeIfAbsent(keyReference, k -> {
+        startTime[0] = statsTicker().read();
         if (isAsync) {
           @SuppressWarnings("unchecked")
           CompletableFuture<V> future = (CompletableFuture<V>) oldValue;
           if (Async.isReady(future)) {
             @SuppressWarnings("NullAway")
-            CompletableFuture<V> refresh = future.thenCompose(value ->
-                cacheLoader.asyncReload(key, value, executor));
-            refreshFuture = refresh;
+            var refresh = cacheLoader.asyncReload(key, future.join(), executor);
+            refreshFuture[0] = refresh;
           } else {
             // no-op if load is pending
-            node.casWriteTime(refreshWriteTime, oldWriteTime);
-            return;
+            return future;
           }
         } else {
           @SuppressWarnings("NullAway")
-          CompletableFuture<V> refresh = cacheLoader.asyncReload(key, oldValue, executor);
-          refreshFuture = refresh;
+          var refresh = cacheLoader.asyncReload(key, oldValue, executor);
+          refreshFuture[0] = refresh;
         }
-        refreshFuture.whenComplete((newValue, error) -> {
-          long loadTime = statsTicker().read() - startTime;
+        return refreshFuture[0];
+      });
+
+      if (refreshFuture[0] != null) {
+        refreshFuture[0].whenComplete((newValue, error) -> {
+          long loadTime = statsTicker().read() - startTime[0];
          if (error != null) {
            logger.log(Level.WARNING, "Exception thrown during refresh", error);
-            node.casWriteTime(refreshWriteTime, oldWriteTime);
+            refreshes().remove(keyReference, refreshFuture[0]);
            statsCounter().recordLoadFailure(loadTime);
            return;
          }
 
          @SuppressWarnings("unchecked")
-          V value = (isAsync && (newValue != null)) ? (V) refreshFuture : newValue;
+          V value = (isAsync && (newValue != null)) ? (V) refreshFuture[0] : newValue;
 
          boolean[] discard = new boolean[1];
          compute(key, (k, currentValue) -> {
            if (currentValue == null) {
-              return value;
-            } else if ((currentValue == oldValue) && (node.getWriteTime() == refreshWriteTime)) {
+              if (value == null) {
+                return null;
+              } else if (refreshes().get(key) == refreshFuture[0]) {
+                return value;
+              }
+            } else if ((currentValue == oldValue) && (node.getWriteTime() == writeTime)) {
              return value;
            }
            discard[0] = true;
@@ -1231,10 +1267,9 @@ void refreshIfNeeded(Node<K, V> node, long now) {
          } else {
            statsCounter().recordLoadSuccess(loadTime);
          }
+
+          refreshes().remove(keyReference, refreshFuture[0]);
        });
-      } catch (Throwable t) {
-        node.casWriteTime(refreshWriteTime, oldWriteTime);
-        logger.log(Level.ERROR, "Exception thrown when submitting refresh task", t);
      }
    }
  }
@@ -1782,8 +1817,12 @@ public void clear() {
    }
 
    // Discard all entries
-    for (Node<K, V> node : data.values()) {
-      removeNode(node, now);
+    var pending = refreshes.get();
+    for (var entry : data.entrySet()) {
+      removeNode(entry.getValue(), now);
+      if (pending != null) {
+        pending.remove(entry.getKey());
+      }
    }
 
    // Discard all pending reads
@@ -2099,8 +2138,9 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
    @SuppressWarnings("unchecked")
    V[] oldValue = (V[]) new Object[1];
    RemovalCause[] cause = new RemovalCause[1];
+    Object lookupKey = nodeFactory.newLookupKey(key);
 
-    data.computeIfPresent(nodeFactory.newLookupKey(key), (k, n) -> {
+    data.computeIfPresent(lookupKey, (k, n) -> {
      synchronized (n) {
        oldValue[0] = n.getValue();
        if (oldValue[0] == null) {
@@ -2118,6 +2158,11 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
    });
 
    if (cause[0] != null) {
+      var pending = refreshes.get();
+      if (pending != null) {
+        pending.remove(lookupKey);
+      }
+
      afterWrite(new RemovalTask(node[0]));
      if (hasRemovalListener()) {
        notifyRemoval(castKey, oldValue[0], cause[0]);
@@ -2140,8 +2185,9 @@ public boolean remove(Object key, Object value) {
    @SuppressWarnings("unchecked")
    V[] oldValue = (V[]) new Object[1];
    RemovalCause[] cause = new RemovalCause[1];
+    Object lookupKey = nodeFactory.newLookupKey(key);
 
-    data.computeIfPresent(nodeFactory.newLookupKey(key), (kR, node) -> {
+    data.computeIfPresent(lookupKey, (kR, node) -> {
      synchronized (node) {
        oldKey[0] = node.getKey();
        oldValue[0] = node.getValue();
@@ -2163,7 +2209,13 @@ public boolean remove(Object key, Object value) {
 
    if (removed[0] == null) {
      return false;
-    } else if (hasRemovalListener()) {
+    }
+
+    var pending = refreshes.get();
+    if (pending != null) {
+      pending.remove(lookupKey);
+    }
+    if (hasRemovalListener()) {
      notifyRemoval(oldKey[0], oldValue[0], cause[0]);
    }
    afterWrite(new RemovalTask(removed[0]));
@@ -2582,15 +2634,14 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
      if (expiresAfterWrite() || (weightedDifference != 0)) {
        afterWrite(new UpdateTask(node, weightedDifference));
      } else {
-        if (cause[0] == null) {
-          if (!isComputingAsync(node)) {
-            tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]);
-            setAccessTime(node, now[0]);
-          }
-        } else if (cause[0] == RemovalCause.COLLECTED) {
-          scheduleDrainBuffers();
+        if ((cause[0] == null) && !isComputingAsync(node)) {
+          tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]);
+          setAccessTime(node, now[0]);
        }
        afterRead(node, now[0], /* recordHit */ false);
+        if ((cause[0] != null) && cause[0].wasEvicted()) {
+          scheduleDrainBuffers();
+        }
      }
    }

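For readers skimming the hunks above: refreshes() publishes a lazily created map with a compare-and-set, and refreshIfNeeded() uses computeIfAbsent plus the two-argument remove(key, future) so each key has at most one in-flight reload. Below is a standalone sketch of just those two idioms; the names (SingleFlight, pending, load) are hypothetical and not part of the cache's API.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Function;

    /** Illustrative helper, not the cache itself: per-key single flight over a lazy map. */
    final class SingleFlight<K, V> {
      // Created lazily so callers that never refresh pay nothing for the map.
      private final AtomicReference<ConcurrentMap<K, CompletableFuture<V>>> inFlight =
          new AtomicReference<>();

      private ConcurrentMap<K, CompletableFuture<V>> pending() {
        var map = inFlight.get();
        if (map == null) {
          map = new ConcurrentHashMap<>();
          if (!inFlight.compareAndSet(null, map)) {
            map = inFlight.get();  // another thread won the race; adopt its map
          }
        }
        return map;
      }

      /** Starts a load, or returns the future of a load already in flight for the key. */
      CompletableFuture<V> load(K key, Function<K, CompletableFuture<V>> loader) {
        // The loader must return a non-null future and must not touch the map itself.
        CompletableFuture<V> future = pending().computeIfAbsent(key, loader);
        // Remove only our own mapping once done; remove(key, value) leaves a newer
        // reload registered for the same key untouched.
        future.whenComplete((value, error) -> pending().remove(key, future));
        return future;
      }
    }

The two-argument remove only clears the entry while it still points at this future, which is what lets a later refresh for the same key proceed without being clobbered by the earlier one's completion.
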
caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java

+6 −1
@@ -16,6 +16,7 @@
 package com.github.benmanes.caffeine.cache;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 
 import org.checkerframework.checker.nullness.qual.NonNull;
@@ -101,9 +102,13 @@ public interface LoadingCache<K, V> extends Cache<K, V> {
    * Caches loaded by a {@link CacheLoader} will call {@link CacheLoader#reload} if the cache
    * currently contains a value for the {@code key}, and {@link CacheLoader#load} otherwise. Loading
    * is asynchronous by delegating to the default executor.
+   * <p>
+   * Returns an existing future without doing anything if another thread is currently loading the
+   * value for {@code key}.
    *
    * @param key key with which a value may be associated
+   * @return the future that is loading the value
    * @throws NullPointerException if the specified key is null
    */
-  void refresh(@NonNull K key);
+  CompletableFuture<V> refresh(@NonNull K key);
 }
