Skip to content

Commit ad77b8d

Browse files
Make some constant SubscribableListener instances cheaper
We can just use a real constant for the `null` case, avoiding any non-plain stores in all cases. This should be somewhat helpful for the security interceptors. Also, the spots where we create completed instances can use weaker release-type memory semantics to potentially help the compiler out.
1 parent 425823c commit ad77b8d

File tree

16 files changed

+82
-73
lines changed

16 files changed

+82
-73
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -157,15 +157,15 @@ public TransportClusterStatsAction(
157157
protected SubscribableListener<AdditionalStats> createActionContext(Task task, ClusterStatsRequest request) {
158158
assert task instanceof CancellableTask;
159159
final var cancellableTask = (CancellableTask) task;
160-
final var additionalStatsListener = new SubscribableListener<AdditionalStats>();
161160
if (request.isRemoteStats() == false) {
161+
final var additionalStatsListener = new SubscribableListener<AdditionalStats>();
162162
final AdditionalStats additionalStats = new AdditionalStats();
163163
additionalStats.compute(cancellableTask, request, additionalStatsListener);
164+
return additionalStatsListener;
164165
} else {
165166
// For remote stats request, we don't need to compute anything
166-
additionalStatsListener.onResponse(null);
167+
return SubscribableListener.nullSuccess();
167168
}
168-
return additionalStatsListener;
169169
}
170170

171171
@Override

server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java

+29-14
Original file line numberDiff line numberDiff line change
@@ -110,20 +110,6 @@ public SubscribableListener() {
110110
this(EMPTY);
111111
}
112112

113-
/**
114-
* Create a {@link SubscribableListener} which has already succeeded with the given result.
115-
*/
116-
public static <T> SubscribableListener<T> newSucceeded(T result) {
117-
return new SubscribableListener<>(new SuccessResult<>(result));
118-
}
119-
120-
/**
121-
* Create a {@link SubscribableListener} which has already failed with the given exception.
122-
*/
123-
public static <T> SubscribableListener<T> newFailed(Exception exception) {
124-
return new SubscribableListener<>(new FailureResult(exception, exception));
125-
}
126-
127113
/**
128114
* Create a {@link SubscribableListener}, fork a computation to complete it, and return the listener. If the forking itself throws an
129115
* exception then the exception is caught and fed to the returned listener.
@@ -586,4 +572,33 @@ private Runnable scheduleTimeout(TimeValue timeout, ThreadPool threadPool, Execu
586572
private Object compareAndExchangeState(Object expectedValue, Object newValue) {
587573
return VH_STATE_FIELD.compareAndExchange(this, expectedValue, newValue);
588574
}
575+
576+
@SuppressWarnings("rawtypes")
577+
private static final SubscribableListener NULL_SUCCESS = newSucceeded(null);
578+
579+
/**
580+
* Same as {@link #newSucceeded(Object)} but always returns the same instance with result value {@code null}.
581+
*/
582+
@SuppressWarnings("unchecked")
583+
public static <T> SubscribableListener<T> nullSuccess() {
584+
return NULL_SUCCESS;
585+
}
586+
587+
/**
588+
* Create a {@link SubscribableListener} which has already succeeded with the given result.
589+
*/
590+
public static <T> SubscribableListener<T> newSucceeded(T result) {
591+
var res = new SubscribableListener<T>();
592+
VH_STATE_FIELD.setRelease(res, new SuccessResult<>(result));
593+
return res;
594+
}
595+
596+
/**
597+
* Create a {@link SubscribableListener} which has already failed with the given exception.
598+
*/
599+
public static <T> SubscribableListener<T> newFailed(Exception exception) {
600+
var res = new SubscribableListener<T>();
601+
VH_STATE_FIELD.setRelease(res, new FailureResult(exception, exception));
602+
return res;
603+
}
589604
}

server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ protected void doClose() {}
243243
* Kind of a hack tbh, we can't be sure the shard locks are fully released when this is completed so there's all sorts of retries and
244244
* other lenience to handle that. It'd be better to wait for the shard locks to be released and then delete the data. See #74149.
245245
*/
246-
private volatile SubscribableListener<Void> lastClusterStateShardsClosedListener = SubscribableListener.newSucceeded(null);
246+
private volatile SubscribableListener<Void> lastClusterStateShardsClosedListener = SubscribableListener.nullSuccess();
247247

248248
@Nullable // if not currently applying a cluster state
249249
private RefCountingListener currentClusterStateShardsClosedListeners;
@@ -397,7 +397,7 @@ private void deleteIndices(final ClusterChangedEvent event) {
397397
);
398398
} else if (project.isPresent() && project.get().hasIndex(index)) {
399399
// The deleted index was part of the previous cluster state, but not loaded on the local node
400-
indexServiceClosedListener = SubscribableListener.newSucceeded(null);
400+
indexServiceClosedListener = SubscribableListener.nullSuccess();
401401
final IndexMetadata metadata = project.get().index(index);
402402
indexSettings = new IndexSettings(metadata, settings);
403403
indicesService.deleteUnassignedIndex("deleted index was not assigned to local node", metadata, state);
@@ -411,7 +411,7 @@ private void deleteIndices(final ClusterChangedEvent event) {
411411
// previous cluster state is not initialized/recovered.
412412
assert state.metadata().projects().values().stream().anyMatch(p -> p.indexGraveyard().containsIndex(index))
413413
|| previousState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
414-
indexServiceClosedListener = SubscribableListener.newSucceeded(null);
414+
indexServiceClosedListener = SubscribableListener.nullSuccess();
415415
final IndexMetadata metadata = indicesService.verifyIndexIsDeleted(index, event.state());
416416
if (metadata != null) {
417417
indexSettings = new IndexSettings(metadata, settings);

test/framework/src/main/java/org/elasticsearch/indices/recovery/RecoveryClusterStateDelayListeners.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public SubscribableListener<Void> getClusterStateDelayListener(long clusterState
6464
refCounted.decRef();
6565
}
6666
} else {
67-
return SubscribableListener.newSucceeded(null);
67+
return SubscribableListener.nullSuccess();
6868
}
6969
}
7070

test/framework/src/main/java/org/elasticsearch/test/TestCluster.java

+34-40
Original file line numberDiff line numberDiff line change
@@ -127,46 +127,40 @@ private void deleteTemplates(Set<String> excludeTemplates, ActionListener<Void>
127127
l -> client().execute(GetComponentTemplateAction.INSTANCE, new GetComponentTemplateAction.Request(TEST_REQUEST_TIMEOUT, "*"), l)
128128
);
129129

130-
SubscribableListener
131-
132-
// dummy start step for symmetry
133-
.newSucceeded(null)
134-
135-
// delete composable templates
136-
.<GetComposableIndexTemplateAction.Response>andThen(getComposableTemplates::addListener)
137-
.<AcknowledgedResponse>andThen((l, r) -> {
138-
var templates = r.indexTemplates()
139-
.keySet()
140-
.stream()
141-
.filter(template -> excludeTemplates.contains(template) == false)
142-
.toArray(String[]::new);
143-
if (templates.length == 0) {
144-
l.onResponse(AcknowledgedResponse.TRUE);
145-
} else {
146-
var request = new TransportDeleteComposableIndexTemplateAction.Request(templates);
147-
client().execute(TransportDeleteComposableIndexTemplateAction.TYPE, request, l);
148-
}
149-
})
150-
.andThenAccept(ElasticsearchAssertions::assertAcked)
151-
152-
// then delete component templates
153-
.<GetComponentTemplateAction.Response>andThen(getComponentTemplates::addListener)
154-
.<AcknowledgedResponse>andThen((l, response) -> {
155-
var componentTemplates = response.getComponentTemplates()
156-
.keySet()
157-
.stream()
158-
.filter(template -> excludeTemplates.contains(template) == false)
159-
.toArray(String[]::new);
160-
if (componentTemplates.length == 0) {
161-
l.onResponse(AcknowledgedResponse.TRUE);
162-
} else {
163-
client().execute(
164-
TransportDeleteComponentTemplateAction.TYPE,
165-
new TransportDeleteComponentTemplateAction.Request(componentTemplates),
166-
l
167-
);
168-
}
169-
})
130+
// dummy start step for symmetry
131+
SubscribableListener.nullSuccess()
132+
// delete composable templates
133+
.<GetComposableIndexTemplateAction.Response>andThen(getComposableTemplates::addListener).<AcknowledgedResponse>andThen((l, r) -> {
134+
var templates = r.indexTemplates()
135+
.keySet()
136+
.stream()
137+
.filter(template -> excludeTemplates.contains(template) == false)
138+
.toArray(String[]::new);
139+
if (templates.length == 0) {
140+
l.onResponse(AcknowledgedResponse.TRUE);
141+
} else {
142+
var request = new TransportDeleteComposableIndexTemplateAction.Request(templates);
143+
client().execute(TransportDeleteComposableIndexTemplateAction.TYPE, request, l);
144+
}
145+
}).andThenAccept(ElasticsearchAssertions::assertAcked)
146+
147+
// then delete component templates
148+
.<GetComponentTemplateAction.Response>andThen(getComponentTemplates::addListener).<AcknowledgedResponse>andThen((l, response) -> {
149+
var componentTemplates = response.getComponentTemplates()
150+
.keySet()
151+
.stream()
152+
.filter(template -> excludeTemplates.contains(template) == false)
153+
.toArray(String[]::new);
154+
if (componentTemplates.length == 0) {
155+
l.onResponse(AcknowledgedResponse.TRUE);
156+
} else {
157+
client().execute(
158+
TransportDeleteComponentTemplateAction.TYPE,
159+
new TransportDeleteComponentTemplateAction.Request(componentTemplates),
160+
l
161+
);
162+
}
163+
})
170164
.andThenAccept(ElasticsearchAssertions::assertAcked)
171165

172166
// and finish

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ default IsBlockedResult isBlocked() {
9292
return NOT_BLOCKED;
9393
}
9494

95-
IsBlockedResult NOT_BLOCKED = new IsBlockedResult(SubscribableListener.newSucceeded(null), "not blocked");
95+
IsBlockedResult NOT_BLOCKED = new IsBlockedResult(SubscribableListener.nullSuccess(), "not blocked");
9696

9797
/**
9898
* A factory for creating intermediate operators.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeRe
357357
}
358358
doFetchPageAsync(false, ActionListener.wrap(r -> {
359359
if (r.finished()) {
360-
completionListenerRef.compareAndSet(null, SubscribableListener.newSucceeded(null));
360+
completionListenerRef.compareAndSet(null, SubscribableListener.nullSuccess());
361361
}
362362
listener.onResponse(r);
363363
}, e -> close(ActionListener.running(() -> listener.onFailure(e)))));

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/BulkShardRequestInterceptor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,6 @@ public SubscribableListener<Void> intercept(
8181
}
8282
}
8383
}
84-
return SubscribableListener.newSucceeded(null);
84+
return SubscribableListener.nullSuccess();
8585
}
8686
}

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/DlsFlsLicenseRequestInterceptor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,6 @@ public SubscribableListener<Void> intercept(
101101
}
102102
}
103103
}
104-
return SubscribableListener.newSucceeded(null);
104+
return SubscribableListener.nullSuccess();
105105
}
106106
}

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/FieldAndDocumentLevelSecurityRequestInterceptor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ && supports(indicesRequest)
7777
return listener;
7878
}
7979
}
80-
return SubscribableListener.newSucceeded(null);
80+
return SubscribableListener.nullSuccess();
8181
}
8282

8383
abstract void disableFeatures(

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public SubscribableListener<Void> intercept(
125125
);
126126
return listener;
127127
} else {
128-
return SubscribableListener.newSucceeded(null);
128+
return SubscribableListener.nullSuccess();
129129
}
130130
}
131131
}

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/ResizeRequestInterceptor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public SubscribableListener<Void> intercept(
103103
);
104104
return listener;
105105
} else {
106-
return SubscribableListener.newSucceeded(null);
106+
return SubscribableListener.nullSuccess();
107107
}
108108
}
109109
}

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestCacheDisablingInterceptor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ && hasRemoteIndices(searchRequest)
4949
searchRequest.requestCache(false);
5050
}
5151
}
52-
return SubscribableListener.newSucceeded(null);
52+
return SubscribableListener.nullSuccess();
5353
}
5454

5555
// package private for test

x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryVerifyIntegrityIT.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ public void testTaskStatus() throws IOException {
191191
assertEquals(0L, status.indexSnapshotsVerified());
192192
assertEquals(0L, status.blobsVerified());
193193
assertEquals(0L, status.blobBytesVerified());
194-
yield SubscribableListener.newSucceeded(null);
194+
yield SubscribableListener.nullSuccess();
195195
}
196196
case INDEX_RESTORABILITY -> {
197197
// several of these chunks might arrive concurrently; we want to verify the task status before processing any of
@@ -210,7 +210,7 @@ public void testTaskStatus() throws IOException {
210210
assertEquals(0L, status.indicesVerified());
211211
});
212212
}
213-
case SNAPSHOT_INFO -> SubscribableListener.newSucceeded(null);
213+
case SNAPSHOT_INFO -> SubscribableListener.nullSuccess();
214214
case ANOMALY -> fail(null, "should not see anomalies");
215215
};
216216

x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryIntegrityVerifier.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -798,7 +798,7 @@ protected InputStream openSlice(int slice) throws IOException {
798798
})));
799799
} else {
800800
blobBytesVerified.addAndGet(fileInfo.length());
801-
return SubscribableListener.newSucceeded(null);
801+
return SubscribableListener.nullSuccess();
802802
}
803803
});
804804
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformUpgradeModeAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ private void unassignTransforms(ClusterState state, ActionListener<Void> listene
131131

132132
// chain each call one at a time
133133
// because that is what we are doing for ML, and that is all that is supported in the persistentTasksClusterService (for now)
134-
SubscribableListener<PersistentTasksCustomMetadata.PersistentTask<?>> chainListener = SubscribableListener.newSucceeded(null);
134+
SubscribableListener<PersistentTasksCustomMetadata.PersistentTask<?>> chainListener = SubscribableListener.nullSuccess();
135135
for (var task : transformTasks) {
136136
@FixForMultiProject
137137
final var projectId = Metadata.DEFAULT_PROJECT_ID;

0 commit comments

Comments
 (0)