Skip to content

Commit

Permalink
Address code reviews and more updates
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Aug 14, 2024
1 parent 82cacd5 commit 0527b2c
Showing 1 changed file with 71 additions and 59 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package redis.clients.jedis.csc;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -56,7 +58,7 @@ public void lruEvictionTest() {
final int count = 100;
final int extra = 10;

// Add 100 + 10 keys to Redis (e.g., SET key:1 ... SET key:100)
// Add 100 + 10 keys to Redis
for (int i = 0; i < count + extra; i++) {
control.set("key:" + i, "value" + i);
}
Expand All @@ -65,11 +67,11 @@ public void lruEvictionTest() {
Cache cache = new DefaultCache(count, map);
try (JedisPooled jedis = new JedisPooled(hnp, clientConfig.get(), cache)) {

// Retrieve the 100 keys in the same order (e.g., GET key:1 ... GET key:100)
// Retrieve the 100 keys in the same order
for (int i = 0; i < count; i++) {
jedis.get("key:" + i);
}
assertThat(map, Matchers.aMapWithSize(count));
assertThat(map, aMapWithSize(count));

List<CacheKey> earlierKeys = new ArrayList<>(map.keySet()).subList(0, extra);
// earlier keys in map
Expand All @@ -82,7 +84,22 @@ public void lruEvictionTest() {

// earlier keys NOT in map
earlierKeys.forEach(cacheKey -> assertThat(map, Matchers.not(Matchers.hasKey(cacheKey))));
assertThat(map, Matchers.aMapWithSize(count));
assertThat(map, aMapWithSize(count));
}
}

@Test // T.5.2
public void deleteByKeyUsingMGetTest() {
Cache clientSideCache = new TestCache();
try (JedisPooled jedis = new JedisPooled(hnp, clientConfig.get(), clientSideCache)) {
jedis.set("1", "one");
jedis.set("2", "two");

assertEquals(Arrays.asList("one", "two"), jedis.mget("1", "2"));
assertEquals(1, clientSideCache.getSize());

assertThat(clientSideCache.deleteByRedisKey("1"), hasSize(1));
assertEquals(0, clientSideCache.getSize());
}
}

Expand All @@ -100,7 +117,7 @@ public void deleteByKeyTest() {
for (int i = 0; i < count; i++) {
jedis.get("k" + i);
}
assertEquals(count, map.size());
assertThat(map, aMapWithSize(count));

ArrayList<CacheKey> cacheKeys = new ArrayList<>(map.keySet());
for (int i = 0; i < count; i++) {
Expand All @@ -109,13 +126,13 @@ public void deleteByKeyTest() {
assertTrue(map.containsKey(cacheKey));
assertThat(clientSideCache.deleteByRedisKey(key), hasSize(1));
assertFalse(map.containsKey(cacheKey));
assertEquals(count - i - 1, map.size());
assertThat(map, aMapWithSize(count - i - 1));
}
assertEquals(0, map.size());
assertThat(map, aMapWithSize(0));
}
}

@Test // T.5.2 (2)
@Test // T.5.2
public void deleteByKeysTest() {
final int count = 100;
final int delete = 10;
Expand Down Expand Up @@ -144,7 +161,7 @@ public void deleteByKeysTest() {
}

@Test // T.5.3
public void deleteByEntry() {
public void deleteByEntryTest() {
final int count = 100;
for (int i = 0; i < count; i++) {
control.set("k" + i, "v" + i);
Expand All @@ -169,8 +186,8 @@ public void deleteByEntry() {
}
}

@Test // T.5.3 (2)
public void deleteByEntries() {
@Test // T.5.3
public void deleteByEntriesTest() {
final int count = 100;
final int delete = 10;
for (int i = 0; i < count; i++) {
Expand Down Expand Up @@ -283,79 +300,74 @@ public void testSequentialAccess() throws InterruptedException {
int threadCount = 10;
int iterations = 10000;

try (JedisPooled jedis = new JedisPooled(endpoint.getHostAndPort(), clientConfig.get())) {
jedis.set("foo", "0");
}
control.set("foo", "0");

ReentrantLock lock = new ReentrantLock(true);
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);

// Create the shared mock instance of cache
TestCache testCache = new TestCache();

// Submit multiple threads to perform concurrent operations
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try (JedisPooled jedis = new JedisPooled(endpoint.getHostAndPort(), clientConfig.get(), testCache)) {
for (int j = 0; j < iterations; j++) {
lock.lock();
try {
// Simulate continious get and update operations and consume invalidation events meanwhile
assertEquals(control.get("foo"), jedis.get("foo"));
Integer value = new Integer(jedis.get("foo"));
assertEquals("OK", jedis.set("foo", (++value).toString()));
} finally {
lock.unlock();
try (JedisPooled jedis = new JedisPooled(endpoint.getHostAndPort(), clientConfig.get(), new TestCache())) {
// Submit multiple threads to perform concurrent operations
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
for (int j = 0; j < iterations; j++) {
lock.lock();
try {
// Simulate continious get and update operations and consume invalidation events meanwhile
assertEquals(control.get("foo"), jedis.get("foo"));
Integer value = new Integer(jedis.get("foo"));
assertEquals("OK", jedis.set("foo", (++value).toString()));
} finally {
lock.unlock();
}
}
} finally {
latch.countDown();
}
} finally {
latch.countDown();
}
});
}
});
}

// wait for all threads to complete
latch.await();
// wait for all threads to complete
latch.await();
}

// Verify the final value of "foo" in Redis
String finalValue = control.get("foo");
assertEquals(threadCount * iterations, Integer.parseInt(finalValue));

}

@Test
public void testConcurrentAccessWithStats() throws InterruptedException {
int threadCount = 10;
int iterations = 10000;

try (JedisPooled jedis = new JedisPooled(endpoint.getHostAndPort(), clientConfig.get())) {
jedis.set("foo", "0");
}
control.set("foo", "0");

ExecutorService executorService = Executors.newFixedThreadPool(threadCount);

// Create the shared mock instance of cache
TestCache testCache = new TestCache();

// Submit multiple threads to perform concurrent operations
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try (JedisPooled jedis = new JedisPooled(endpoint.getHostAndPort(), clientConfig.get(), testCache)) {
for (int j = 0; j < iterations; j++) {
// Simulate continious get and update operations and consume invalidation events meanwhile
Integer value = new Integer(jedis.get("foo")) + 1;
assertEquals("OK", jedis.set("foo", value.toString()));
try (JedisPooled jedis = new JedisPooled(endpoint.getHostAndPort(), clientConfig.get(), testCache)) {
// Submit multiple threads to perform concurrent operations
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
for (int j = 0; j < iterations; j++) {
// Simulate continious get and update operations and consume invalidation events meanwhile
Integer value = new Integer(jedis.get("foo")) + 1;
assertEquals("OK", jedis.set("foo", value.toString()));
}
} finally {
latch.countDown();
}
} finally {
latch.countDown();
}
});
}
});
}

// wait for all threads to complete
latch.await();
// wait for all threads to complete
latch.await();
}

CacheStats stats = testCache.getStats();
assertEquals(threadCount * iterations, stats.getMissCount() + stats.getHitCount());
Expand Down

0 comments on commit 0527b2c

Please sign in to comment.