From 24015edf2920effb62f44527cf7b85173cf82b82 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 30 Dec 2024 16:48:22 +0800 Subject: [PATCH 1/8] disttask: recreate step executor if step context is done --- pkg/disttask/framework/scheduler/balancer.go | 4 ++ .../framework/taskexecutor/task_executor.go | 5 ++- .../addindextest1/disttask_test.go | 45 +++++++++++++++++++ 3 files changed, 53 insertions(+), 1 deletion(-) diff --git a/pkg/disttask/framework/scheduler/balancer.go b/pkg/disttask/framework/scheduler/balancer.go index 083c0f07315a1..beed8ba83dcf5 100644 --- a/pkg/disttask/framework/scheduler/balancer.go +++ b/pkg/disttask/framework/scheduler/balancer.go @@ -19,6 +19,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/disttask/framework/proto" llog "github.com/pingcap/tidb/pkg/lightning/log" "github.com/pingcap/tidb/pkg/util/intest" @@ -116,6 +117,9 @@ func (b *balancer) doBalanceSubtasks(ctx context.Context, taskID int64, eligible // managed nodes, subtasks of task might not be balanced. adjustedNodes := filterNodesWithEnoughSlots(b.currUsedSlots, b.slotMgr.getCapacity(), eligibleNodes, subtasks[0].Concurrency) + failpoint.Inject("mockNoEnoughSlots", func(_ failpoint.Value) { + adjustedNodes = []string{} + }) if len(adjustedNodes) == 0 { // no node has enough slots to run the subtasks, skip balance and skip // update used slots. diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go index bb83a1cbaed7d..1b444c83ca071 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor.go +++ b/pkg/disttask/framework/taskexecutor/task_executor.go @@ -144,6 +144,7 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) { zap.Int64("subtaskID", e.currSubtaskID.Load())) // cancels runStep, but leave the subtask state unchanged. e.cancelRunStepWith(nil) + failpoint.InjectCall("afterCancelRunningSubtask") return } @@ -307,7 +308,9 @@ func (e *BaseTaskExecutor) Run() { // reset it when we get a subtask checkInterval, noSubtaskCheckCnt = SubtaskCheckInterval, 0 - if e.stepExec != nil && e.stepExec.GetStep() != subtask.Step { + if e.stepExec != nil && + (e.stepExec.GetStep() != subtask.Step || + e.stepCtx.Err() != nil) { // Previous step ctx is done, cleanup and use a new one. e.cleanStepExecutor() } if e.stepExec == nil { diff --git a/tests/realtikvtest/addindextest1/disttask_test.go b/tests/realtikvtest/addindextest1/disttask_test.go index d4216e496a3d0..96a3b76cb98d2 100644 --- a/tests/realtikvtest/addindextest1/disttask_test.go +++ b/tests/realtikvtest/addindextest1/disttask_test.go @@ -391,3 +391,48 @@ func TestAddIndexDistLockAcquireFailed(t *testing.T) { testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/owner/mockAcquireDistLockFailed", "1*return(true)") tk.MustExec("alter table t add index idx(b);") } + +func TestAddIndexScheduleAway(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set global tidb_enable_dist_task = on;") + t.Cleanup(func() { + tk.MustExec("set global tidb_enable_dist_task = off;") + }) + tk.MustExec("create table t (a int, b int);") + tk.MustExec("insert into t values (1, 1);") + + var jobID atomic.Int64 + // Acquire the job ID. + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep", func(job *model.Job) { + if job.Type == model.ActionAddIndex { + jobID.Store(job.ID) + } + }) + // Do not balance subtasks automatically. + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockNoEnoughSlots", "return") + afterCancel := make(chan struct{}) + // Capture the cancel operation from checkBalanceLoop. + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterCancelRunningSubtask", func() { + close(afterCancel) + }) + var once sync.Once + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionAddIndexSubTaskFinish", func() { + once.Do(func() { + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + updateExecID := fmt.Sprintf(` + update mysql.tidb_background_subtask set exec_id = 'other' where task_key in + (select id from mysql.tidb_global_task where task_key like '%%%d')`, jobID.Load()) + tk1.MustExec(updateExecID) + <-afterCancel + updateExecID = fmt.Sprintf(` + update mysql.tidb_background_subtask set exec_id = ':4000' where task_key in + (select id from mysql.tidb_global_task where task_key like '%%%d')`, jobID.Load()) + tk1.MustExec(updateExecID) + }) + }) + tk.MustExec("alter table t add index idx(b);") + require.NotEqual(t, int64(0), jobID.Load()) +} From 4a2dba9348a742dddcd7294f02869a2382d768a6 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 31 Dec 2024 12:28:24 +0800 Subject: [PATCH 2/8] address comment --- .../framework/taskexecutor/task_executor.go | 14 +++++++++----- tests/realtikvtest/addindextest1/disttask_test.go | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go index 1b444c83ca071..5a17dced8ace4 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor.go +++ b/pkg/disttask/framework/taskexecutor/task_executor.go @@ -143,8 +143,8 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) { e.logger.Info("subtask is scheduled away, cancel running", zap.Int64("subtaskID", e.currSubtaskID.Load())) // cancels runStep, but leave the subtask state unchanged. - e.cancelRunStepWith(nil) - failpoint.InjectCall("afterCancelRunningSubtask") + e.Cancel() + failpoint.InjectCall("afterCancelSubtaskExec") return } @@ -308,9 +308,7 @@ func (e *BaseTaskExecutor) Run() { // reset it when we get a subtask checkInterval, noSubtaskCheckCnt = SubtaskCheckInterval, 0 - if e.stepExec != nil && - (e.stepExec.GetStep() != subtask.Step || - e.stepCtx.Err() != nil) { // Previous step ctx is done, cleanup and use a new one. + if e.stepExec != nil && e.stepExec.GetStep() != subtask.Step { e.cleanStepExecutor() } if e.stepExec == nil { @@ -320,6 +318,12 @@ func (e *BaseTaskExecutor) Run() { continue } } + if err := e.stepCtx.Err(); err != nil { + e.logger.Error("step executor context is done, the task should have been reverted", + zap.String("step", proto.Step2Str(task.Type, task.Step)), + zap.Error(err)) + continue + } err = e.runSubtask(subtask) if err != nil { // task executor keeps running its subtasks even though some subtask diff --git a/tests/realtikvtest/addindextest1/disttask_test.go b/tests/realtikvtest/addindextest1/disttask_test.go index 96a3b76cb98d2..ceebf0fc51294 100644 --- a/tests/realtikvtest/addindextest1/disttask_test.go +++ b/tests/realtikvtest/addindextest1/disttask_test.go @@ -414,7 +414,7 @@ func TestAddIndexScheduleAway(t *testing.T) { testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockNoEnoughSlots", "return") afterCancel := make(chan struct{}) // Capture the cancel operation from checkBalanceLoop. - testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterCancelRunningSubtask", func() { + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/afterCancelSubtaskExec", func() { close(afterCancel) }) var once sync.Once From 6937ceb93e4895b089e97c793670c0eb6a28ac93 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 31 Dec 2024 14:50:38 +0800 Subject: [PATCH 3/8] fix TestCheckBalanceSubtask --- pkg/disttask/framework/taskexecutor/task_executor_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/disttask/framework/taskexecutor/task_executor_test.go b/pkg/disttask/framework/taskexecutor/task_executor_test.go index 17203ddfcc326..6430cf134c782 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor_test.go +++ b/pkg/disttask/framework/taskexecutor/task_executor_test.go @@ -897,8 +897,8 @@ func TestCheckBalanceSubtask(t *testing.T) { task.ID, task.Step, proto.SubtaskStateRunning).Return(nil, errors.New("error")) mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", task.ID, task.Step, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil) - runCtx, cancelCause := context.WithCancelCause(ctx) - taskExecutor.mu.runtimeCancel = cancelCause + runCtx, cancel := context.WithCancel(ctx) + taskExecutor.cancel = cancel require.NoError(t, runCtx.Err()) taskExecutor.checkBalanceSubtask(ctx) require.ErrorIs(t, runCtx.Err(), context.Canceled) From b146bacf692f6d4e18e87b37239bb74cf93c9dae Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 31 Dec 2024 18:01:47 +0800 Subject: [PATCH 4/8] fix TestTaskExecutorRun --- .../framework/taskexecutor/task_executor_test.go | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/pkg/disttask/framework/taskexecutor/task_executor_test.go b/pkg/disttask/framework/taskexecutor/task_executor_test.go index 6430cf134c782..63107282cb6c3 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor_test.go +++ b/pkg/disttask/framework/taskexecutor/task_executor_test.go @@ -315,7 +315,7 @@ func TestTaskExecutorRun(t *testing.T) { // mock for checkBalanceSubtask, returns empty subtask list e.taskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil) - // this subtask is scheduled awsy during running + // this subtask is scheduled away during running e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil) @@ -326,18 +326,6 @@ func TestTaskExecutorRun(t *testing.T) { <-ctx.Done() return ctx.Err() }) - // keep running next subtask - nextSubtask := &proto.Subtask{SubtaskBase: proto.SubtaskBase{ - ID: 2, Type: e.task1.Type, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}} - e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) - e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(nextSubtask, nil) - e.stepExecutor.EXPECT().GetStep().Return(proto.StepOne) - e.taskTable.EXPECT().StartSubtask(gomock.Any(), nextSubtask.ID, "id").Return(nil) - e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), nextSubtask).Return(nil) - e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", nextSubtask.ID, gomock.Any()).Return(nil) - // exit - e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) e.taskExecutor.Run() require.True(t, e.ctrl.Satisfied()) From d09240a236d82318d008621728322984ef6c8e85 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 6 Jan 2025 15:12:10 +0800 Subject: [PATCH 5/8] introduce subtask context and cancel it when subtask is scheduled away --- .../framework/taskexecutor/task_executor.go | 22 +++++++++++++++---- .../taskexecutor/task_executor_test.go | 15 ++++++++++++- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go index 5a17dced8ace4..e5de734c1748c 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor.go +++ b/pkg/disttask/framework/taskexecutor/task_executor.go @@ -90,6 +90,7 @@ type BaseTaskExecutor struct { sync.RWMutex // runtimeCancel is used to cancel the Run/Rollback when error occurs. runtimeCancel context.CancelCauseFunc + subtaskCancel context.CancelFunc } stepExec execute.StepExecutor @@ -143,7 +144,7 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) { e.logger.Info("subtask is scheduled away, cancel running", zap.Int64("subtaskID", e.currSubtaskID.Load())) // cancels runStep, but leave the subtask state unchanged. - e.Cancel() + e.cancelSubtaskCtx() failpoint.InjectCall("afterCancelSubtaskExec") return } @@ -425,23 +426,28 @@ func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) { logTask := llog.BeginTask(logger, "run subtask") subtaskErr := func() error { e.currSubtaskID.Store(subtask.ID) + subtaskCtx, subtaskCancelCtx := context.WithCancel(e.stepCtx) + e.mu.Lock() + e.mu.subtaskCancel = subtaskCancelCtx + e.mu.Unlock() var wg util.WaitGroupWrapper - checkCtx, checkCancel := context.WithCancel(e.stepCtx) + checkCtx, checkCancel := context.WithCancel(subtaskCtx) wg.RunWithLog(func() { e.checkBalanceSubtask(checkCtx) }) if e.hasRealtimeSummary(e.stepExec) { wg.RunWithLog(func() { - e.updateSubtaskSummaryLoop(checkCtx, e.stepCtx, e.stepExec) + e.updateSubtaskSummaryLoop(checkCtx, subtaskCtx, e.stepExec) }) } defer func() { checkCancel() + subtaskCancelCtx() wg.Wait() }() - return e.stepExec.RunSubtask(e.stepCtx, subtask) + return e.stepExec.RunSubtask(subtaskCtx, subtask) }() failpoint.InjectCall("afterRunSubtask", e, &subtaskErr) logTask.End2(zap.InfoLevel, subtaskErr) @@ -494,6 +500,14 @@ func (e *BaseTaskExecutor) cancelRunStepWith(cause error) { } } +func (e *BaseTaskExecutor) cancelSubtaskCtx() { + e.mu.Lock() + defer e.mu.Unlock() + if e.mu.subtaskCancel != nil { + e.mu.subtaskCancel() + } +} + func (e *BaseTaskExecutor) updateSubtaskStateAndErrorImpl(ctx context.Context, execID string, subtaskID int64, state proto.SubtaskState, subTaskErr error) error { start := time.Now() // retry for 3+6+12+24+(30-4)*30 ~= 825s ~= 14 minutes diff --git a/pkg/disttask/framework/taskexecutor/task_executor_test.go b/pkg/disttask/framework/taskexecutor/task_executor_test.go index 63107282cb6c3..02e8aacac860f 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor_test.go +++ b/pkg/disttask/framework/taskexecutor/task_executor_test.go @@ -326,6 +326,19 @@ func TestTaskExecutorRun(t *testing.T) { <-ctx.Done() return ctx.Err() }) + e.taskExecExt.EXPECT().IsRetryableError(gomock.Any()).Return(true) + // keep running next subtask + nextSubtask := &proto.Subtask{SubtaskBase: proto.SubtaskBase{ + ID: 2, Type: e.task1.Type, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}} + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil) + e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(nextSubtask, nil) + e.stepExecutor.EXPECT().GetStep().Return(proto.StepOne) + e.taskTable.EXPECT().StartSubtask(gomock.Any(), nextSubtask.ID, "id").Return(nil) + e.stepExecutor.EXPECT().RunSubtask(gomock.Any(), nextSubtask).Return(nil) + e.taskTable.EXPECT().FinishSubtask(gomock.Any(), "id", nextSubtask.ID, gomock.Any()).Return(nil) + // exit + e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.succeedTask1, nil) e.stepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) e.taskExecutor.Run() require.True(t, e.ctrl.Satisfied()) @@ -886,7 +899,7 @@ func TestCheckBalanceSubtask(t *testing.T) { mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", task.ID, task.Step, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil) runCtx, cancel := context.WithCancel(ctx) - taskExecutor.cancel = cancel + taskExecutor.mu.subtaskCancel = cancel require.NoError(t, runCtx.Err()) taskExecutor.checkBalanceSubtask(ctx) require.ErrorIs(t, runCtx.Err(), context.Canceled) From 83c7c6a6edbf0bb8dfdf9c3bd015485e08993e98 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 6 Jan 2025 15:21:37 +0800 Subject: [PATCH 6/8] don't store subtask context cancel function --- .../framework/taskexecutor/task_executor.go | 18 +++--------------- .../taskexecutor/task_executor_test.go | 11 +++++------ 2 files changed, 8 insertions(+), 21 deletions(-) diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go index e5de734c1748c..71269bca2c406 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor.go +++ b/pkg/disttask/framework/taskexecutor/task_executor.go @@ -90,7 +90,6 @@ type BaseTaskExecutor struct { sync.RWMutex // runtimeCancel is used to cancel the Run/Rollback when error occurs. runtimeCancel context.CancelCauseFunc - subtaskCancel context.CancelFunc } stepExec execute.StepExecutor @@ -123,7 +122,7 @@ func NewBaseTaskExecutor(ctx context.Context, task *proto.Task, param Param) *Ba // `pending` state, to make sure subtasks can be balanced later when node scale out. // - If current running subtask are scheduled away from this node, i.e. this node // is taken as down, cancel running. -func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) { +func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context, subtaskCancelCtx context.CancelFunc) { ticker := time.NewTicker(checkBalanceSubtaskInterval) defer ticker.Stop() for { @@ -144,7 +143,7 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) { e.logger.Info("subtask is scheduled away, cancel running", zap.Int64("subtaskID", e.currSubtaskID.Load())) // cancels runStep, but leave the subtask state unchanged. - e.cancelSubtaskCtx() + subtaskCancelCtx() failpoint.InjectCall("afterCancelSubtaskExec") return } @@ -427,14 +426,11 @@ func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) { subtaskErr := func() error { e.currSubtaskID.Store(subtask.ID) subtaskCtx, subtaskCancelCtx := context.WithCancel(e.stepCtx) - e.mu.Lock() - e.mu.subtaskCancel = subtaskCancelCtx - e.mu.Unlock() var wg util.WaitGroupWrapper checkCtx, checkCancel := context.WithCancel(subtaskCtx) wg.RunWithLog(func() { - e.checkBalanceSubtask(checkCtx) + e.checkBalanceSubtask(checkCtx, subtaskCancelCtx) }) if e.hasRealtimeSummary(e.stepExec) { @@ -500,14 +496,6 @@ func (e *BaseTaskExecutor) cancelRunStepWith(cause error) { } } -func (e *BaseTaskExecutor) cancelSubtaskCtx() { - e.mu.Lock() - defer e.mu.Unlock() - if e.mu.subtaskCancel != nil { - e.mu.subtaskCancel() - } -} - func (e *BaseTaskExecutor) updateSubtaskStateAndErrorImpl(ctx context.Context, execID string, subtaskID int64, state proto.SubtaskState, subTaskErr error) error { start := time.Now() // retry for 3+6+12+24+(30-4)*30 ~= 825s ~= 14 minutes diff --git a/pkg/disttask/framework/taskexecutor/task_executor_test.go b/pkg/disttask/framework/taskexecutor/task_executor_test.go index 02e8aacac860f..4f434ca694f92 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor_test.go +++ b/pkg/disttask/framework/taskexecutor/task_executor_test.go @@ -890,7 +890,7 @@ func TestCheckBalanceSubtask(t *testing.T) { // context canceled canceledCtx, cancel := context.WithCancel(ctx) cancel() - taskExecutor.checkBalanceSubtask(canceledCtx) + taskExecutor.checkBalanceSubtask(canceledCtx, nil) }) t.Run("subtask scheduled away", func(t *testing.T) { @@ -899,9 +899,8 @@ func TestCheckBalanceSubtask(t *testing.T) { mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", task.ID, task.Step, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil) runCtx, cancel := context.WithCancel(ctx) - taskExecutor.mu.subtaskCancel = cancel require.NoError(t, runCtx.Err()) - taskExecutor.checkBalanceSubtask(ctx) + taskExecutor.checkBalanceSubtask(ctx, cancel) require.ErrorIs(t, runCtx.Err(), context.Canceled) require.True(t, ctrl.Satisfied()) }) @@ -914,7 +913,7 @@ func TestCheckBalanceSubtask(t *testing.T) { mockExtension.EXPECT().IsIdempotent(subtasks[0]).Return(false) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "tidb1", subtasks[0].ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask).Return(nil) - taskExecutor.checkBalanceSubtask(ctx) + taskExecutor.checkBalanceSubtask(ctx, nil) require.True(t, ctrl.Satisfied()) // if we failed to change state of non-idempotent subtask, will retry @@ -931,7 +930,7 @@ func TestCheckBalanceSubtask(t *testing.T) { mockExtension.EXPECT().IsIdempotent(subtasks[0]).Return(false) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "tidb1", subtasks[0].ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask).Return(nil) - taskExecutor.checkBalanceSubtask(ctx) + taskExecutor.checkBalanceSubtask(ctx, nil) require.True(t, ctrl.Satisfied()) }) @@ -946,7 +945,7 @@ func TestCheckBalanceSubtask(t *testing.T) { // used to break the loop mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", task.ID, task.Step, proto.SubtaskStateRunning).Return(nil, nil) - taskExecutor.checkBalanceSubtask(ctx) + taskExecutor.checkBalanceSubtask(ctx, nil) require.True(t, ctrl.Satisfied()) }) } From 8681e00961832d22b97dbc0106b0f81a0f192ae0 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 6 Jan 2025 16:25:59 +0800 Subject: [PATCH 7/8] cancel contexts in order --- pkg/disttask/framework/taskexecutor/task_executor.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go index 71269bca2c406..99406ef712342 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor.go +++ b/pkg/disttask/framework/taskexecutor/task_executor.go @@ -122,7 +122,7 @@ func NewBaseTaskExecutor(ctx context.Context, task *proto.Task, param Param) *Ba // `pending` state, to make sure subtasks can be balanced later when node scale out. // - If current running subtask are scheduled away from this node, i.e. this node // is taken as down, cancel running. -func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context, subtaskCancelCtx context.CancelFunc) { +func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context, subtaskCtxCancel context.CancelFunc) { ticker := time.NewTicker(checkBalanceSubtaskInterval) defer ticker.Stop() for { @@ -143,7 +143,7 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context, subtaskCance e.logger.Info("subtask is scheduled away, cancel running", zap.Int64("subtaskID", e.currSubtaskID.Load())) // cancels runStep, but leave the subtask state unchanged. - subtaskCancelCtx() + subtaskCtxCancel() failpoint.InjectCall("afterCancelSubtaskExec") return } @@ -425,12 +425,12 @@ func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) { logTask := llog.BeginTask(logger, "run subtask") subtaskErr := func() error { e.currSubtaskID.Store(subtask.ID) - subtaskCtx, subtaskCancelCtx := context.WithCancel(e.stepCtx) + subtaskCtx, subtaskCtxCancel := context.WithCancel(e.stepCtx) var wg util.WaitGroupWrapper checkCtx, checkCancel := context.WithCancel(subtaskCtx) wg.RunWithLog(func() { - e.checkBalanceSubtask(checkCtx, subtaskCancelCtx) + e.checkBalanceSubtask(checkCtx, subtaskCtxCancel) }) if e.hasRealtimeSummary(e.stepExec) { @@ -440,8 +440,8 @@ func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) { } defer func() { checkCancel() - subtaskCancelCtx() wg.Wait() + subtaskCtxCancel() }() return e.stepExec.RunSubtask(subtaskCtx, subtask) }() From 6976bf37969747dd079dea04e7513051a8f4bdb5 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 7 Jan 2025 11:27:50 +0800 Subject: [PATCH 8/8] fix TestCheckBalanceSubtask --- pkg/disttask/framework/taskexecutor/task_executor.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go index 99406ef712342..f0ead608c8eb0 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor.go +++ b/pkg/disttask/framework/taskexecutor/task_executor.go @@ -143,7 +143,9 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context, subtaskCtxCa e.logger.Info("subtask is scheduled away, cancel running", zap.Int64("subtaskID", e.currSubtaskID.Load())) // cancels runStep, but leave the subtask state unchanged. - subtaskCtxCancel() + if subtaskCtxCancel != nil { + subtaskCtxCancel() + } failpoint.InjectCall("afterCancelSubtaskExec") return }