@@ -88,15 +88,12 @@ bool close_task(PipelineTask* task, Status exec_status) {
88
88
print_id (task->query_context ()->query_id ()),
89
89
exec_status.to_string ());
90
90
}
91
- // decrement_running_task may delete fragment context and will core in some defer
92
- // code, because the defer code will access fragment context itself.
93
- auto lock_for_context = task->fragment_context ()->shared_from_this ();
94
91
Status status = task->close (exec_status);
95
92
if (!status.ok ()) {
96
93
task->fragment_context ()->cancel (status);
97
94
}
98
95
task->finalize ();
99
- return task-> fragment_context ()-> decrement_running_task (task-> pipeline_id ()) ;
96
+ return true ;
100
97
}
101
98
102
99
void TaskScheduler::_do_work (int index) {
@@ -114,10 +111,20 @@ void TaskScheduler::_do_work(int index) {
114
111
}
115
112
task->log_detail_if_need ();
116
113
task->set_running (true );
117
- bool fragment_is_finished = false ;
114
+ bool eos = false ;
115
+ auto status = Status::OK ();
118
116
Defer task_running_defer {[&]() {
119
117
// If fragment is finished, fragment context will be de-constructed with all tasks in it.
120
- if (!fragment_is_finished) {
118
+ if (eos || !status.ok ()) {
119
+ // decrement_running_task may delete fragment context and will core in some defer
120
+ // code, because the defer code will access fragment context itself.
121
+ auto lock_for_context = task->fragment_context ()->shared_from_this ();
122
+ bool close = close_task (task, status);
123
+ task->set_running (false );
124
+ if (close ) {
125
+ task->fragment_context ()->decrement_running_task (task->pipeline_id ());
126
+ }
127
+ } else {
121
128
task->set_running (false );
122
129
}
123
130
}};
@@ -127,12 +134,10 @@ void TaskScheduler::_do_work(int index) {
127
134
128
135
// Close task if canceled
129
136
if (canceled) {
130
- fragment_is_finished = close_task (task, fragment_ctx->get_query_ctx ()->exec_status ());
137
+ status = fragment_ctx->get_query_ctx ()->exec_status ();
138
+ DCHECK (!status.ok ());
131
139
continue ;
132
140
}
133
-
134
- bool eos = false ;
135
- auto status = Status::OK ();
136
141
task->set_core_id (index );
137
142
138
143
// Main logics of execution
@@ -155,10 +160,6 @@ void TaskScheduler::_do_work(int index) {
155
160
} else { status = task->execute (&eos); },
156
161
status);
157
162
fragment_ctx->trigger_report_if_necessary ();
158
-
159
- if (eos || !status.ok ()) {
160
- fragment_is_finished = close_task (task, status);
161
- }
162
163
}
163
164
}
164
165
0 commit comments