Skip to content

Commit

Permalink
Avoid QUIT for broken connections (#2336)
Browse files Browse the repository at this point in the history
* Avoid QUIT for broken connections

* Added a test with a Redis instance being killed

* modify logging practice

* modify logging practice

* modify logging practice

* kill unavailable redis with checking existence of pid

Co-authored-by: Mina Asham <[email protected]>
  • Loading branch information
sazzad16 and mina-asham authored Feb 17, 2021
1 parent aa0158c commit 6f08468
Show file tree
Hide file tree
Showing 12 changed files with 163 additions and 46 deletions.
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,17 @@ save ""
appendonly no
endef

# UNAVAILABLE REDIS NODES
define REDIS_UNAVAILABLE_CONF
daemonize yes
protected-mode no
port 6400
pidfile /tmp/redis_unavailable.pid
logfile /tmp/redis_unavailable.log
save ""
appendonly no
endef

#STUNNEL
define STUNNEL_CONF
cert = src/test/resources/private.pem
Expand Down Expand Up @@ -278,6 +289,7 @@ export REDIS_CLUSTER_NODE3_CONF
export REDIS_CLUSTER_NODE4_CONF
export REDIS_CLUSTER_NODE5_CONF
export REDIS_UDS
export REDIS_UNAVAILABLE_CONF
export STUNNEL_CONF
export STUNNEL_BIN

Expand Down Expand Up @@ -309,6 +321,7 @@ start: stunnel cleanup
echo "$$REDIS_CLUSTER_NODE4_CONF" | redis-server -
echo "$$REDIS_CLUSTER_NODE5_CONF" | redis-server -
echo "$$REDIS_UDS" | redis-server -
echo "$$REDIS_UNAVAILABLE_CONF" | redis-server -

cleanup:
- rm -vf /tmp/redis_cluster_node*.conf 2>/dev/null
Expand Down Expand Up @@ -338,6 +351,7 @@ stop:
kill `cat /tmp/redis_cluster_node5.pid` || true
kill `cat /tmp/redis_uds.pid` || true
kill `cat /tmp/stunnel.pid` || true
[ -f /tmp/redis_unavailable.pid ] && kill `cat /tmp/redis_unavailable.pid` || true
rm -f /tmp/sentinel1.conf
rm -f /tmp/sentinel2.conf
rm -f /tmp/sentinel3.conf
Expand Down
14 changes: 9 additions & 5 deletions src/main/java/redis/clients/jedis/BinaryJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ private void initializeClientFromURI(URI uri, final SSLSocketFactory sslSocketFa
}
}

public boolean isConnected() {
return client.isConnected();
}

public boolean isBroken() {
return client.isBroken();
}

@Override
public String ping() {
checkIsInMultiOrPipeline();
Expand Down Expand Up @@ -2044,7 +2052,7 @@ public void disconnect() {
}

public void resetState() {
if (client.isConnected()) {
if (isConnected()) {
if (transaction != null) {
transaction.close();
}
Expand Down Expand Up @@ -3298,10 +3306,6 @@ public byte[] configSet(final byte[] parameter, final byte[] value) {
return client.getBinaryBulkReply();
}

public boolean isConnected() {
return client.isConnected();
}

@Override
public Long strlen(final byte[] key) {
checkIsInMultiOrPipeline();
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/redis/clients/jedis/BinaryShardedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.commands.BinaryJedisCommands;
import redis.clients.jedis.commands.ProtocolCommand;
Expand All @@ -19,6 +21,8 @@
public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo> implements
BinaryJedisCommands {

private static final Logger logger = LoggerFactory.getLogger(BinaryShardedJedis.class);

private final byte[][] dummyArray = new byte[0][];

public BinaryShardedJedis(List<JedisShardInfo> shards) {
Expand All @@ -41,14 +45,19 @@ public void disconnect() {
for (Jedis jedis : getAllShards()) {
if (jedis.isConnected()) {
try {
jedis.quit();
// need a proper test, probably with mock
if (!jedis.isBroken()) {
jedis.quit();
}
} catch (JedisConnectionException e) {
// ignore the exception node, so that all other normal nodes can release all connections.
logger.warn("Error while QUIT", e);
}
try {
jedis.disconnect();
} catch (JedisConnectionException e) {
// ignore the exception node, so that all other normal nodes can release all connections.
logger.warn("Error while disconnect", e);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -3608,7 +3608,7 @@ public void close() {
if (dataSource != null) {
JedisPoolAbstract pool = this.dataSource;
this.dataSource = null;
if (client.isBroken()) {
if (isBroken()) {
pool.returnBrokenResource(this);
} else {
pool.returnResource(this);
Expand Down
16 changes: 13 additions & 3 deletions src/main/java/redis/clients/jedis/JedisFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.exceptions.InvalidURIException;
import redis.clients.jedis.exceptions.JedisException;
Expand All @@ -19,6 +21,9 @@
* PoolableObjectFactory custom impl.
*/
class JedisFactory implements PooledObjectFactory<Jedis> {

private static final Logger logger = LoggerFactory.getLogger(JedisFactory.class);

private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<>();
private final int connectionTimeout;
private final int soTimeout;
Expand Down Expand Up @@ -129,12 +134,17 @@ public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
final BinaryJedis jedis = pooledJedis.getObject();
if (jedis.isConnected()) {
try {
try {
// need a proper test, probably with mock
if (!jedis.isBroken()) {
jedis.quit();
} catch (Exception e) {
}
} catch (Exception e) {
logger.warn("Error while QUIT", e);
}
try {
jedis.disconnect();
} catch (Exception e) {
logger.warn("Error while disconnect", e);
}
}
}
Expand Down Expand Up @@ -187,4 +197,4 @@ public boolean validateObject(PooledObject<Jedis> pooledJedis) {
return false;
}
}
}
}
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/ShardedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ public void close() {
boolean broken = false;

for (Jedis jedis : getAllShards()) {
if (jedis.getClient().isBroken()) {
if (jedis.isBroken()) {
broken = true;
break;
}
Expand Down
25 changes: 17 additions & 8 deletions src/main/java/redis/clients/jedis/ShardedJedisPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.util.Hashing;
import redis.clients.jedis.util.Pool;

public class ShardedJedisPool extends Pool<ShardedJedis> {

private static final Logger logger = LoggerFactory.getLogger(ShardedJedisPool.class);

public ShardedJedisPool(final GenericObjectPoolConfig poolConfig, List<JedisShardInfo> shards) {
this(poolConfig, shards, Hashing.MURMUR_HASH);
}
Expand Down Expand Up @@ -50,9 +55,10 @@ public void returnResource(final ShardedJedis resource) {
* PoolableObjectFactory custom impl.
*/
private static class ShardedJedisFactory implements PooledObjectFactory<ShardedJedis> {
private List<JedisShardInfo> shards;
private Hashing algo;
private Pattern keyTagPattern;

private final List<JedisShardInfo> shards;
private final Hashing algo;
private final Pattern keyTagPattern;

public ShardedJedisFactory(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
this.shards = shards;
Expand All @@ -72,14 +78,17 @@ public void destroyObject(PooledObject<ShardedJedis> pooledShardedJedis) throws
for (Jedis jedis : shardedJedis.getAllShards()) {
if (jedis.isConnected()) {
try {
try {
// need a proper test, probably with mock
if (!jedis.isBroken()) {
jedis.quit();
} catch (Exception e) {

}
} catch (Exception e) {
logger.warn("Error while QUIT", e);
}
try {
jedis.disconnect();
} catch (Exception e) {

logger.warn("Error while disconnect", e);
}
}
}
Expand Down Expand Up @@ -110,4 +119,4 @@ public void passivateObject(PooledObject<ShardedJedis> p) throws Exception {

}
}
}
}
2 changes: 1 addition & 1 deletion src/test/java/redis/clients/jedis/tests/JedisPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public void closeBrokenResourceTwice() {
fail();
} catch (Exception e) {
}
assertTrue(j.getClient().isBroken());
assertTrue(j.isBroken());
j.close();
j.close();
}
Expand Down
29 changes: 14 additions & 15 deletions src/test/java/redis/clients/jedis/tests/JedisTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,17 @@ public void timeoutConnectionWithURI() throws Exception {

@Test
public void infiniteTimeout() throws Exception {
Jedis jedis = new Jedis("localhost", 6379, 350, 350, 350);
jedis.auth("foobared");
try {
jedis.blpop(0, "foo");
fail("SocketTimeoutException should occur");
} catch(JedisConnectionException jce) {
assertEquals(java.net.SocketTimeoutException.class, jce.getCause().getClass());
assertEquals("Read timed out", jce.getCause().getMessage());
assertTrue(jedis.getClient().isBroken());
try (Jedis timeoutJedis = new Jedis("localhost", 6379, 350, 350, 350)) {
timeoutJedis.auth("foobared");
try {
timeoutJedis.blpop(0, "foo");
fail("SocketTimeoutException should occur");
} catch(JedisConnectionException jce) {
assertEquals(java.net.SocketTimeoutException.class, jce.getCause().getClass());
assertEquals("Read timed out", jce.getCause().getMessage());
assertTrue(timeoutJedis.isBroken());
}
}
jedis.close();
}

@Test(expected = JedisDataException.class)
Expand Down Expand Up @@ -192,16 +192,15 @@ public void allowUrlWithNoDBAndNoPassword() {

@Test
public void checkCloseable() {
jedis.close();
BinaryJedis bj = new BinaryJedis("localhost");
bj.connect();
bj.close();
try (BinaryJedis bj = new BinaryJedis("localhost")) {
bj.connect();
}
}

@Test
public void checkDisconnectOnQuit() {
jedis.quit();
assertFalse(jedis.getClient().isConnected());
assertFalse(jedis.isConnected());
}

}
20 changes: 10 additions & 10 deletions src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package redis.clients.jedis.tests;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
Expand Down Expand Up @@ -64,20 +64,20 @@ public void testAvoidLeaksUponDisconnect() throws InterruptedException {

ClientKillerUtil.killClient(deadClient, "DEAD");

assertEquals(true, deadClient.isConnected());
assertEquals(false, deadClient.getClient().getSocket().isClosed());
assertEquals(false, deadClient.getClient().isBroken()); // normal - not found
assertTrue(deadClient.isConnected());
assertFalse(deadClient.getClient().getSocket().isClosed());
assertFalse(deadClient.isBroken()); // normal - not found

shardedJedis.disconnect();

assertEquals(false, deadClient.isConnected());
assertEquals(true, deadClient.getClient().getSocket().isClosed());
assertEquals(true, deadClient.getClient().isBroken());
assertFalse(deadClient.isConnected());
assertTrue(deadClient.getClient().getSocket().isClosed());
assertTrue(deadClient.isBroken());

Jedis jedis2 = it.next();
assertEquals(false, jedis2.isConnected());
assertEquals(true, jedis2.getClient().getSocket().isClosed());
assertEquals(false, jedis2.getClient().isBroken());
assertFalse(jedis2.isConnected());
assertTrue(jedis2.getClient().getSocket().isClosed());
assertFalse(jedis2.isBroken());

}

Expand Down
Loading

0 comments on commit 6f08468

Please sign in to comment.