@@ -15,9 +15,12 @@
 import org.elasticsearch.action.search.SearchPhaseController.TopDocsStats;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.breaker.CircuitBreakingException;
+import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.DelayableWriteable;
 import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.SearchShardTarget;
@@ -31,6 +34,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
@@ -174,14 +178,10 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
         this.mergeResult = null;
         final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1);
         final List<TopDocs> topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null;
-        final List<DelayableWriteable<InternalAggregations>> aggsList = hasAggs ? new ArrayList<>(resultSize) : null;
         if (mergeResult != null) {
             if (topDocsList != null) {
                 topDocsList.add(mergeResult.reducedTopDocs);
             }
-            if (aggsList != null) {
-                aggsList.add(DelayableWriteable.referencing(mergeResult.reducedAggs));
-            }
         }
         for (QuerySearchResult result : buffer) {
             topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
@@ -190,34 +190,39 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
                 setShardIndex(topDocs.topDocs, result.getShardIndex());
                 topDocsList.add(topDocs.topDocs);
             }
-            if (aggsList != null) {
-                aggsList.add(result.getAggs());
-            }
         }
         SearchPhaseController.ReducedQueryPhase reducePhase;
         long breakerSize = circuitBreakerBytes;
+        final InternalAggregations aggs;
         try {
-            if (aggsList != null) {
+            if (hasAggs) {
                 // Add an estimate of the final reduce size
                 breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(breakerSize));
+                aggs = aggregate(
+                    buffer.iterator(),
+                    mergeResult,
+                    resultSize,
+                    performFinalReduce ? aggReduceContextBuilder.forFinalReduction() : aggReduceContextBuilder.forPartialReduction()
+                );
+            } else {
+                aggs = null;
             }
             reducePhase = SearchPhaseController.reducedQueryPhase(
                 results.asList(),
-                aggsList,
+                aggs,
                 topDocsList == null ? Collections.emptyList() : topDocsList,
                 topDocsStats,
                 numReducePhases,
                 false,
-                aggReduceContextBuilder,
-                queryPhaseRankCoordinatorContext,
-                performFinalReduce
+                queryPhaseRankCoordinatorContext
             );
+            buffer = null;
         } finally {
             releaseAggs(buffer);
         }
         if (hasAggs
             // reduced aggregations can be null if all shards failed
-            && reducePhase.aggregations() != null) {
+            && aggs != null) {

             // Update the circuit breaker to replace the estimation with the serialized size of the newly reduced result
             long finalSize = DelayableWriteable.getSerializedSize(reducePhase.aggregations()) - breakerSize;
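
Note on the try/finally rework in this hunk: on success, `buffer = null;` runs before the `finally` block, so the now null-tolerant `releaseAggs(buffer)` (see the last hunk below) only releases the per-shard aggregations when `reducedQueryPhase(...)` or the breaker accounting throws; on the happy path, the new `aggregate(...)` helper has already consumed and released them one by one. A minimal standalone sketch of this ownership handoff, using hypothetical `Resource`/`reduce` stand-ins rather than the Elasticsearch types:

import java.util.List;

// Sketch only, not the Elasticsearch code: the success path nulls the buffer
// reference before `finally` runs, handing ownership of the per-shard
// resources to the reduce step; the null-tolerant release helper then fires
// only if the reduce threw. `Resource` and `reduce` are hypothetical.
class CleanupHandoff {

    record Resource(String name) {
        void release() {
            System.out.println("released " + name);
        }
    }

    // Consumes the resources: releases each one as it folds it into the result.
    static String reduce(List<Resource> results) {
        StringBuilder merged = new StringBuilder();
        for (Resource r : results) {
            merged.append(r.name()).append(' ');
            r.release();
        }
        return merged.toString().strip();
    }

    // Mirrors the patched releaseAggs(...): tolerates null for the success path.
    static void releaseAll(List<Resource> toConsume) {
        if (toConsume != null) {
            for (Resource r : toConsume) {
                r.release();
            }
        }
    }

    static String run(List<Resource> buffer) {
        try {
            String reduced = reduce(buffer);
            buffer = null; // ownership transferred; finally must not release again
            return reduced;
        } finally {
            releaseAll(buffer); // non-null only if reduce(...) threw
        }
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(new Resource("shard-0"), new Resource("shard-1"))));
    }
}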
@@ -249,17 +254,7 @@ private MergeResult partialReduce(
         toConsume.sort(RESULT_COMPARATOR);

         final TopDocs newTopDocs;
-        final InternalAggregations newAggs;
-        final List<DelayableWriteable<InternalAggregations>> aggsList;
         final int resultSetSize = toConsume.size() + (lastMerge != null ? 1 : 0);
-        if (hasAggs) {
-            aggsList = new ArrayList<>(resultSetSize);
-            if (lastMerge != null) {
-                aggsList.add(DelayableWriteable.referencing(lastMerge.reducedAggs));
-            }
-        } else {
-            aggsList = null;
-        }
         List<TopDocs> topDocsList;
         if (hasTopDocs) {
             topDocsList = new ArrayList<>(resultSetSize);
@@ -269,14 +264,12 @@ private MergeResult partialReduce(
         } else {
             topDocsList = null;
         }
+        final InternalAggregations newAggs;
         try {
             for (QuerySearchResult result : toConsume) {
                 topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
                 SearchShardTarget target = result.getSearchShardTarget();
                 processedShards.add(new SearchShard(target.getClusterAlias(), target.getShardId()));
-                if (aggsList != null) {
-                    aggsList.add(result.getAggs());
-                }
                 if (topDocsList != null) {
                     TopDocsAndMaxScore topDocs = result.consumeTopDocs();
                     setShardIndex(topDocs.topDocs, result.getShardIndex());
@@ -285,9 +278,10 @@ private MergeResult partialReduce(
             }
             // we have to merge here in the same way we collect on a shard
             newTopDocs = topDocsList == null ? null : mergeTopDocs(topDocsList, topNSize, 0);
-            newAggs = aggsList == null
-                ? null
-                : InternalAggregations.topLevelReduceDelayable(aggsList, aggReduceContextBuilder.forPartialReduction());
+            newAggs = hasAggs
+                ? aggregate(toConsume.iterator(), lastMerge, resultSetSize, aggReduceContextBuilder.forPartialReduction())
+                : null;
+            toConsume = null;
         } finally {
             releaseAggs(toConsume);
         }
@@ -302,6 +296,45 @@ private MergeResult partialReduce(
         return new MergeResult(processedShards, newTopDocs, newAggs, newAggs != null ? DelayableWriteable.getSerializedSize(newAggs) : 0);
     }

+    private static InternalAggregations aggregate(
+        Iterator<QuerySearchResult> toConsume,
+        MergeResult lastMerge,
+        int resultSetSize,
+        AggregationReduceContext reduceContext
+    ) {
+        interface ReleasableIterator extends Iterator<InternalAggregations>, Releasable {}
+        try (var aggsIter = new ReleasableIterator() {
+
+            private Releasable toRelease;
+
+            @Override
+            public void close() {
+                Releasables.close(toRelease);
+            }
+
+            @Override
+            public boolean hasNext() {
+                return toConsume.hasNext();
+            }
+
+            @Override
+            public InternalAggregations next() {
+                var res = toConsume.next().consumeAggs();
+                Releasables.close(toRelease);
+                toRelease = res;
+                return res.expand();
+            }
+        }) {
+            return InternalAggregations.topLevelReduce(
+                lastMerge == null ? aggsIter : Iterators.concat(Iterators.single(lastMerge.reducedAggs), aggsIter),
+                resultSetSize,
+                reduceContext
+            );
+        } finally {
+            toConsume.forEachRemaining(QuerySearchResult::releaseAggs);
+        }
+    }
+
     public int getNumReducePhases() {
         return numReducePhases;
     }
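
The anonymous `ReleasableIterator` inside the new `aggregate(...)` keeps at most one shard's aggregations alive at a time: `next()` releases the previous element before handing out the next, `close()` (via try-with-resources) releases the last one, and the `finally` drains whatever the reduce did not consume. A self-contained sketch of that release-as-you-go iterator pattern, with a hypothetical `Buffer` type standing in for the per-shard InternalAggregations:

import java.util.Iterator;
import java.util.List;

// Sketch of the iterator pattern used by aggregate(...) above; `Buffer` is a
// hypothetical releasable payload, not an Elasticsearch type.
class StreamingRelease {

    record Buffer(String payload) implements AutoCloseable {
        @Override
        public void close() {
            System.out.println("released " + payload);
        }
    }

    interface ReleasableIterator<T> extends Iterator<T>, AutoCloseable {
        @Override
        void close(); // no checked exception, like org.elasticsearch.core.Releasable
    }

    static ReleasableIterator<String> streaming(Iterator<Buffer> source) {
        return new ReleasableIterator<>() {
            private Buffer toRelease;

            @Override
            public void close() {
                if (toRelease != null) {
                    toRelease.close(); // free the last element handed out
                }
            }

            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public String next() {
                Buffer next = source.next();
                if (toRelease != null) {
                    toRelease.close(); // the previous element can be freed now
                }
                toRelease = next;
                return next.payload();
            }
        };
    }

    public static void main(String[] args) {
        Iterator<Buffer> source = List.of(new Buffer("a"), new Buffer("b"), new Buffer("c")).iterator();
        StringBuilder reduced = new StringBuilder();
        try (var iter = streaming(source)) {
            while (iter.hasNext()) {
                reduced.append(iter.next()); // "reduce" one element at a time
            }
        }
        System.out.println("reduced: " + reduced); // peak memory was one Buffer, not three
    }
}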
@@ -517,8 +550,10 @@ public void onFailure(Exception exc) {
     }

     private static void releaseAggs(List<QuerySearchResult> toConsume) {
-        for (QuerySearchResult result : toConsume) {
-            result.releaseAggs();
+        if (toConsume != null) {
+            for (QuerySearchResult result : toConsume) {
+                result.releaseAggs();
+            }
         }
     }
