Skip to content

Commit 93a2db8

Browse files
Remove some overhead from TransportService message handling
Avoiding some indirection, volatile-reads and moving the listener functionality that needlessly kept iterating an empty CoW list (creating iterator instances, volatile reads, more code) in an effort to improve the low IPC on transport threads.
1 parent b9a9784 commit 93a2db8

File tree

4 files changed

+119
-81
lines changed

4 files changed

+119
-81
lines changed

server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@
6666
import org.elasticsearch.transport.TransportMessageListener;
6767
import org.elasticsearch.transport.TransportRequest;
6868
import org.elasticsearch.transport.TransportRequestOptions;
69-
import org.elasticsearch.transport.TransportService;
7069

7170
import java.io.IOException;
7271
import java.io.UncheckedIOException;
@@ -1052,8 +1051,7 @@ public void testAbortWaitsOnDataNode() throws Exception {
10521051

10531052
final AtomicBoolean blocked = new AtomicBoolean(true);
10541053

1055-
final TransportService transportService = internalCluster().getInstance(TransportService.class, otherDataNode);
1056-
transportService.addMessageListener(new TransportMessageListener() {
1054+
MockTransportService.getInstance(otherDataNode).addMessageListener(new TransportMessageListener() {
10571055
@Override
10581056
public void onRequestSent(
10591057
DiscoveryNode node,

server/src/main/java/org/elasticsearch/transport/TransportService.java

+8-69
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,9 @@
6060
import java.util.Map;
6161
import java.util.Objects;
6262
import java.util.Set;
63-
import java.util.concurrent.CopyOnWriteArrayList;
6463
import java.util.concurrent.CountDownLatch;
6564
import java.util.concurrent.Executor;
6665
import java.util.concurrent.TimeUnit;
67-
import java.util.concurrent.atomic.AtomicBoolean;
6866
import java.util.function.Function;
6967
import java.util.function.Predicate;
7068
import java.util.function.Supplier;
@@ -103,8 +101,7 @@ public class TransportService extends AbstractLifecycleComponent
103101
Setting.Property.Deprecated
104102
);
105103

106-
private final AtomicBoolean handleIncomingRequests = new AtomicBoolean();
107-
private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();
104+
private volatile boolean handleIncomingRequests;
108105
protected final Transport transport;
109106
protected final ConnectionManager connectionManager;
110107
protected final ThreadPool threadPool;
@@ -134,7 +131,7 @@ protected boolean removeEldestEntry(Map.Entry<Long, TimeoutInfoHolder> eldest) {
134131

135132
// tracer log
136133

137-
private final Logger tracerLog;
134+
private static final Logger tracerLog = Loggers.getLogger(logger, ".tracer");
138135
private final Tracer tracer;
139136

140137
volatile String[] tracerLogInclude;
@@ -291,7 +288,6 @@ public TransportService(
291288
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
292289
setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings));
293290
setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings));
294-
tracerLog = Loggers.getLogger(logger, ".tracer");
295291
this.taskManager = taskManger;
296292
this.interceptor = transportInterceptor;
297293
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
@@ -432,8 +428,8 @@ protected void doClose() throws IOException {
432428
* reject any incoming requests, including handshakes, by closing the connection.
433429
*/
434430
public final void acceptIncomingRequests() {
435-
final boolean startedWithThisCall = handleIncomingRequests.compareAndSet(false, true);
436-
assert startedWithThisCall : "transport service was already accepting incoming requests";
431+
assert handleIncomingRequests == false : "transport service was already accepting incoming requests";
432+
handleIncomingRequests = true;
437433
logger.debug("now accepting incoming requests");
438434
}
439435

@@ -750,14 +746,6 @@ public void disconnectFromNode(DiscoveryNode node) {
750746
connectionManager.disconnectFromNode(node);
751747
}
752748

753-
public void addMessageListener(TransportMessageListener listener) {
754-
messageListener.listeners.add(listener);
755-
}
756-
757-
public void removeMessageListener(TransportMessageListener listener) {
758-
messageListener.listeners.remove(listener);
759-
}
760-
761749
public void addConnectionListener(TransportConnectionListener listener) {
762750
connectionManager.addListener(listener);
763751
}
@@ -1265,13 +1253,12 @@ public <Request extends TransportRequest> void registerRequestHandler(
12651253
*/
12661254
@Override
12671255
public void onRequestReceived(long requestId, String action) {
1268-
if (handleIncomingRequests.get() == false) {
1256+
if (handleIncomingRequests == false) {
12691257
throw new TransportNotReadyException();
12701258
}
12711259
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
12721260
tracerLog.trace("[{}][{}] received request", requestId, action);
12731261
}
1274-
messageListener.onRequestReceived(requestId, action);
12751262
}
12761263

12771264
/** called by the {@link Transport} implementation once a request has been sent */
@@ -1286,7 +1273,6 @@ public void onRequestSent(
12861273
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
12871274
tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout());
12881275
}
1289-
messageListener.onRequestSent(node, requestId, action, request, options);
12901276
}
12911277

12921278
@Override
@@ -1297,7 +1283,6 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder)
12971283
} else if (tracerLog.isTraceEnabled() && shouldTraceAction(holder.action())) {
12981284
tracerLog.trace("[{}][{}] received response from [{}]", requestId, holder.action(), holder.connection().getNode());
12991285
}
1300-
messageListener.onResponseReceived(requestId, holder);
13011286
}
13021287

13031288
/** called by the {@link Transport} implementation once a response was sent to calling node */
@@ -1306,7 +1291,6 @@ public void onResponseSent(long requestId, String action, TransportResponse resp
13061291
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
13071292
tracerLog.trace("[{}][{}] sent response", requestId, action);
13081293
}
1309-
messageListener.onResponseSent(requestId, action, response);
13101294
}
13111295

13121296
/** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */
@@ -1315,7 +1299,6 @@ public void onResponseSent(long requestId, String action, Exception e) {
13151299
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
13161300
tracerLog.trace(() -> format("[%s][%s] sent error response", requestId, action), e);
13171301
}
1318-
messageListener.onResponseSent(requestId, action, e);
13191302
}
13201303

13211304
public RequestHandlerRegistry<? extends TransportRequest> getRequestHandler(String action) {
@@ -1453,6 +1436,7 @@ public void run() {
14531436
public void cancel() {
14541437
assert responseHandlers.contains(requestId) == false
14551438
: "cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers";
1439+
var cancellable = this.cancellable;
14561440
if (cancellable != null) {
14571441
cancellable.cancel();
14581442
}
@@ -1492,6 +1476,7 @@ public T read(StreamInput in) throws IOException {
14921476

14931477
@Override
14941478
public void handleResponse(T response) {
1479+
var handler = this.handler;
14951480
if (handler != null) {
14961481
handler.cancel();
14971482
}
@@ -1502,6 +1487,7 @@ public void handleResponse(T response) {
15021487

15031488
@Override
15041489
public void handleException(TransportException exp) {
1490+
var handler = this.handler;
15051491
if (handler != null) {
15061492
handler.cancel();
15071493
}
@@ -1666,53 +1652,6 @@ private boolean isLocalNode(DiscoveryNode discoveryNode) {
16661652
return discoveryNode.equals(localNode);
16671653
}
16681654

1669-
private static final class DelegatingTransportMessageListener implements TransportMessageListener {
1670-
1671-
private final List<TransportMessageListener> listeners = new CopyOnWriteArrayList<>();
1672-
1673-
@Override
1674-
public void onRequestReceived(long requestId, String action) {
1675-
for (TransportMessageListener listener : listeners) {
1676-
listener.onRequestReceived(requestId, action);
1677-
}
1678-
}
1679-
1680-
@Override
1681-
public void onResponseSent(long requestId, String action, TransportResponse response) {
1682-
for (TransportMessageListener listener : listeners) {
1683-
listener.onResponseSent(requestId, action, response);
1684-
}
1685-
}
1686-
1687-
@Override
1688-
public void onResponseSent(long requestId, String action, Exception error) {
1689-
for (TransportMessageListener listener : listeners) {
1690-
listener.onResponseSent(requestId, action, error);
1691-
}
1692-
}
1693-
1694-
@Override
1695-
public void onRequestSent(
1696-
DiscoveryNode node,
1697-
long requestId,
1698-
String action,
1699-
TransportRequest request,
1700-
TransportRequestOptions finalOptions
1701-
) {
1702-
for (TransportMessageListener listener : listeners) {
1703-
listener.onRequestSent(node, requestId, action, request, finalOptions);
1704-
}
1705-
}
1706-
1707-
@Override
1708-
@SuppressWarnings("rawtypes")
1709-
public void onResponseReceived(long requestId, Transport.ResponseContext holder) {
1710-
for (TransportMessageListener listener : listeners) {
1711-
listener.onResponseReceived(requestId, holder);
1712-
}
1713-
}
1714-
}
1715-
17161655
private static class PendingDirectHandlers extends AbstractRefCounted {
17171656

17181657
// To handle a response we (i) remove the handler from responseHandlers and then (ii) enqueue an action to complete the handler on

test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java

+14-9
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.ExceptionsHelper;
1313
import org.elasticsearch.test.InternalTestCluster;
14+
import org.elasticsearch.test.transport.MockTransportService;
1415
import org.elasticsearch.transport.TransportMessageListener;
1516
import org.elasticsearch.transport.TransportService;
1617

@@ -26,16 +27,20 @@ public enum ErrorTraceHelper {
2627

2728
public static BooleanSupplier setupErrorTraceListener(InternalTestCluster internalCluster) {
2829
final AtomicBoolean transportMessageHasStackTrace = new AtomicBoolean(false);
29-
internalCluster.getDataNodeInstances(TransportService.class).forEach(ts -> ts.addMessageListener(new TransportMessageListener() {
30-
@Override
31-
public void onResponseSent(long requestId, String action, Exception error) {
32-
TransportMessageListener.super.onResponseSent(requestId, action, error);
33-
if (action.startsWith("indices:data/read/search")) {
34-
Optional<Throwable> throwable = ExceptionsHelper.unwrapCausesAndSuppressed(error, t -> t.getStackTrace().length > 0);
35-
transportMessageHasStackTrace.set(throwable.isPresent());
30+
internalCluster.getDataNodeInstances(TransportService.class)
31+
.forEach(ts -> ((MockTransportService) ts).addMessageListener(new TransportMessageListener() {
32+
@Override
33+
public void onResponseSent(long requestId, String action, Exception error) {
34+
TransportMessageListener.super.onResponseSent(requestId, action, error);
35+
if (action.startsWith("indices:data/read/search")) {
36+
Optional<Throwable> throwable = ExceptionsHelper.unwrapCausesAndSuppressed(
37+
error,
38+
t -> t.getStackTrace().length > 0
39+
);
40+
transportMessageHasStackTrace.set(throwable.isPresent());
41+
}
3642
}
37-
}
38-
}));
43+
}));
3944
return transportMessageHasStackTrace::get;
4045
}
4146
}

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

+96
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,10 @@
5757
import org.elasticsearch.transport.TcpTransport;
5858
import org.elasticsearch.transport.Transport;
5959
import org.elasticsearch.transport.TransportInterceptor;
60+
import org.elasticsearch.transport.TransportMessageListener;
6061
import org.elasticsearch.transport.TransportRequest;
6162
import org.elasticsearch.transport.TransportRequestOptions;
63+
import org.elasticsearch.transport.TransportResponse;
6264
import org.elasticsearch.transport.TransportService;
6365
import org.elasticsearch.transport.TransportSettings;
6466
import org.elasticsearch.transport.netty4.Netty4Transport;
@@ -106,6 +108,8 @@ public class MockTransportService extends TransportService {
106108
private final List<Runnable> onStopListeners = new CopyOnWriteArrayList<>();
107109
private final AtomicReference<Consumer<Transport.Connection>> onConnectionClosedCallback = new AtomicReference<>();
108110

111+
private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();
112+
109113
public static class TestPlugin extends Plugin {
110114
@Override
111115
public List<Setting<?>> getSettings() {
@@ -814,4 +818,96 @@ protected void doClose() throws IOException {
814818
assertTrue(ThreadPool.terminate(testExecutor, 10, TimeUnit.SECONDS));
815819
}
816820
}
821+
822+
@Override
823+
public void onRequestReceived(long requestId, String action) {
824+
super.onRequestReceived(requestId, action);
825+
messageListener.onRequestReceived(requestId, action);
826+
}
827+
828+
@Override
829+
public void onRequestSent(
830+
DiscoveryNode node,
831+
long requestId,
832+
String action,
833+
TransportRequest request,
834+
TransportRequestOptions options
835+
) {
836+
super.onRequestSent(node, requestId, action, request, options);
837+
messageListener.onRequestSent(node, requestId, action, request, options);
838+
}
839+
840+
@Override
841+
@SuppressWarnings("rawtypes")
842+
public void onResponseReceived(long requestId, Transport.ResponseContext holder) {
843+
super.onResponseReceived(requestId, holder);
844+
messageListener.onResponseReceived(requestId, holder);
845+
}
846+
847+
@Override
848+
public void onResponseSent(long requestId, String action, TransportResponse response) {
849+
super.onResponseSent(requestId, action, response);
850+
messageListener.onResponseSent(requestId, action, response);
851+
}
852+
853+
@Override
854+
public void onResponseSent(long requestId, String action, Exception e) {
855+
super.onResponseSent(requestId, action, e);
856+
messageListener.onResponseSent(requestId, action, e);
857+
}
858+
859+
public void addMessageListener(TransportMessageListener listener) {
860+
messageListener.listeners.add(listener);
861+
}
862+
863+
public void removeMessageListener(TransportMessageListener listener) {
864+
messageListener.listeners.remove(listener);
865+
}
866+
867+
private static final class DelegatingTransportMessageListener implements TransportMessageListener {
868+
869+
private final List<TransportMessageListener> listeners = new CopyOnWriteArrayList<>();
870+
871+
@Override
872+
public void onRequestReceived(long requestId, String action) {
873+
for (TransportMessageListener listener : listeners) {
874+
listener.onRequestReceived(requestId, action);
875+
}
876+
}
877+
878+
@Override
879+
public void onResponseSent(long requestId, String action, TransportResponse response) {
880+
for (TransportMessageListener listener : listeners) {
881+
listener.onResponseSent(requestId, action, response);
882+
}
883+
}
884+
885+
@Override
886+
public void onResponseSent(long requestId, String action, Exception error) {
887+
for (TransportMessageListener listener : listeners) {
888+
listener.onResponseSent(requestId, action, error);
889+
}
890+
}
891+
892+
@Override
893+
public void onRequestSent(
894+
DiscoveryNode node,
895+
long requestId,
896+
String action,
897+
TransportRequest request,
898+
TransportRequestOptions finalOptions
899+
) {
900+
for (TransportMessageListener listener : listeners) {
901+
listener.onRequestSent(node, requestId, action, request, finalOptions);
902+
}
903+
}
904+
905+
@Override
906+
@SuppressWarnings("rawtypes")
907+
public void onResponseReceived(long requestId, Transport.ResponseContext holder) {
908+
for (TransportMessageListener listener : listeners) {
909+
listener.onResponseReceived(requestId, holder);
910+
}
911+
}
912+
}
817913
}

0 commit comments

Comments
 (0)