@@ -748,13 +748,21 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh
748
748
) {
749
749
tracer .startTrace ("executeQueryPhase" , Map .of ());
750
750
final long afterQueryTime ;
751
- try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (context )) {
751
+ final long beforeQueryTime = System .nanoTime ();
752
+ var opsListener = context .indexShard ().getSearchOperationListener ();
753
+ opsListener .onPreQueryPhase (context );
754
+ try {
752
755
loadOrExecuteQueryPhase (request , context );
753
756
if (context .queryResult ().hasSearchContext () == false && readerContext .singleSession ()) {
754
757
freeReaderContext (readerContext .id ());
755
758
}
756
- afterQueryTime = executor .success ();
759
+ afterQueryTime = System .nanoTime ();
760
+ opsListener .onQueryPhase (context , afterQueryTime - beforeQueryTime );
761
+ opsListener = null ;
757
762
} finally {
763
+ if (opsListener != null ) {
764
+ opsListener .onFailedQueryPhase (context );
765
+ }
758
766
tracer .stopTrace (task );
759
767
}
760
768
if (request .numberOfShards () == 1 && (request .source () == null || request .source ().rankBuilder () == null )) {
@@ -812,15 +820,19 @@ public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShard
812
820
}
813
821
814
822
private QueryFetchSearchResult executeFetchPhase (ReaderContext reader , SearchContext context , long afterQueryTime ) {
815
- try (
816
- Releasable scope = tracer .withScope (context .getTask ());
817
- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (context , true , afterQueryTime )
818
- ) {
823
+ var opsListener = context .indexShard ().getSearchOperationListener ();
824
+ try (Releasable scope = tracer .withScope (context .getTask ());) {
825
+ opsListener .onPreFetchPhase (context );
819
826
fetchPhase .execute (context , shortcutDocIdsToLoad (context ), null );
820
827
if (reader .singleSession ()) {
821
828
freeReaderContext (reader .id ());
822
829
}
823
- executor .success ();
830
+ opsListener .onFetchPhase (context , System .nanoTime () - afterQueryTime );
831
+ opsListener = null ;
832
+ } finally {
833
+ if (opsListener != null ) {
834
+ opsListener .onFailedFetchPhase (context );
835
+ }
824
836
}
825
837
// This will incRef the QuerySearchResult when it gets created
826
838
return QueryFetchSearchResult .of (context .queryResult (), context .fetchResult ());
@@ -844,14 +856,21 @@ public void executeQueryPhase(
844
856
}
845
857
runAsync (getExecutor (readerContext .indexShard ()), () -> {
846
858
final ShardSearchRequest shardSearchRequest = readerContext .getShardSearchRequest (null );
847
- try (
848
- SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , false );
849
- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (searchContext )
850
- ) {
851
- searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (null ));
852
- processScroll (request , searchContext );
853
- QueryPhase .execute (searchContext );
854
- executor .success ();
859
+ try (SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , false );) {
860
+ var opsListener = searchContext .indexShard ().getSearchOperationListener ();
861
+ final long beforeQueryTime = System .nanoTime ();
862
+ opsListener .onPreQueryPhase (searchContext );
863
+ try {
864
+ searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (null ));
865
+ processScroll (request , searchContext );
866
+ QueryPhase .execute (searchContext );
867
+ opsListener .onQueryPhase (searchContext , System .nanoTime () - beforeQueryTime );
868
+ opsListener = null ;
869
+ } finally {
870
+ if (opsListener != null ) {
871
+ opsListener .onFailedQueryPhase (searchContext );
872
+ }
873
+ }
855
874
readerContext .setRescoreDocIds (searchContext .rescoreDocIds ());
856
875
// ScrollQuerySearchResult will incRef the QuerySearchResult when it gets constructed.
857
876
return new ScrollQuerySearchResult (searchContext .queryResult (), searchContext .shardTarget ());
@@ -882,18 +901,26 @@ public void executeQueryPhase(
882
901
// fork the execution in the search thread pool
883
902
runAsync (getExecutor (readerContext .indexShard ()), () -> {
884
903
readerContext .setAggregatedDfs (request .dfs ());
885
- try (
886
- SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , true );
887
- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (searchContext )
888
- ) {
889
- searchContext .searcher ().setAggregatedDfs (request .dfs ());
890
- QueryPhase .execute (searchContext );
891
- final QuerySearchResult queryResult = searchContext .queryResult ();
892
- if (queryResult .hasSearchContext () == false && readerContext .singleSession ()) {
893
- // no hits, we can release the context since there will be no fetch phase
894
- freeReaderContext (readerContext .id ());
904
+ try (SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .QUERY , true );) {
905
+ final QuerySearchResult queryResult ;
906
+ var opsListener = searchContext .indexShard ().getSearchOperationListener ();
907
+ final long before = System .nanoTime ();
908
+ opsListener .onPreQueryPhase (searchContext );
909
+ try {
910
+ searchContext .searcher ().setAggregatedDfs (request .dfs ());
911
+ QueryPhase .execute (searchContext );
912
+ queryResult = searchContext .queryResult ();
913
+ if (queryResult .hasSearchContext () == false && readerContext .singleSession ()) {
914
+ // no hits, we can release the context since there will be no fetch phase
915
+ freeReaderContext (readerContext .id ());
916
+ }
917
+ opsListener .onQueryPhase (searchContext , System .nanoTime () - before );
918
+ opsListener = null ;
919
+ } finally {
920
+ if (opsListener != null ) {
921
+ opsListener .onFailedQueryPhase (searchContext );
922
+ }
895
923
}
896
- executor .success ();
897
924
// Pass the rescoreDocIds to the queryResult to send them the coordinating node
898
925
// and receive them back in the fetch phase.
899
926
// We also pass the rescoreDocIds to the LegacyReaderContext in case the search state needs to stay in the data node.
@@ -942,16 +969,25 @@ public void executeFetchPhase(
942
969
}
943
970
runAsync (getExecutor (readerContext .indexShard ()), () -> {
944
971
final ShardSearchRequest shardSearchRequest = readerContext .getShardSearchRequest (null );
945
- try (
946
- SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .FETCH , false );
947
- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (searchContext )
948
- ) {
949
- searchContext .assignRescoreDocIds (readerContext .getRescoreDocIds (null ));
950
- searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (null ));
951
- processScroll (request , searchContext );
952
- searchContext .addQueryResult ();
953
- QueryPhase .execute (searchContext );
954
- final long afterQueryTime = executor .success ();
972
+ try (SearchContext searchContext = createContext (readerContext , shardSearchRequest , task , ResultsType .FETCH , false );) {
973
+ var opsListener = readerContext .indexShard ().getSearchOperationListener ();
974
+ final long beforeQueryTime = System .nanoTime ();
975
+ final long afterQueryTime ;
976
+ try {
977
+ opsListener .onPreQueryPhase (searchContext );
978
+ searchContext .assignRescoreDocIds (readerContext .getRescoreDocIds (null ));
979
+ searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (null ));
980
+ processScroll (request , searchContext );
981
+ searchContext .addQueryResult ();
982
+ QueryPhase .execute (searchContext );
983
+ afterQueryTime = System .nanoTime ();
984
+ opsListener .onQueryPhase (searchContext , afterQueryTime - beforeQueryTime );
985
+ opsListener = null ;
986
+ } finally {
987
+ if (opsListener != null ) {
988
+ opsListener .onFailedQueryPhase (searchContext );
989
+ }
990
+ }
955
991
QueryFetchSearchResult fetchSearchResult = executeFetchPhase (readerContext , searchContext , afterQueryTime );
956
992
return new ScrollQueryFetchSearchResult (fetchSearchResult , searchContext .shardTarget ());
957
993
} catch (Exception e ) {
@@ -975,18 +1011,20 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
975
1011
}
976
1012
searchContext .assignRescoreDocIds (readerContext .getRescoreDocIds (request .getRescoreDocIds ()));
977
1013
searchContext .searcher ().setAggregatedDfs (readerContext .getAggregatedDfs (request .getAggregatedDfs ()));
978
- try (
979
- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor (
980
- searchContext ,
981
- true ,
982
- System .nanoTime ()
983
- )
984
- ) {
1014
+ final long startTime = System .nanoTime ();
1015
+ var opsListener = searchContext .indexShard ().getSearchOperationListener ();
1016
+ opsListener .onPreFetchPhase (searchContext );
1017
+ try {
985
1018
fetchPhase .execute (searchContext , request .docIds (), request .getRankDocks ());
986
1019
if (readerContext .singleSession ()) {
987
1020
freeReaderContext (request .contextId ());
988
1021
}
989
- executor .success ();
1022
+ opsListener .onFetchPhase (searchContext , System .nanoTime () - startTime );
1023
+ opsListener = null ;
1024
+ } finally {
1025
+ if (opsListener != null ) {
1026
+ opsListener .onFailedFetchPhase (searchContext );
1027
+ }
990
1028
}
991
1029
var fetchResult = searchContext .fetchResult ();
992
1030
// inc-ref fetch result because we close the SearchContext that references it in this try-with-resources block
@@ -2007,58 +2045,4 @@ public AggregationReduceContext forFinalReduction() {
2007
2045
}
2008
2046
};
2009
2047
}
2010
-
2011
- /**
2012
- * This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}.
2013
- * This is crucial for some implementations like {@link org.elasticsearch.index.search.stats.ShardSearchStats}.
2014
- */
2015
- private static final class SearchOperationListenerExecutor implements AutoCloseable {
2016
- private final SearchOperationListener listener ;
2017
- private final SearchContext context ;
2018
- private final long time ;
2019
- private final boolean fetch ;
2020
- private long afterQueryTime = -1 ;
2021
- private boolean closed = false ;
2022
-
2023
- SearchOperationListenerExecutor (SearchContext context ) {
2024
- this (context , false , System .nanoTime ());
2025
- }
2026
-
2027
- SearchOperationListenerExecutor (SearchContext context , boolean fetch , long startTime ) {
2028
- this .listener = context .indexShard ().getSearchOperationListener ();
2029
- this .context = context ;
2030
- time = startTime ;
2031
- this .fetch = fetch ;
2032
- if (fetch ) {
2033
- listener .onPreFetchPhase (context );
2034
- } else {
2035
- listener .onPreQueryPhase (context );
2036
- }
2037
- }
2038
-
2039
- long success () {
2040
- return afterQueryTime = System .nanoTime ();
2041
- }
2042
-
2043
- @ Override
2044
- public void close () {
2045
- assert closed == false : "already closed - while technically ok double closing is a likely a bug in this case" ;
2046
- if (closed == false ) {
2047
- closed = true ;
2048
- if (afterQueryTime != -1 ) {
2049
- if (fetch ) {
2050
- listener .onFetchPhase (context , afterQueryTime - time );
2051
- } else {
2052
- listener .onQueryPhase (context , afterQueryTime - time );
2053
- }
2054
- } else {
2055
- if (fetch ) {
2056
- listener .onFailedFetchPhase (context );
2057
- } else {
2058
- listener .onFailedQueryPhase (context );
2059
- }
2060
- }
2061
- }
2062
- }
2063
- }
2064
2048
}
0 commit comments