Skip to content

Commit 8c9c049

Browse files
Release aggs earlier
1 parent a80402b commit 8c9c049

File tree

6 files changed

+56
-46
lines changed

6 files changed

+56
-46
lines changed

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

+32-13
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.common.io.stream.DelayableWriteable;
1919
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
2020
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
21+
import org.elasticsearch.core.Releasables;
2122
import org.elasticsearch.search.SearchPhaseResult;
2223
import org.elasticsearch.search.SearchService;
2324
import org.elasticsearch.search.SearchShardTarget;
@@ -174,7 +175,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
174175
this.mergeResult = null;
175176
final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1);
176177
final List<TopDocs> topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null;
177-
final List<DelayableWriteable<InternalAggregations>> aggsList = hasAggs ? new ArrayList<>(resultSize) : null;
178+
List<DelayableWriteable<InternalAggregations>> aggsList = hasAggs ? new ArrayList<>(resultSize) : null;
178179
if (mergeResult != null) {
179180
if (topDocsList != null) {
180181
topDocsList.add(mergeResult.reducedTopDocs);
@@ -191,29 +192,37 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
191192
topDocsList.add(topDocs.topDocs);
192193
}
193194
if (aggsList != null) {
194-
aggsList.add(result.getAggs());
195+
aggsList.add(result.consumeAggs());
195196
}
196197
}
197198
SearchPhaseController.ReducedQueryPhase reducePhase;
198199
long breakerSize = circuitBreakerBytes;
199200
try {
201+
final InternalAggregations aggs;
200202
if (aggsList != null) {
201203
// Add an estimate of the final reduce size
202204
breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(breakerSize));
205+
aggs = InternalAggregations.topLevelReduceDelayable(
206+
aggsList,
207+
performFinalReduce ? aggReduceContextBuilder.forFinalReduction() : aggReduceContextBuilder.forPartialReduction()
208+
);
209+
aggsList = null;
210+
} else {
211+
aggs = null;
203212
}
204213
reducePhase = SearchPhaseController.reducedQueryPhase(
205214
results.asList(),
206-
aggsList,
215+
aggs,
207216
topDocsList == null ? Collections.emptyList() : topDocsList,
208217
topDocsStats,
209218
numReducePhases,
210219
false,
211-
aggReduceContextBuilder,
212-
queryPhaseRankCoordinatorContext,
213-
performFinalReduce
220+
queryPhaseRankCoordinatorContext
214221
);
215222
} finally {
216-
releaseAggs(buffer);
223+
if (aggsList != null) {
224+
releaseAggs(buffer, aggsList);
225+
}
217226
}
218227
if (hasAggs
219228
// reduced aggregations can be null if all shards failed
@@ -250,7 +259,7 @@ private MergeResult partialReduce(
250259

251260
final TopDocs newTopDocs;
252261
final InternalAggregations newAggs;
253-
final List<DelayableWriteable<InternalAggregations>> aggsList;
262+
List<DelayableWriteable<InternalAggregations>> aggsList;
254263
final int resultSetSize = toConsume.size() + (lastMerge != null ? 1 : 0);
255264
if (hasAggs) {
256265
aggsList = new ArrayList<>(resultSetSize);
@@ -275,7 +284,7 @@ private MergeResult partialReduce(
275284
SearchShardTarget target = result.getSearchShardTarget();
276285
processedShards.add(new SearchShard(target.getClusterAlias(), target.getShardId()));
277286
if (aggsList != null) {
278-
aggsList.add(result.getAggs());
287+
aggsList.add(result.consumeAggs());
279288
}
280289
if (topDocsList != null) {
281290
TopDocsAndMaxScore topDocs = result.consumeTopDocs();
@@ -285,11 +294,16 @@ private MergeResult partialReduce(
285294
}
286295
// we have to merge here in the same way we collect on a shard
287296
newTopDocs = topDocsList == null ? null : mergeTopDocs(topDocsList, topNSize, 0);
288-
newAggs = aggsList == null
289-
? null
290-
: InternalAggregations.topLevelReduceDelayable(aggsList, aggReduceContextBuilder.forPartialReduction());
297+
if (aggsList != null) {
298+
newAggs = InternalAggregations.topLevelReduceDelayable(aggsList, aggReduceContextBuilder.forPartialReduction());
299+
aggsList = null;
300+
} else {
301+
newAggs = null;
302+
}
291303
} finally {
292-
releaseAggs(toConsume);
304+
if (aggsList != null) {
305+
releaseAggs(toConsume, aggsList);
306+
}
293307
}
294308
if (lastMerge != null) {
295309
processedShards.addAll(lastMerge.processedShards);
@@ -302,6 +316,11 @@ private MergeResult partialReduce(
302316
return new MergeResult(processedShards, newTopDocs, newAggs, newAggs != null ? DelayableWriteable.getSerializedSize(newAggs) : 0);
303317
}
304318

319+
private static void releaseAggs(List<QuerySearchResult> toConsume, List<DelayableWriteable<InternalAggregations>> aggsList) {
320+
releaseAggs(toConsume);
321+
Releasables.close(aggsList);
322+
}
323+
305324
public int getNumReducePhases() {
306325
return numReducePhases;
307326
}

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

+4-12
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.lucene.search.TotalHits;
2121
import org.apache.lucene.search.TotalHits.Relation;
2222
import org.elasticsearch.common.breaker.CircuitBreaker;
23-
import org.elasticsearch.common.io.stream.DelayableWriteable;
2423
import org.elasticsearch.common.lucene.Lucene;
2524
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
2625
import org.elasticsearch.common.util.Maps;
@@ -401,22 +400,20 @@ private static SearchHits getHits(
401400
/**
402401
* Reduces the given query results and consumes all aggregations and profile results.
403402
* @param queryResults a list of non-null query shard results
404-
* @param bufferedAggs a list of pre-collected aggregations.
403+
* @param reducedAggs already reduced aggregations
405404
* @param bufferedTopDocs a list of pre-collected top docs.
406405
* @param numReducePhases the number of non-final reduce phases applied to the query results.
407406
* @see QuerySearchResult#getAggs()
408407
* @see QuerySearchResult#consumeProfileResult()
409408
*/
410409
static ReducedQueryPhase reducedQueryPhase(
411410
Collection<? extends SearchPhaseResult> queryResults,
412-
@Nullable List<DelayableWriteable<InternalAggregations>> bufferedAggs,
411+
@Nullable InternalAggregations reducedAggs,
413412
List<TopDocs> bufferedTopDocs,
414413
TopDocsStats topDocsStats,
415414
int numReducePhases,
416415
boolean isScrollRequest,
417-
AggregationReduceContext.Builder aggReduceContextBuilder,
418-
QueryPhaseRankCoordinatorContext queryPhaseRankCoordinatorContext,
419-
boolean performFinalReduce
416+
QueryPhaseRankCoordinatorContext queryPhaseRankCoordinatorContext
420417
) {
421418
assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
422419
numReducePhases++; // increment for this phase
@@ -520,12 +517,7 @@ static ReducedQueryPhase reducedQueryPhase(
520517
topDocsStats.timedOut,
521518
topDocsStats.terminatedEarly,
522519
reducedSuggest,
523-
bufferedAggs == null
524-
? null
525-
: InternalAggregations.topLevelReduceDelayable(
526-
bufferedAggs,
527-
performFinalReduce ? aggReduceContextBuilder.forFinalReduction() : aggReduceContextBuilder.forPartialReduction()
528-
),
520+
reducedAggs,
529521
profileShardResults.isEmpty() ? null : new SearchProfileResultsBuilder(profileShardResults),
530522
sortedTopDocs,
531523
sortValueFormats,

server/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java

+1-11
Original file line numberDiff line numberDiff line change
@@ -339,16 +339,6 @@ public AggregationReduceContext forFinalReduction() {
339339
topDocs.add(td.topDocs);
340340
}
341341
}
342-
return SearchPhaseController.reducedQueryPhase(
343-
queryResults,
344-
null,
345-
topDocs,
346-
topDocsStats,
347-
0,
348-
true,
349-
aggReduceContextBuilder,
350-
null,
351-
true
352-
);
342+
return SearchPhaseController.reducedQueryPhase(queryResults, null, topDocs, topDocsStats, 0, true, null);
353343
}
354344
}

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.common.io.stream.Writeable;
1515
import org.elasticsearch.common.util.CollectionUtils;
1616
import org.elasticsearch.common.util.Maps;
17+
import org.elasticsearch.core.Releasables;
1718
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
1819
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
1920
import org.elasticsearch.search.aggregations.support.AggregationPath;
@@ -199,7 +200,11 @@ public int size() {
199200
return delayableAggregations.size();
200201
}
201202
};
202-
return topLevelReduce(aggregations, context);
203+
try {
204+
return topLevelReduce(aggregations, context);
205+
} finally {
206+
Releasables.close(delayableAggregations);
207+
}
203208
}
204209

205210
/**

server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java

+9
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,15 @@ public DelayableWriteable<InternalAggregations> getAggs() {
236236
return aggregations;
237237
}
238238

239+
public DelayableWriteable<InternalAggregations> consumeAggs() {
240+
if (aggregations == null) {
241+
throw new IllegalStateException("aggs already released");
242+
}
243+
var res = aggregations;
244+
aggregations = null;
245+
return res;
246+
}
247+
239248
/**
240249
* Release the memory hold by the {@link DelayableWriteable} aggregations
241250
* @throws IllegalStateException if {@link #releaseAggs()} has already being called.

server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java

+4-9
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@
6868
import org.elasticsearch.search.suggest.term.TermSuggestion;
6969
import org.elasticsearch.tasks.TaskId;
7070
import org.elasticsearch.test.ESTestCase;
71-
import org.elasticsearch.test.InternalAggregationTestCase;
7271
import org.elasticsearch.threadpool.TestThreadPool;
7372
import org.elasticsearch.threadpool.ThreadPool;
7473
import org.elasticsearch.transport.TransportMessage;
@@ -273,14 +272,12 @@ public void testMerge() {
273272
try {
274273
SearchPhaseController.ReducedQueryPhase reducedQueryPhase = SearchPhaseController.reducedQueryPhase(
275274
queryResults.asList(),
276-
new ArrayList<>(),
275+
InternalAggregations.EMPTY,
277276
new ArrayList<>(),
278277
new TopDocsStats(trackTotalHits),
279278
0,
280279
true,
281-
InternalAggregationTestCase.emptyReduceContextBuilder(),
282-
null,
283-
true
280+
null
284281
);
285282
List<SearchShardTarget> shards = queryResults.asList()
286283
.stream()
@@ -363,12 +360,11 @@ public void testMergeWithRank() {
363360
try {
364361
SearchPhaseController.ReducedQueryPhase reducedQueryPhase = SearchPhaseController.reducedQueryPhase(
365362
queryResults.asList(),
366-
new ArrayList<>(),
363+
InternalAggregations.EMPTY,
367364
new ArrayList<>(),
368365
new TopDocsStats(trackTotalHits),
369366
0,
370367
true,
371-
InternalAggregationTestCase.emptyReduceContextBuilder(),
372368
new QueryPhaseRankCoordinatorContext(windowSize) {
373369
@Override
374370
public ScoreDoc[] rankQueryPhaseResults(List<QuerySearchResult> querySearchResults, TopDocsStats topDocStats) {
@@ -395,8 +391,7 @@ protected boolean lessThan(RankDoc a, RankDoc b) {
395391
topDocStats.fetchHits = topResults.length;
396392
return topResults;
397393
}
398-
},
399-
true
394+
}
400395
);
401396
List<SearchShardTarget> shards = queryResults.asList()
402397
.stream()

0 commit comments

Comments
 (0)