@@ -760,13 +760,21 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, Cancella
760
760
) {
761
761
tracer .startTrace ("executeQueryPhase" , Map .of ());
762
762
final long afterQueryTime ;
763
- try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (context )) {
763
+ final long beforeQueryTime = System .nanoTime ();
764
+ var opsListener = context .indexShard ().getSearchOperationListener ();
765
+ opsListener .onPreQueryPhase (context );
766
+ try {
764
767
loadOrExecuteQueryPhase (request , context );
765
768
if (context .queryResult ().hasSearchContext () == false && readerContext .singleSession ()) {
766
769
freeReaderContext (readerContext .id ());
767
770
}
768
- afterQueryTime = executor .success ();
771
+ afterQueryTime = System .nanoTime ();
772
+ opsListener .onQueryPhase (context , afterQueryTime - beforeQueryTime );
773
+ opsListener = null ;
769
774
} finally {
775
+ if (opsListener != null ) {
776
+ opsListener .onFailedQueryPhase (context );
777
+ }
770
778
tracer .stopTrace (task );
771
779
}
772
780
if (request .numberOfShards () == 1 && (request .source () == null || request .source ().rankBuilder () == null )) {
@@ -824,15 +832,19 @@ public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShard
824
832
}
825
833
826
834
private QueryFetchSearchResult executeFetchPhase (ReaderContext reader , SearchContext context , long afterQueryTime ) {
827
- try (
828
- Releasable scope = tracer .withScope (context .getTask ());
829
- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (context , true , afterQueryTime )
830
- ) {
835
+ var opsListener = context .indexShard ().getSearchOperationListener ();
836
+ try (Releasable scope = tracer .withScope (context .getTask ());) {
837
+ opsListener .onPreFetchPhase (context );
831
838
fetchPhase .execute (context , shortcutDocIdsToLoad (context ), null );
832
839
if (reader .singleSession ()) {
833
840
freeReaderContext (reader .id ());
834
841
}
835
- executor .success ();
842
+ opsListener .onFetchPhase (context , System .nanoTime () - afterQueryTime );
843
+ opsListener = null ;
844
+ } finally {
845
+ if (opsListener != null ) {
846
+ opsListener .onPreFetchPhase (context );
847
+ }
836
848
}
837
849
// This will incRef the QuerySearchResult when it gets created
838
850
return QueryFetchSearchResult .of (context .queryResult (), context .fetchResult ());
@@ -856,14 +868,21 @@ public void executeQueryPhase(
856
868
}
857
869
runAsync (getExecutor (readerContext .indexShard ()), () -> {
858
870
final ShardSearchRequest shardSearchRequest = readerContext .getShardSearchRequest (null );
859
- try (
860
- SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , false );
861
- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (searchContext )
862
- ) {
863
- searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (null ));
864
- processScroll (request , searchContext );
865
- QueryPhase .execute (searchContext );
866
- executor .success ();
871
+ try (SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , false );) {
872
+ var opsListener = searchContext .indexShard ().getSearchOperationListener ();
873
+ final long beforeQueryTime = System .nanoTime ();
874
+ opsListener .onPreQueryPhase (searchContext );
875
+ try {
876
+ searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (null ));
877
+ processScroll (request , searchContext );
878
+ QueryPhase .execute (searchContext );
879
+ opsListener .onQueryPhase (searchContext , System .nanoTime () - beforeQueryTime );
880
+ opsListener = null ;
881
+ } finally {
882
+ if (opsListener != null ) {
883
+ opsListener .onFailedQueryPhase (searchContext );
884
+ }
885
+ }
867
886
readerContext .setRescoreDocIds (searchContext .rescoreDocIds ());
868
887
// ScrollQuerySearchResult will incRef the QuerySearchResult when it gets constructed.
869
888
return new ScrollQuerySearchResult (searchContext .queryResult (), searchContext .shardTarget ());
@@ -894,18 +913,26 @@ public void executeQueryPhase(
894
913
// fork the execution in the search thread pool
895
914
runAsync (getExecutor (readerContext .indexShard ()), () -> {
896
915
readerContext .setAggregatedDfs (request .dfs ());
897
- try (
898
- SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , true );
899
- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (searchContext )
900
- ) {
901
- searchContext .searcher ().setAggregatedDfs (request .dfs ());
902
- QueryPhase .execute (searchContext );
903
- final QuerySearchResult queryResult = searchContext .queryResult ();
904
- if (queryResult .hasSearchContext () == false && readerContext .singleSession ()) {
905
- // no hits, we can release the context since there will be no fetch phase
906
- freeReaderContext (readerContext .id ());
916
+ try (SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , true );) {
917
+ final QuerySearchResult queryResult ;
918
+ var opsListener = searchContext .indexShard ().getSearchOperationListener ();
919
+ final long before = System .nanoTime ();
920
+ opsListener .onPreQueryPhase (searchContext );
921
+ try {
922
+ searchContext .searcher ().setAggregatedDfs (request .dfs ());
923
+ QueryPhase .execute (searchContext );
924
+ queryResult = searchContext .queryResult ();
925
+ if (queryResult .hasSearchContext () == false && readerContext .singleSession ()) {
926
+ // no hits, we can release the context since there will be no fetch phase
927
+ freeReaderContext (readerContext .id ());
928
+ }
929
+ opsListener .onQueryPhase (searchContext , System .nanoTime () - before );
930
+ opsListener = null ;
931
+ } finally {
932
+ if (opsListener != null ) {
933
+ opsListener .onFailedQueryPhase (searchContext );
934
+ }
907
935
}
908
- executor .success ();
909
936
// Pass the rescoreDocIds to the queryResult to send them the coordinating node
910
937
// and receive them back in the fetch phase.
911
938
// We also pass the rescoreDocIds to the LegacyReaderContext in case the search state needs to stay in the data node.
@@ -954,16 +981,25 @@ public void executeFetchPhase(
954
981
}
955
982
runAsync (getExecutor (readerContext .indexShard ()), () -> {
956
983
final ShardSearchRequest shardSearchRequest = readerContext .getShardSearchRequest (null );
957
- try (
958
- SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .FETCH , false );
959
- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (searchContext )
960
- ) {
961
- searchContext .assignRescoreDocIds (readerContext .getRescoreDocIds (null ));
962
- searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (null ));
963
- processScroll (request , searchContext );
964
- searchContext .addQueryResult ();
965
- QueryPhase .execute (searchContext );
966
- final long afterQueryTime = executor .success ();
984
+ var opsListener = readerContext .indexShard ().getSearchOperationListener ();
985
+ try (SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .FETCH , false );) {
986
+ final long afterQueryTime ;
987
+ try {
988
+ final long beforeQueryTime = System .nanoTime ();
989
+ opsListener .onPreQueryPhase (searchContext );
990
+ searchContext .assignRescoreDocIds (readerContext .getRescoreDocIds (null ));
991
+ searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (null ));
992
+ processScroll (request , searchContext );
993
+ searchContext .addQueryResult ();
994
+ QueryPhase .execute (searchContext );
995
+ afterQueryTime = System .nanoTime ();
996
+ opsListener .onQueryPhase (searchContext , afterQueryTime - beforeQueryTime );
997
+ opsListener = null ;
998
+ } finally {
999
+ if (opsListener != null ) {
1000
+ opsListener .onFailedQueryPhase (searchContext );
1001
+ }
1002
+ }
967
1003
QueryFetchSearchResult fetchSearchResult = executeFetchPhase (readerContext , searchContext , afterQueryTime );
968
1004
return new ScrollQueryFetchSearchResult (fetchSearchResult , searchContext .shardTarget ());
969
1005
} catch (Exception e ) {
@@ -987,18 +1023,20 @@ public void executeFetchPhase(ShardFetchRequest request, CancellableTask task, A
987
1023
}
988
1024
searchContext .assignRescoreDocIds (readerContext .getRescoreDocIds (request .getRescoreDocIds ()));
989
1025
searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (request .getAggregatedDfs ()));
990
- try (
991
- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (
992
- searchContext ,
993
- true ,
994
- System .nanoTime ()
995
- )
996
- ) {
1026
+ final long startTime = System .nanoTime ();
1027
+ var opsListener = searchContext .indexShard ().getSearchOperationListener ();
1028
+ opsListener .onPreFetchPhase (searchContext );
1029
+ try {
997
1030
fetchPhase .execute (searchContext , request .docIds (), request .getRankDocks ());
998
1031
if (readerContext .singleSession ()) {
999
1032
freeReaderContext (request .contextId ());
1000
1033
}
1001
- executor .success ();
1034
+ opsListener .onFetchPhase (searchContext , System .nanoTime () - startTime );
1035
+ opsListener = null ;
1036
+ } finally {
1037
+ if (opsListener != null ) {
1038
+ opsListener .onFailedFetchPhase (searchContext );
1039
+ }
1002
1040
}
1003
1041
var fetchResult = searchContext .fetchResult ();
1004
1042
// inc-ref fetch result because we close the SearchContext that references it in this try-with-resources block
@@ -1972,58 +2010,4 @@ public AggregationReduceContext forFinalReduction() {
1972
2010
}
1973
2011
};
1974
2012
}
1975
-
1976
- /**
1977
- * This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}.
1978
- * This is crucial for some implementations like {@link org.elasticsearch.index.search.stats.ShardSearchStats}.
1979
- */
1980
- private static final class SearchOperationListenerExecutor implements AutoCloseable {
1981
- private final SearchOperationListener listener ;
1982
- private final SearchContext context ;
1983
- private final long time ;
1984
- private final boolean fetch ;
1985
- private long afterQueryTime = -1 ;
1986
- private boolean closed = false ;
1987
-
1988
- SearchOperationListenerExecutor (SearchContext context ) {
1989
- this (context , false , System .nanoTime ());
1990
- }
1991
-
1992
- SearchOperationListenerExecutor (SearchContext context , boolean fetch , long startTime ) {
1993
- this .listener = context .indexShard ().getSearchOperationListener ();
1994
- this .context = context ;
1995
- time = startTime ;
1996
- this .fetch = fetch ;
1997
- if (fetch ) {
1998
- listener .onPreFetchPhase (context );
1999
- } else {
2000
- listener .onPreQueryPhase (context );
2001
- }
2002
- }
2003
-
2004
- long success () {
2005
- return afterQueryTime = System .nanoTime ();
2006
- }
2007
-
2008
- @ Override
2009
- public void close () {
2010
- assert closed == false : "already closed - while technically ok double closing is a likely a bug in this case" ;
2011
- if (closed == false ) {
2012
- closed = true ;
2013
- if (afterQueryTime != -1 ) {
2014
- if (fetch ) {
2015
- listener .onFetchPhase (context , afterQueryTime - time );
2016
- } else {
2017
- listener .onQueryPhase (context , afterQueryTime - time );
2018
- }
2019
- } else {
2020
- if (fetch ) {
2021
- listener .onFailedFetchPhase (context );
2022
- } else {
2023
- listener .onFailedQueryPhase (context );
2024
- }
2025
- }
2026
- }
2027
- }
2028
- }
2029
2013
}
0 commit comments