Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Sniffer and make it testable #29638

Merged
merged 25 commits into from
May 31, 2018
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e581c56
Simplify `Sniffer` and begin testing it properly
javanna Apr 4, 2018
900d3aa
Merge branch 'master' into enhancement/rest_client_sniffer_tests
javanna Apr 10, 2018
0d8a270
some improvements
javanna Apr 10, 2018
c9ec5a8
tests rewritten
javanna Apr 12, 2018
1858a4c
Merge branch 'master' into enhancement/rest_client_sniffer_tests
javanna Apr 20, 2018
4c886a0
refactor and add tests
javanna Apr 20, 2018
c2a88a3
improve tests
javanna Apr 20, 2018
5d7c636
address first review comments
javanna Apr 20, 2018
9407f80
Merge branch 'master' into enhancement/rest_client_sniffer_tests
javanna Apr 30, 2018
9658886
adapt NodeFailureListener
javanna Apr 30, 2018
4094a4e
fixed warnings
javanna Apr 30, 2018
f3baab3
Adapt HttpExporter
javanna Apr 30, 2018
07dce21
adapt NodeFailureListenerTests
javanna Apr 30, 2018
5603296
Merge branch 'master' into enhancement/rest_client_sniffer_tests
javanna May 1, 2018
440bd5c
Merge branch 'master' into enhancement/rest_client_sniffer_tests
javanna May 14, 2018
2cf41b0
Merge branch 'master' into enhancement/rest_client_sniffer_tests
javanna May 17, 2018
dccb673
Add tests around cancelling tasks and make impl more robust
javanna May 25, 2018
880b6a9
Merge branch 'master' into enhancement/rest_client_sniffer_tests
javanna May 25, 2018
1d5ac98
Merge branch 'master' into enhancement/rest_client_sniffer_tests
javanna May 28, 2018
1759215
update comments
javanna May 28, 2018
44b5178
addressed comments
javanna May 30, 2018
2dd307e
Merge branch 'master' into enhancement/rest_client_sniffer_tests
javanna May 30, 2018
8b22063
Merge branch 'master' into enhancement/rest_client_sniffer_tests
javanna May 30, 2018
8a8d348
Merge branch 'master' into enhancement/rest_client_sniffer_tests
javanna May 31, 2018
f80b119
Merge branch 'master' into enhancement/rest_client_sniffer_tests
javanna May 31, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -132,7 +133,7 @@ public synchronized void setHosts(HttpHost... hosts) {
if (hosts == null || hosts.length == 0) {
throw new IllegalArgumentException("hosts must not be null nor empty");
}
Set<HttpHost> httpHosts = new HashSet<>();
Set<HttpHost> httpHosts = new LinkedHashSet<>();
AuthCache authCache = new BasicAuthCache();
for (HttpHost host : hosts) {
Objects.requireNonNull(host, "host cannot be null");
Expand All @@ -143,6 +144,13 @@ public synchronized void setHosts(HttpHost... hosts) {
this.blacklist.clear();
}

/**
 * Returns a snapshot of the hosts this client is currently configured with.
 * The returned list is a fresh copy, so modifying it does not affect the client.
 */
public List<HttpHost> getHosts() {
    List<HttpHost> configuredHosts = new ArrayList<>(hostTuple.hosts);
    return configuredHosts;
}

/**
* Sends a request to the Elasticsearch cluster that the client points to and waits for the corresponding response
* to be returned. Shortcut to {@link #performRequest(String, String, Map, HttpEntity, Header...)} but without parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;

Expand Down Expand Up @@ -175,6 +176,37 @@ public void testSetHostsWrongArguments() throws IOException {
}
}

public void testSetHostsPreservesOrdering() throws Exception {
    // the hosts read back from the client must come in the same order they were set
    try (RestClient client = createRestClient()) {
        HttpHost[] orderedHosts = randomHosts();
        client.setHosts(orderedHosts);
        assertEquals(Arrays.asList(orderedHosts), client.getHosts());
    }
}

/**
 * Builds an array of hosts named host-0, host-1, ... all on port 9200.
 * The array length is picked through randomIntBetween(1, 10).
 */
private static HttpHost[] randomHosts() {
    HttpHost[] generated = new HttpHost[randomIntBetween(1, 10)];
    int index = 0;
    while (index < generated.length) {
        generated[index] = new HttpHost("host-" + index, 9200);
        index++;
    }
    return generated;
}

public void testSetHostsDuplicatedHosts() throws Exception {
    try (RestClient client = createRestClient()) {
        int numHosts = randomIntBetween(1, 10);
        HttpHost[] duplicates = new HttpHost[numHosts];
        HttpHost sameHost = new HttpHost("host", 9200);
        // every slot refers to the very same host, so the client should end up with a single entry
        Arrays.fill(duplicates, sameHost);
        client.setHosts(duplicates);
        assertEquals(1, client.getHosts().size());
        assertEquals(sameHost, client.getHosts().get(0));
    }
}

public void testNullPath() throws IOException {
try (RestClient restClient = createRestClient()) {
for (String method : getHttpMethods()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ public void onFailure(HttpHost host) {
if (sniffer == null) {
throw new IllegalStateException("sniffer was not set, unable to sniff on failure");
}
//re-sniff immediately but take out the node that failed
sniffer.sniffOnFailure(host);
sniffer.sniffOnFailure();
}
}
190 changes: 112 additions & 78 deletions client/sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@
import java.security.PrivilegedAction;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
* Class responsible for sniffing nodes from some source (default is elasticsearch itself) and setting them to a provided instance of
Expand All @@ -51,101 +52,80 @@ public class Sniffer implements Closeable {
private static final Log logger = LogFactory.getLog(Sniffer.class);
private static final String SNIFFER_THREAD_NAME = "es_rest_client_sniffer";

private final Task task;
private final HostsSniffer hostsSniffer;
private final RestClient restClient;

private final long sniffIntervalMillis;
private final long sniffAfterFailureDelayMillis;
private final Scheduler scheduler;

private final AtomicReference<Future<?>> nextTask = new AtomicReference<>();

Sniffer(RestClient restClient, HostsSniffer hostsSniffer, long sniffInterval, long sniffAfterFailureDelay) {
this.task = new Task(hostsSniffer, restClient, sniffInterval, sniffAfterFailureDelay);
this(restClient, hostsSniffer, new DefaultScheduler(), sniffInterval, sniffAfterFailureDelay);
}

Sniffer(RestClient restClient, HostsSniffer hostsSniffer, Scheduler scheduler, long sniffInterval, long sniffAfterFailureDelay) {
this.hostsSniffer = hostsSniffer;
this.restClient = restClient;
this.sniffIntervalMillis = sniffInterval;
this.sniffAfterFailureDelayMillis = sniffAfterFailureDelay;
this.scheduler = scheduler;
//first sniffing round is immediately executed, next one will be executed depending on the configured sniff interval
scheduleNextRound(0L, sniffIntervalMillis, false);
}

/**
* Triggers a new sniffing round and explicitly takes out the failed host provided as argument
* Triggers a new immediate sniffing round, which will schedule a new round in sniffAfterFailureDelayMillis ms
*/
public void sniffOnFailure(HttpHost failedHost) {
this.task.sniffOnFailure(failedHost);
public final void sniffOnFailure() {
scheduleNextRound(0L, sniffAfterFailureDelayMillis, true);
}

@Override
public void close() throws IOException {
task.shutdown();
private void scheduleNextRound(long delay, long nextDelay, boolean mustCancelNextRound) {
Task task = new Task(nextDelay);
Future<?> nextFuture = scheduler.schedule(task, delay);
Future<?> previousFuture = nextTask.getAndSet(nextFuture);
if (mustCancelNextRound) {
previousFuture.cancel(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I was playing with cancellation as part of reindex I found that canceling a Runnable was sort of "best effort". If you make a test that calls sniffOnFailure a bunch of times in quick succession, I'll bet you get multiple rounds of sniffing in parallel.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it depends on whether the runnable has already started or not? That is what I've seen, but it is hard to reproduce that real-life behaviour from a unit test...

}
}

private static class Task implements Runnable {
private final HostsSniffer hostsSniffer;
private final RestClient restClient;

private final long sniffIntervalMillis;
private final long sniffAfterFailureDelayMillis;
private final ScheduledExecutorService scheduledExecutorService;
private final AtomicBoolean running = new AtomicBoolean(false);
private ScheduledFuture<?> scheduledFuture;

private Task(HostsSniffer hostsSniffer, RestClient restClient, long sniffIntervalMillis, long sniffAfterFailureDelayMillis) {
this.hostsSniffer = hostsSniffer;
this.restClient = restClient;
this.sniffIntervalMillis = sniffIntervalMillis;
this.sniffAfterFailureDelayMillis = sniffAfterFailureDelayMillis;
SnifferThreadFactory threadFactory = new SnifferThreadFactory(SNIFFER_THREAD_NAME);
this.scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactory);
scheduleNextRun(0);
}
final class Task implements Runnable {
final long nextTaskDelay;

synchronized void scheduleNextRun(long delayMillis) {
if (scheduledExecutorService.isShutdown() == false) {
try {
if (scheduledFuture != null) {
//regardless of when the next sniff is scheduled, cancel it and schedule a new one with updated delay
this.scheduledFuture.cancel(false);
}
logger.debug("scheduling next sniff in " + delayMillis + " ms");
this.scheduledFuture = this.scheduledExecutorService.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
} catch(Exception e) {
logger.error("error while scheduling next sniffer task", e);
}
}
Task(long nextTaskDelay) {
this.nextTaskDelay = nextTaskDelay;
}

@Override
public void run() {
sniff(null, sniffIntervalMillis);
try {
sniff();
} catch (Exception e) {
logger.error("error while sniffing nodes", e);
} finally {
scheduleNextRound(nextTaskDelay, sniffIntervalMillis, false);
}
}
}

void sniffOnFailure(HttpHost failedHost) {
sniff(failedHost, sniffAfterFailureDelayMillis);
final void sniff() throws IOException {
List<HttpHost> sniffedHosts = hostsSniffer.sniffHosts();
if (logger.isDebugEnabled()) {
logger.debug("sniffed hosts: " + sniffedHosts);
}

void sniff(HttpHost excludeHost, long nextSniffDelayMillis) {
if (running.compareAndSet(false, true)) {
try {
List<HttpHost> sniffedHosts = hostsSniffer.sniffHosts();
logger.debug("sniffed hosts: " + sniffedHosts);
if (excludeHost != null) {
sniffedHosts.remove(excludeHost);
}
if (sniffedHosts.isEmpty()) {
logger.warn("no hosts to set, hosts will be updated at the next sniffing round");
} else {
this.restClient.setHosts(sniffedHosts.toArray(new HttpHost[sniffedHosts.size()]));
}
} catch (Exception e) {
logger.error("error while sniffing nodes", e);
} finally {
scheduleNextRun(nextSniffDelayMillis);
running.set(false);
}
}
if (sniffedHosts.isEmpty()) {
logger.warn("no hosts to set, hosts will be updated at the next sniffing round");
} else {
restClient.setHosts(sniffedHosts.toArray(new HttpHost[sniffedHosts.size()]));
}
}

synchronized void shutdown() {
scheduledExecutorService.shutdown();
try {
if (scheduledExecutorService.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
return;
}
scheduledExecutorService.shutdownNow();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void close() {
    // cancel the pending round without interrupting one that is already running, then stop the scheduler
    Future<?> pendingRound = nextTask.get();
    pendingRound.cancel(false);
    scheduler.shutdown();
}

/**
Expand All @@ -158,8 +138,62 @@ public static SnifferBuilder builder(RestClient restClient) {
return new SnifferBuilder(restClient);
}

private static class SnifferThreadFactory implements ThreadFactory {
/**
* The Scheduler interface isolates the scheduling aspects of sniffing, so that the sniffer can be
* tested by injecting, when needed, a custom scheduler that is better suited for testing.
*/
interface Scheduler {
/**
* Schedules the provided {@link Task} to be executed in <code>delayMillis</code> milliseconds
*
* @param task the sniffing task to execute
* @param delayMillis the delay, in milliseconds, before the task is executed
* @return a {@link Future} representing the scheduled task, which can be used to cancel it
*/
Future<?> schedule(Task task, long delayMillis);

/**
* Shuts this scheduler down
*/
void shutdown();
}

/**
 * Default implementation of {@link Scheduler}, based on {@link ScheduledExecutorService}
 */
static final class DefaultScheduler implements Scheduler {
    final ScheduledExecutorService executor;

    /**
     * Creates a scheduler backed by a single-threaded {@link ScheduledThreadPoolExecutor}
     */
    DefaultScheduler() {
        this(initScheduledExecutorService());
    }

    /**
     * Creates a scheduler backed by the provided executor
     *
     * @param executor the executor that tasks are scheduled on and that {@link #shutdown()} stops
     */
    DefaultScheduler(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    private static ScheduledExecutorService initScheduledExecutorService() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new SnifferThreadFactory(SNIFFER_THREAD_NAME));
        //cancelled tasks are removed from the work queue at cancellation time rather than when their delay elapses
        executor.setRemoveOnCancelPolicy(true);
        return executor;
    }

    /**
     * Schedules the provided task on the underlying executor
     *
     * @param task the task to execute
     * @param delayMillis the delay, in milliseconds, before the task is executed
     * @return the future returned by the underlying executor for the scheduled task
     */
    @Override
    public Future<?> schedule(Task task, long delayMillis) {
        return executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Shuts the underlying executor down: waits up to one second for it to terminate and forces
     * termination if it did not, or if the calling thread gets interrupted while waiting.
     */
    @Override
    public void shutdown() {
        executor.shutdown();
        try {
            if (executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
                return;
            }
            executor.shutdownNow();
        } catch (InterruptedException e) {
            //we can no longer wait for termination: force it, so that no task outlives this scheduler
            executor.shutdownNow();
            //restore the interrupt flag for the caller
            Thread.currentThread().interrupt();
        }
    }
}

static class SnifferThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private final ThreadFactory originalThreadFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.http.HttpHost;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

Expand All @@ -30,7 +29,7 @@
*/
class MockHostsSniffer implements HostsSniffer {
@Override
public List<HttpHost> sniffHosts() throws IOException {
public List<HttpHost> sniffHosts() {
return Collections.singletonList(new HttpHost("localhost", 9200));
}
}
Loading