Commit a47b500

Return refresh future and ignore redundant refreshes
A mapping of in-flight refreshes is now maintained, lazily initialized so that it adds no overhead when refreshing is not used. This allows the cache to ignore redundant requests for reloads, as Guava does. It also removes the disabling of expiration during a refresh and resolves an ABA problem when the entry is modified in a previously undetectable way. The refresh future can now be obtained from LoadingCache to chain operations against.

fixes #143
fixes #193
fixes #236
fixes #282
fixes #322
fixes #373
fixes #467
1 parent 9c8b3d0 commit a47b500

16 files changed (+573, -133 lines)
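The core of the change is a lazily created map of in-flight refreshes in BoundedLocalCache, keyed by the entry's key reference and populated through computeIfAbsent so that concurrent refresh requests for the same key share one future. The sketch below is a simplified, standalone illustration of that idea rather than the cache's actual code; the RefreshCoordinator name and the reloader function are hypothetical.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

// Hypothetical, simplified sketch of the de-duplication idea in this commit.
final class RefreshCoordinator<K, V> {
  private final AtomicReference<ConcurrentMap<K, CompletableFuture<V>>> refreshes =
      new AtomicReference<>();
  private final BiFunction<K, Executor, CompletableFuture<V>> reloader;
  private final Executor executor;

  RefreshCoordinator(BiFunction<K, Executor, CompletableFuture<V>> reloader, Executor executor) {
    this.reloader = reloader;
    this.executor = executor;
  }

  // Lazily creates the map of in-flight refreshes; mirrors the refreshes() accessor in the diff.
  private ConcurrentMap<K, CompletableFuture<V>> refreshes() {
    ConcurrentMap<K, CompletableFuture<V>> pending = refreshes.get();
    if (pending == null) {
      pending = new ConcurrentHashMap<>();
      if (!refreshes.compareAndSet(null, pending)) {
        pending = refreshes.get();
      }
    }
    return pending;
  }

  // Starts a reload, or returns the in-flight future if one is already running for this key,
  // so that redundant refresh requests are ignored.
  CompletableFuture<V> refresh(K key) {
    CompletableFuture<V> future = refreshes().computeIfAbsent(key,
        k -> reloader.apply(k, executor));
    // Drop the mapping once the reload settles so a later refresh may start a new one.
    future.whenComplete((value, error) -> refreshes().remove(key, future));
    return future;
  }
}

In the diff below, the map is additionally pruned when entries are evicted, explicitly removed, or cleared, so completed or abandoned refreshes do not linger.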

caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java (+91, -40)
@@ -60,6 +60,7 @@
 import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
@@ -221,10 +222,10 @@ abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef<K, V>
   final Executor executor;
   final boolean isAsync;

-  // The collection views
-  @Nullable transient Set<K> keySet;
-  @Nullable transient Collection<V> values;
-  @Nullable transient Set<Entry<K, V>> entrySet;
+  @Nullable Set<K> keySet;
+  @Nullable Collection<V> values;
+  @Nullable Set<Entry<K, V>> entrySet;
+  AtomicReference<ConcurrentMap<Object, CompletableFuture<?>>> refreshes;

   /** Creates an instance based on the builder's configuration. */
   protected BoundedLocalCache(Caffeine<K, V> builder,
@@ -234,6 +235,7 @@ protected BoundedLocalCache(Caffeine<K, V> builder,
     executor = builder.getExecutor();
     writer = builder.getCacheWriter();
     evictionLock = new ReentrantLock();
+    refreshes = new AtomicReference<>();
     weigher = builder.getWeigher(isAsync);
     drainBuffersTask = new PerformCleanupTask(this);
     nodeFactory = NodeFactory.newFactory(builder, isAsync);
@@ -289,11 +291,29 @@ public final Executor executor() {
     return executor;
   }

+  @Override
+  @SuppressWarnings("NullAway")
+  public ConcurrentMap<Object, CompletableFuture<?>> refreshes() {
+    var pending = refreshes.get();
+    if (pending == null) {
+      pending = new ConcurrentHashMap<>();
+      if (!refreshes.compareAndSet(null, pending)) {
+        pending = refreshes.get();
+      }
+    }
+    return pending;
+  }
+
   /** Returns whether this cache notifies a writer when an entry is modified. */
   protected boolean hasWriter() {
     return (writer != CacheWriter.disabledWriter());
   }

+  @Override
+  public Object referenceKey(K key) {
+    return nodeFactory.newLookupKey(key);
+  }
+
   /* --------------- Stats Support --------------- */

   @Override
@@ -900,8 +920,9 @@ boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {
     boolean[] removed = new boolean[1];
     boolean[] resurrect = new boolean[1];
     RemovalCause[] actualCause = new RemovalCause[1];
+    Object keyReference = node.getKeyReference();

-    data.computeIfPresent(node.getKeyReference(), (k, n) -> {
+    data.computeIfPresent(keyReference, (k, n) -> {
       if (n != node) {
         return n;
       }
@@ -964,6 +985,12 @@ boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {

     if (removed[0]) {
       statsCounter().recordEviction(node.getWeight(), actualCause[0]);
+
+      var pending = refreshes.get();
+      if (pending != null) {
+        pending.remove(keyReference);
+      }
+
       if (hasRemovalListener()) {
         // Notify the listener only if the entry was evicted. This must be performed as the last
         // step during eviction to safe guard against the executor rejecting the notification task.
@@ -1171,51 +1198,60 @@ void refreshIfNeeded(Node<K, V> node, long now) {
     if (!refreshAfterWrite()) {
      return;
     }
+
     K key;
     V oldValue;
-    long oldWriteTime = node.getWriteTime();
-    long refreshWriteTime = (now + ASYNC_EXPIRY);
-    if (((now - oldWriteTime) > refreshAfterWriteNanos())
+    long writeTime = node.getWriteTime();
+    Object keyReference = node.getKeyReference();
+    if (((now - writeTime) > refreshAfterWriteNanos()) && (keyReference != null)
         && ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null)
-        && node.casWriteTime(oldWriteTime, refreshWriteTime)) {
-      try {
-        CompletableFuture<V> refreshFuture;
-        long startTime = statsTicker().read();
+        && !refreshes().containsKey(keyReference)) {
+      long[] startTime = new long[1];
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      CompletableFuture<V>[] refreshFuture = new CompletableFuture[1];
+      refreshes().computeIfAbsent(keyReference, k -> {
+        startTime[0] = statsTicker().read();
         if (isAsync) {
           @SuppressWarnings("unchecked")
           CompletableFuture<V> future = (CompletableFuture<V>) oldValue;
           if (Async.isReady(future)) {
             @SuppressWarnings("NullAway")
-            CompletableFuture<V> refresh = future.thenCompose(value ->
-                cacheLoader.asyncReload(key, value, executor));
-            refreshFuture = refresh;
+            var refresh = cacheLoader.asyncReload(key, future.join(), executor);
+            refreshFuture[0] = refresh;
           } else {
             // no-op if load is pending
-            node.casWriteTime(refreshWriteTime, oldWriteTime);
-            return;
+            return future;
           }
         } else {
           @SuppressWarnings("NullAway")
-          CompletableFuture<V> refresh = cacheLoader.asyncReload(key, oldValue, executor);
-          refreshFuture = refresh;
+          var refresh = cacheLoader.asyncReload(key, oldValue, executor);
+          refreshFuture[0] = refresh;
         }
-        refreshFuture.whenComplete((newValue, error) -> {
-          long loadTime = statsTicker().read() - startTime;
+        return refreshFuture[0];
+      });
+
+      if (refreshFuture[0] != null) {
+        refreshFuture[0].whenComplete((newValue, error) -> {
+          long loadTime = statsTicker().read() - startTime[0];
          if (error != null) {
            logger.log(Level.WARNING, "Exception thrown during refresh", error);
-            node.casWriteTime(refreshWriteTime, oldWriteTime);
+            refreshes().remove(keyReference, refreshFuture[0]);
            statsCounter().recordLoadFailure(loadTime);
            return;
          }

          @SuppressWarnings("unchecked")
-          V value = (isAsync && (newValue != null)) ? (V) refreshFuture : newValue;
+          V value = (isAsync && (newValue != null)) ? (V) refreshFuture[0] : newValue;

          boolean[] discard = new boolean[1];
          compute(key, (k, currentValue) -> {
            if (currentValue == null) {
-              return value;
-            } else if ((currentValue == oldValue) && (node.getWriteTime() == refreshWriteTime)) {
+              if (value == null) {
+                return null;
+              } else if (refreshes().get(key) == refreshFuture[0]) {
+                return value;
+              }
+            } else if ((currentValue == oldValue) && (node.getWriteTime() == writeTime)) {
              return value;
            }
            discard[0] = true;
@@ -1230,10 +1266,9 @@ void refreshIfNeeded(Node<K, V> node, long now) {
          } else {
            statsCounter().recordLoadSuccess(loadTime);
          }
+
+          refreshes().remove(keyReference, refreshFuture[0]);
        });
-      } catch (Throwable t) {
-        node.casWriteTime(refreshWriteTime, oldWriteTime);
-        logger.log(Level.ERROR, "Exception thrown when submitting refresh task", t);
      }
    }
  }
@@ -1781,8 +1816,12 @@ public void clear() {
      }

      // Discard all entries
-      for (Node<K, V> node : data.values()) {
-        removeNode(node, now);
+      var pending = refreshes.get();
+      for (var entry : data.entrySet()) {
+        removeNode(entry.getValue(), now);
+        if (pending != null) {
+          pending.remove(entry.getKey());
+        }
      }

      // Discard all pending reads
@@ -2098,8 +2137,9 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
     @SuppressWarnings("unchecked")
     V[] oldValue = (V[]) new Object[1];
     RemovalCause[] cause = new RemovalCause[1];
+    Object lookupKey = nodeFactory.newLookupKey(key);

-    data.computeIfPresent(nodeFactory.newLookupKey(key), (k, n) -> {
+    data.computeIfPresent(lookupKey, (k, n) -> {
       synchronized (n) {
         oldValue[0] = n.getValue();
         if (oldValue[0] == null) {
@@ -2117,6 +2157,11 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
     });

     if (cause[0] != null) {
+      var pending = refreshes.get();
+      if (pending != null) {
+        pending.remove(lookupKey);
+      }
+
       afterWrite(new RemovalTask(node[0]));
       if (hasRemovalListener()) {
         notifyRemoval(castKey, oldValue[0], cause[0]);
@@ -2139,8 +2184,9 @@ public boolean remove(Object key, Object value) {
     @SuppressWarnings("unchecked")
     V[] oldValue = (V[]) new Object[1];
     RemovalCause[] cause = new RemovalCause[1];
+    Object lookupKey = nodeFactory.newLookupKey(key);

-    data.computeIfPresent(nodeFactory.newLookupKey(key), (kR, node) -> {
+    data.computeIfPresent(lookupKey, (kR, node) -> {
       synchronized (node) {
         oldKey[0] = node.getKey();
         oldValue[0] = node.getValue();
@@ -2162,7 +2208,13 @@ public boolean remove(Object key, Object value) {

     if (removed[0] == null) {
       return false;
-    } else if (hasRemovalListener()) {
+    }
+
+    var pending = refreshes.get();
+    if (pending != null) {
+      pending.remove(lookupKey);
+    }
+    if (hasRemovalListener()) {
       notifyRemoval(oldKey[0], oldValue[0], cause[0]);
     }
     afterWrite(new RemovalTask(removed[0]));
@@ -2581,15 +2633,14 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
       if (expiresAfterWrite() || (weightedDifference != 0)) {
         afterWrite(new UpdateTask(node, weightedDifference));
       } else {
-        if (cause[0] == null) {
-          if (!isComputingAsync(node)) {
-            tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]);
-            setAccessTime(node, now[0]);
-          }
-        } else if (cause[0] == RemovalCause.COLLECTED) {
-          scheduleDrainBuffers();
+        if ((cause[0] == null) && !isComputingAsync(node)) {
+          tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]);
+          setAccessTime(node, now[0]);
         }
         afterRead(node, now[0], /* recordHit */ false);
+        if ((cause[0] != null) && cause[0].wasEvicted()) {
+          scheduleDrainBuffers();
+        }
       }
     }

caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java (+6, -1)
@@ -16,6 +16,7 @@
 package com.github.benmanes.caffeine.cache;

 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;

 import org.checkerframework.checker.nullness.qual.NonNull;
@@ -101,9 +102,13 @@ public interface LoadingCache<K, V> extends Cache<K, V> {
   * Caches loaded by a {@link CacheLoader} will call {@link CacheLoader#reload} if the cache
   * currently contains a value for the {@code key}, and {@link CacheLoader#load} otherwise. Loading
   * is asynchronous by delegating to the default executor.
+   * <p>
+   * Returns an existing future without doing anything if another thread is currently loading the
+   * value for {@code key}.
   *
   * @param key key with which a value may be associated
+   * @return the future that is loading the value
   * @throws NullPointerException if the specified key is null
   */
-  void refresh(@NonNull K key);
+  CompletableFuture<V> refresh(@NonNull K key);
 }
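Because refresh now returns the in-flight CompletableFuture, callers can chain work against it. The snippet below is a minimal usage sketch assuming a build that includes this change; the cache configuration, key, and loader are illustrative only.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

public class RefreshChainingExample {
  public static void main(String[] args) {
    // Illustrative cache: a trivial loader with refresh-after-write configured.
    LoadingCache<String, String> cache = Caffeine.newBuilder()
        .refreshAfterWrite(1, TimeUnit.MINUTES)
        .build(key -> "value-for-" + key);

    cache.put("config", "stale");

    // refresh(key) returns the in-flight future, so callers can chain operations against it.
    CompletableFuture<String> future = cache.refresh("config");
    future.thenAccept(value -> System.out.println("refreshed to " + value));

    future.join(); // block only for this small example
  }
}

A second thread calling refresh("config") while the reload is running would receive the same future rather than triggering another load.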
