Skip to content

Commit d6eed3b

Browse files
authored
[refactor](query ctx) Remove useless state in query ctx (#39948)
1 parent f33c5cf commit d6eed3b

File tree

4 files changed

+2
-126
lines changed

4 files changed

+2
-126
lines changed

be/src/pipeline/exec/scan_operator.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "pipeline/exec/operator.h"
3333
#include "runtime/types.h"
3434
#include "util/runtime_profile.h"
35+
#include "vec/exec/scan/scanner_context.h"
3536
#include "vec/exprs/vcast_expr.h"
3637
#include "vec/exprs/vcompound_pred.h"
3738
#include "vec/exprs/vectorized_fn_call.h"

be/src/runtime/query_context.cpp

+1-28
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,
8787
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);
8888
_query_watcher.start();
8989
_shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
90-
_shared_scanner_controller.reset(new vectorized::SharedScannerController());
9190
_execution_dependency = pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency");
9291
_runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
9392
TUniqueId(), RuntimeFilterParamsContext::create(this), query_mem_tracker);
@@ -193,7 +192,6 @@ QueryContext::~QueryContext() {
193192
_runtime_filter_mgr.reset();
194193
_execution_dependency.reset();
195194
_shared_hash_table_controller.reset();
196-
_shared_scanner_controller.reset();
197195
_runtime_predicates.clear();
198196
file_scan_range_params_map.clear();
199197
obj_pool.clear();
@@ -206,24 +204,14 @@ QueryContext::~QueryContext() {
206204

207205
void QueryContext::set_ready_to_execute(Status reason) {
208206
set_execution_dependency_ready();
209-
{
210-
std::lock_guard<std::mutex> l(_start_lock);
211-
_exec_status.update(reason);
212-
_ready_to_execute = true;
213-
}
207+
_exec_status.update(reason);
214208
if (query_mem_tracker && !reason.ok()) {
215209
query_mem_tracker->set_is_query_cancelled(!reason.ok());
216210
}
217-
_start_cond.notify_all();
218211
}
219212

220213
void QueryContext::set_ready_to_execute_only() {
221214
set_execution_dependency_ready();
222-
{
223-
std::lock_guard<std::mutex> l(_start_lock);
224-
_ready_to_execute = true;
225-
}
226-
_start_cond.notify_all();
227215
}
228216

229217
void QueryContext::set_execution_dependency_ready() {
@@ -284,21 +272,6 @@ std::string QueryContext::print_all_pipeline_context() {
284272
return fmt::to_string(debug_string_buffer);
285273
}
286274

287-
Status QueryContext::cancel_pipeline_context(const int fragment_id, const Status& reason) {
288-
std::weak_ptr<pipeline::PipelineFragmentContext> ctx_to_cancel;
289-
{
290-
std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
291-
if (!_fragment_id_to_pipeline_ctx.contains(fragment_id)) {
292-
return Status::InternalError("fragment_id_to_pipeline_ctx is empty!");
293-
}
294-
ctx_to_cancel = _fragment_id_to_pipeline_ctx[fragment_id];
295-
}
296-
if (auto pipeline_ctx = ctx_to_cancel.lock()) {
297-
pipeline_ctx->cancel(reason);
298-
}
299-
return Status::OK();
300-
}
301-
302275
void QueryContext::set_pipeline_context(
303276
const int fragment_id, std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx) {
304277
std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);

be/src/runtime/query_context.h

-29
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
#include "util/threadpool.h"
4040
#include "vec/exec/scan/scanner_scheduler.h"
4141
#include "vec/runtime/shared_hash_table_controller.h"
42-
#include "vec/runtime/shared_scanner_controller.h"
4342
#include "workload_group/workload_group.h"
4443

4544
namespace doris {
@@ -111,41 +110,20 @@ class QueryContext {
111110

112111
void cancel_all_pipeline_context(const Status& reason, int fragment_id = -1);
113112
std::string print_all_pipeline_context();
114-
Status cancel_pipeline_context(const int fragment_id, const Status& reason);
115113
void set_pipeline_context(const int fragment_id,
116114
std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx);
117115
void cancel(Status new_status, int fragment_id = -1);
118116

119-
void set_exec_status(Status new_status) { _exec_status.update(new_status); }
120-
121117
[[nodiscard]] Status exec_status() { return _exec_status.status(); }
122118

123119
void set_execution_dependency_ready();
124120

125121
void set_ready_to_execute_only();
126122

127-
bool is_ready_to_execute() {
128-
std::lock_guard<std::mutex> l(_start_lock);
129-
return _ready_to_execute;
130-
}
131-
132-
bool wait_for_start() {
133-
int wait_time = config::max_fragment_start_wait_time_seconds;
134-
std::unique_lock<std::mutex> l(_start_lock);
135-
while (!_ready_to_execute.load() && _exec_status.ok() && --wait_time > 0) {
136-
_start_cond.wait_for(l, std::chrono::seconds(1));
137-
}
138-
return _ready_to_execute.load() && _exec_status.ok();
139-
}
140-
141123
std::shared_ptr<vectorized::SharedHashTableController> get_shared_hash_table_controller() {
142124
return _shared_hash_table_controller;
143125
}
144126

145-
std::shared_ptr<vectorized::SharedScannerController> get_shared_scanner_controller() {
146-
return _shared_scanner_controller;
147-
}
148-
149127
bool has_runtime_predicate(int source_node_id) {
150128
return _runtime_predicates.contains(source_node_id);
151129
}
@@ -283,16 +261,9 @@ class QueryContext {
283261
// If this token is not set, the scanner will be executed in "_scan_thread_pool" in exec env.
284262
std::unique_ptr<ThreadPoolToken> _thread_token;
285263

286-
std::mutex _start_lock;
287-
std::condition_variable _start_cond;
288-
// Only valid when _need_wait_execution_trigger is set to true in PlanFragmentExecutor.
289-
// And all fragments of this query will start execution when this is set to true.
290-
std::atomic<bool> _ready_to_execute {false};
291-
292264
void _init_query_mem_tracker();
293265

294266
std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller;
295-
std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller;
296267
std::unordered_map<int, vectorized::RuntimePredicate> _runtime_predicates;
297268

298269
WorkloadGroupPtr _workload_group = nullptr;

be/src/vec/runtime/shared_scanner_controller.h

-69
This file was deleted.

0 commit comments

Comments
 (0)