Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid QUIT for broken connections #2336

Merged
merged 7 commits into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,17 @@ save ""
appendonly no
endef

# UNAVAILABLE REDIS NODES
define REDIS_UNAVAILABLE_CONF
daemonize yes
protected-mode no
port 6400
pidfile /tmp/redis_unavailable.pid
logfile /tmp/redis_unavailable.log
save ""
appendonly no
endef

#STUNNEL
define STUNNEL_CONF
cert = src/test/resources/private.pem
Expand Down Expand Up @@ -278,6 +289,7 @@ export REDIS_CLUSTER_NODE3_CONF
export REDIS_CLUSTER_NODE4_CONF
export REDIS_CLUSTER_NODE5_CONF
export REDIS_UDS
export REDIS_UNAVAILABLE_CONF
export STUNNEL_CONF
export STUNNEL_BIN

Expand Down Expand Up @@ -309,6 +321,7 @@ start: stunnel cleanup
echo "$$REDIS_CLUSTER_NODE4_CONF" | redis-server -
echo "$$REDIS_CLUSTER_NODE5_CONF" | redis-server -
echo "$$REDIS_UDS" | redis-server -
echo "$$REDIS_UNAVAILABLE_CONF" | redis-server -

cleanup:
- rm -vf /tmp/redis_cluster_node*.conf 2>/dev/null
Expand Down Expand Up @@ -338,6 +351,7 @@ stop:
kill `cat /tmp/redis_cluster_node5.pid` || true
kill `cat /tmp/redis_uds.pid` || true
kill `cat /tmp/stunnel.pid` || true
# kill `cat /tmp/redis_unavailable.pid` || true
rm -f /tmp/sentinel1.conf
rm -f /tmp/sentinel2.conf
rm -f /tmp/sentinel3.conf
Expand Down
14 changes: 9 additions & 5 deletions src/main/java/redis/clients/jedis/BinaryJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ private void initializeClientFromURI(URI uri, final SSLSocketFactory sslSocketFa
}
}

public boolean isConnected() {
return client.isConnected();
}

public boolean isBroken() {
return client.isBroken();
}

@Override
public String ping() {
checkIsInMultiOrPipeline();
Expand Down Expand Up @@ -2044,7 +2052,7 @@ public void disconnect() {
}

public void resetState() {
if (client.isConnected()) {
if (isConnected()) {
if (transaction != null) {
transaction.close();
}
Expand Down Expand Up @@ -3298,10 +3306,6 @@ public byte[] configSet(final byte[] parameter, final byte[] value) {
return client.getBinaryBulkReply();
}

public boolean isConnected() {
return client.isConnected();
}

@Override
public Long strlen(final byte[] key) {
checkIsInMultiOrPipeline();
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/redis/clients/jedis/BinaryShardedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.commands.BinaryJedisCommands;
import redis.clients.jedis.commands.ProtocolCommand;
Expand All @@ -19,6 +21,8 @@
public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo> implements
BinaryJedisCommands {

private final Logger logger = LoggerFactory.getLogger(getClass());

private final byte[][] dummyArray = new byte[0][];

public BinaryShardedJedis(List<JedisShardInfo> shards) {
Expand All @@ -41,14 +45,19 @@ public void disconnect() {
for (Jedis jedis : getAllShards()) {
if (jedis.isConnected()) {
try {
jedis.quit();
// need a proper test, probably with mock
if (!jedis.isBroken()) {
jedis.quit();
}
} catch (JedisConnectionException e) {
// ignore the exception node, so that all other normal nodes can release all connections.
logger.error("Error while QUIT", e);
}
try {
jedis.disconnect();
} catch (JedisConnectionException e) {
// ignore the exception node, so that all other normal nodes can release all connections.
logger.error("Error while disconnect", e);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -3604,7 +3604,7 @@ public void close() {
if (dataSource != null) {
JedisPoolAbstract pool = this.dataSource;
this.dataSource = null;
if (client.isBroken()) {
if (isBroken()) {
pool.returnBrokenResource(this);
} else {
pool.returnResource(this);
Expand Down
14 changes: 12 additions & 2 deletions src/main/java/redis/clients/jedis/JedisFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.exceptions.InvalidURIException;
import redis.clients.jedis.exceptions.JedisException;
Expand All @@ -19,6 +21,9 @@
* PoolableObjectFactory custom impl.
*/
class JedisFactory implements PooledObjectFactory<Jedis> {

private final Logger logger = LoggerFactory.getLogger(getClass());

private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<>();
private final int connectionTimeout;
private final int soTimeout;
Expand Down Expand Up @@ -129,12 +134,17 @@ public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
final BinaryJedis jedis = pooledJedis.getObject();
if (jedis.isConnected()) {
try {
try {
// need a proper test, probably with mock
if (!jedis.isBroken()) {
jedis.quit();
} catch (Exception e) {
}
} catch (Exception e) {
logger.error("Error while QUIT", e);
}
try {
jedis.disconnect();
} catch (Exception e) {
logger.error("Error while disconnect", e);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/ShardedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ public void close() {
boolean broken = false;

for (Jedis jedis : getAllShards()) {
if (jedis.getClient().isBroken()) {
if (jedis.isBroken()) {
broken = true;
break;
}
Expand Down
16 changes: 12 additions & 4 deletions src/main/java/redis/clients/jedis/ShardedJedisPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.util.Hashing;
import redis.clients.jedis.util.Pool;
Expand Down Expand Up @@ -57,6 +59,9 @@ protected void returnResource(final ShardedJedis resource) {
* PoolableObjectFactory custom impl.
*/
private static class ShardedJedisFactory implements PooledObjectFactory<ShardedJedis> {

private final Logger logger = LoggerFactory.getLogger(getClass());

private List<JedisShardInfo> shards;
private Hashing algo;
private Pattern keyTagPattern;
Expand All @@ -79,14 +84,17 @@ public void destroyObject(PooledObject<ShardedJedis> pooledShardedJedis) throws
for (Jedis jedis : shardedJedis.getAllShards()) {
if (jedis.isConnected()) {
try {
try {
// need a proper test, probably with mock
if (!jedis.isBroken()) {
jedis.quit();
} catch (Exception e) {

}
} catch (Exception e) {
logger.error("Error while QUIT", e);
}
try {
jedis.disconnect();
} catch (Exception e) {

logger.error("Error while disconnect", e);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/redis/clients/jedis/tests/JedisPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public void closeBrokenResourceTwice() {
fail();
} catch (Exception e) {
}
assertTrue(j.getClient().isBroken());
assertTrue(j.isBroken());
j.close();
j.close();
}
Expand Down
29 changes: 14 additions & 15 deletions src/test/java/redis/clients/jedis/tests/JedisTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,17 @@ public void timeoutConnectionWithURI() throws Exception {

@Test
public void infiniteTimeout() throws Exception {
Jedis jedis = new Jedis("localhost", 6379, 350, 350, 350);
jedis.auth("foobared");
try {
jedis.blpop(0, "foo");
fail("SocketTimeoutException should occur");
} catch(JedisConnectionException jce) {
assertEquals(java.net.SocketTimeoutException.class, jce.getCause().getClass());
assertEquals("Read timed out", jce.getCause().getMessage());
assertTrue(jedis.getClient().isBroken());
try (Jedis timeoutJedis = new Jedis("localhost", 6379, 350, 350, 350)) {
timeoutJedis.auth("foobared");
try {
timeoutJedis.blpop(0, "foo");
fail("SocketTimeoutException should occur");
} catch(JedisConnectionException jce) {
assertEquals(java.net.SocketTimeoutException.class, jce.getCause().getClass());
assertEquals("Read timed out", jce.getCause().getMessage());
assertTrue(timeoutJedis.isBroken());
}
}
jedis.close();
}

@Test(expected = JedisDataException.class)
Expand Down Expand Up @@ -192,16 +192,15 @@ public void allowUrlWithNoDBAndNoPassword() {

@Test
public void checkCloseable() {
jedis.close();
BinaryJedis bj = new BinaryJedis("localhost");
bj.connect();
bj.close();
try (BinaryJedis bj = new BinaryJedis("localhost")) {
bj.connect();
}
}

@Test
public void checkDisconnectOnQuit() {
jedis.quit();
assertFalse(jedis.getClient().isConnected());
assertFalse(jedis.isConnected());
}

}
20 changes: 10 additions & 10 deletions src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package redis.clients.jedis.tests;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
Expand Down Expand Up @@ -64,20 +64,20 @@ public void testAvoidLeaksUponDisconnect() throws InterruptedException {

ClientKillerUtil.killClient(deadClient, "DEAD");

assertEquals(true, deadClient.isConnected());
assertEquals(false, deadClient.getClient().getSocket().isClosed());
assertEquals(false, deadClient.getClient().isBroken()); // normal - not found
assertTrue(deadClient.isConnected());
assertFalse(deadClient.getClient().getSocket().isClosed());
assertFalse(deadClient.isBroken()); // normal - not found

shardedJedis.disconnect();

assertEquals(false, deadClient.isConnected());
assertEquals(true, deadClient.getClient().getSocket().isClosed());
assertEquals(true, deadClient.getClient().isBroken());
assertFalse(deadClient.isConnected());
assertTrue(deadClient.getClient().getSocket().isClosed());
assertTrue(deadClient.isBroken());

Jedis jedis2 = it.next();
assertEquals(false, jedis2.isConnected());
assertEquals(true, jedis2.getClient().getSocket().isClosed());
assertEquals(false, jedis2.getClient().isBroken());
assertFalse(jedis2.isConnected());
assertTrue(jedis2.getClient().getSocket().isClosed());
assertFalse(jedis2.isBroken());

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package redis.clients.jedis.tests;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.junit.BeforeClass;
import org.junit.Test;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisConnectionException;

public class UnavailableConnectionTest {

private static final HostAndPort unavailableHostAndPort = new HostAndPort("localhost", 6400);

@BeforeClass
public static void setup() {
setupAvoidQuitInDestroyObject();

try (Jedis j = new Jedis(unavailableHostAndPort)) {
j.shutdown();
}
}

public static void cleanup() {
cleanupAvoidQuitInDestroyObject();
}

private static JedisPool poolForBrokenJedis1;
private static Thread threadForBrokenJedis1;
private static Jedis brokenJedis1;

public static void setupAvoidQuitInDestroyObject() {
GenericObjectPoolConfig<Jedis> config = new GenericObjectPoolConfig<>();
config.setMaxTotal(1);
poolForBrokenJedis1 = new JedisPool(config, unavailableHostAndPort.getHost(), unavailableHostAndPort.getPort());
brokenJedis1 = poolForBrokenJedis1.getResource();
threadForBrokenJedis1 = new Thread(new Runnable() {
@Override
public void run() {
brokenJedis1.blpop(0, "broken-key-1");
}
});
threadForBrokenJedis1.start();
}

@Test(timeout = 5000)
public void testAvoidQuitInDestroyObjectForBrokenConnection() throws InterruptedException {
threadForBrokenJedis1.join();
assertFalse(threadForBrokenJedis1.isAlive());
assertTrue(brokenJedis1.isBroken());
brokenJedis1.close(); // we need capture/mock to test this properly

try {
poolForBrokenJedis1.getResource();
fail("Should not get connection from pool");
} catch(Exception ex) {
assertEquals(JedisConnectionException.class, ex.getClass());
assertEquals(JedisConnectionException.class, ex.getCause().getClass());
assertEquals(java.net.ConnectException.class, ex.getCause().getCause().getClass());
}
}

public static void cleanupAvoidQuitInDestroyObject() {
poolForBrokenJedis1.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public void scriptExistsWithBrokenConnection() {
// ignore it
}

assertEquals(true, deadClient.getClient().isBroken());
assertEquals(true, deadClient.isBroken());

deadClient.close();
}
Expand Down