diff --git a/ddl/index.go b/ddl/index.go index acbb737ffd584..a97a1c32cb337 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -501,7 +501,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t tab return &addIndexWorker{ id: id, ddlWorker: worker, - batchCnt: DefaultTaskHandleCnt, + batchCnt: int(variable.GetDDLReorgBatchSize()), sessCtx: sessCtx, taskCh: make(chan *reorgIndexTask, 1), resultCh: make(chan *addIndexResult, 1), @@ -778,7 +778,8 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad handleRange := *task result := &addIndexResult{addedCount: 0, nextHandle: handleRange.startHandle, err: nil} lastLogCount := 0 - startTime := time.Now() + lastLogTime := time.Now() + startTime := lastLogTime for { taskCtx, err := w.backfillIndexInTxn(handleRange) @@ -795,10 +796,11 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad mergeAddIndexCtxToResult(&taskCtx, result) w.ddlWorker.reorgCtx.increaseRowCount(int64(taskCtx.addedCount)) - if result.scanCount-lastLogCount >= 30000 { + if num := result.scanCount - lastLogCount; num >= 30000 { lastLogCount = result.scanCount - log.Infof("[ddl-reorg] worker(%v), finish batch addedCount:%v backfill, task addedCount:%v, task scanCount:%v, nextHandle:%v", - w.id, taskCtx.addedCount, result.addedCount, result.scanCount, taskCtx.nextHandle) + log.Infof("[ddl-reorg] worker(%v), finish batch addedCount:%v backfill, task addedCount:%v, task scanCount:%v, nextHandle:%v, avg row time(ms):%v", + w.id, taskCtx.addedCount, result.addedCount, result.scanCount, taskCtx.nextHandle, time.Since(lastLogTime).Seconds()*1000/float64(num)) + lastLogTime = time.Now() } handleRange.startHandle = taskCtx.nextHandle @@ -844,6 +846,8 @@ func (w *addIndexWorker) run(d *ddlCtx) { // continue //} + // Dynamically change the batch size.
+ w.batchCnt = int(variable.GetDDLReorgBatchSize()) result := w.handleBackfillTask(d, task) w.resultCh <- result } diff --git a/executor/ddl_test.go b/executor/ddl_test.go index 627fc758dc863..7e1577c2e192c 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -450,3 +450,32 @@ func (s *testSuite) TestSetDDLReorgWorkerCnt(c *C) { res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt") res.Check(testkit.Rows("100")) } + +func (s *testSuite) TestSetDDLReorgBatchSize(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.DefTiDBDDLReorgBatchSize)) + + tk.MustExec("set tidb_ddl_reorg_batch_size = 1") + tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '1'")) + c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.MinDDLReorgBatchSize)) + tk.MustExec(fmt.Sprintf("set tidb_ddl_reorg_batch_size = %v", variable.MaxDDLReorgBatchSize+1)) + tk.MustQuery("show warnings;").Check(testkit.Rows(fmt.Sprintf("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '%d'", variable.MaxDDLReorgBatchSize+1))) + c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.MaxDDLReorgBatchSize)) + _, err := tk.Exec("set tidb_ddl_reorg_batch_size = invalid_val") + c.Assert(terror.ErrorEqual(err, variable.ErrWrongTypeForVar), IsTrue, Commentf("err %v", err)) + tk.MustExec("set tidb_ddl_reorg_batch_size = 100") + c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(100)) + tk.MustExec("set tidb_ddl_reorg_batch_size = -1") + tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '-1'")) + + tk.MustExec("set tidb_ddl_reorg_batch_size = 100") + res := tk.MustQuery("select @@tidb_ddl_reorg_batch_size") + res.Check(testkit.Rows("100")) + + res = tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size") + 
res.Check(testkit.Rows(fmt.Sprintf("%v", variable.DefTiDBDDLReorgBatchSize))) + tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 1000") + res = tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size") + res.Check(testkit.Rows("1000")) +} diff --git a/session/session.go b/session/session.go index 98184192e9fc5..928cde4b81e3d 100644 --- a/session/session.go +++ b/session/session.go @@ -1402,6 +1402,7 @@ const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variab variable.TiDBBackoffLockFast + quoteCommaQuote + variable.TiDBConstraintCheckInPlace + quoteCommaQuote + variable.TiDBDDLReorgWorkerCount + quoteCommaQuote + + variable.TiDBDDLReorgBatchSize + quoteCommaQuote + variable.TiDBOptInSubqToJoinAndAgg + quoteCommaQuote + variable.TiDBDistSQLScanConcurrency + quoteCommaQuote + variable.TiDBMaxChunkSize + quoteCommaQuote + diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 31ce4ed71ec33..a4e50fa83a118 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -689,6 +689,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.EnableTablePartition = val case TiDBDDLReorgWorkerCount: SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount))) + case TiDBDDLReorgBatchSize: + SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize))) case TiDBDDLReorgPriority: s.setDDLReorgPriority(val) case TiDBForcePriority: diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 40a4b7cf6c952..26a03c098a550 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -671,6 +671,7 @@ var defaultSysVars = []*SysVar{ {ScopeSession, TiDBQueryLogMaxLen, strconv.Itoa(logutil.DefaultQueryLogMaxLen)}, {ScopeSession, TiDBConfig, ""}, {ScopeGlobal | ScopeSession, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)}, + {ScopeGlobal | ScopeSession, TiDBDDLReorgBatchSize, 
strconv.Itoa(DefTiDBDDLReorgBatchSize)}, {ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"}, {ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]}, {ScopeSession, TiDBEnableRadixJoin, boolToIntStr(DefTiDBUseRadixJoin)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 2038881a50c60..cbc69ee8e2142 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -200,6 +200,9 @@ const ( // tidb_ddl_reorg_worker_cnt defines the count of ddl reorg workers. TiDBDDLReorgWorkerCount = "tidb_ddl_reorg_worker_cnt" + // tidb_ddl_reorg_batch_size defines the transaction batch size of ddl reorg workers. + TiDBDDLReorgBatchSize = "tidb_ddl_reorg_batch_size" + // tidb_ddl_reorg_priority defines the operations priority of adding indices. // It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH TiDBDDLReorgPriority = "tidb_ddl_reorg_priority" @@ -260,6 +263,7 @@ const ( DefTiDBProjectionConcurrency = 4 DefTiDBOptimizerSelectivityLevel = 0 DefTiDBDDLReorgWorkerCount = 16 + DefTiDBDDLReorgBatchSize = 1024 DefTiDBHashAggPartialConcurrency = 4 DefTiDBHashAggFinalConcurrency = 4 DefTiDBForcePriority = mysql.NoPriority @@ -270,9 +274,13 @@ const ( // Process global variables. var ( ProcessGeneralLog uint32 - ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount - maxDDLReorgWorkerCount int32 = 128 - DDLSlowOprThreshold uint32 = 300 // DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond. - ForcePriority = int32(DefTiDBForcePriority) - ServerHostname, _ = os.Hostname() + ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount + maxDDLReorgWorkerCount int32 = 128 + ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize + // Exported for testing. + MaxDDLReorgBatchSize int32 = 10240 + MinDDLReorgBatchSize int32 = 32 + DDLSlowOprThreshold uint32 = 300 // DDLSlowOprThreshold is the threshold for ddl slow operations, unit is millisecond.
+ ForcePriority = int32(DefTiDBForcePriority) + ServerHostname, _ = os.Hostname() ) diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index ad81e0ff4da69..4d55292c61ff3 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -46,6 +46,23 @@ func GetDDLReorgWorkerCounter() int32 { return atomic.LoadInt32(&ddlReorgWorkerCounter) } +// SetDDLReorgBatchSize sets ddlReorgBatchSize. +// The value is clamped to the range [MinDDLReorgBatchSize, MaxDDLReorgBatchSize]. +func SetDDLReorgBatchSize(cnt int32) { + if cnt > MaxDDLReorgBatchSize { + cnt = MaxDDLReorgBatchSize + } + if cnt < MinDDLReorgBatchSize { + cnt = MinDDLReorgBatchSize + } + atomic.StoreInt32(&ddlReorgBatchSize, cnt) +} + +// GetDDLReorgBatchSize gets ddlReorgBatchSize. +func GetDDLReorgBatchSize() int32 { + return atomic.LoadInt32(&ddlReorgBatchSize) +} + // GetSessionSystemVar gets a system variable. // If it is a session only variable, use the default value defined in code. // Returns error if there is no such variable. @@ -329,6 +346,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, return "auto", nil } return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) + case TiDBDDLReorgBatchSize: + return checkUInt64SystemVar(name, value, uint64(MinDDLReorgBatchSize), uint64(MaxDDLReorgBatchSize), vars) case TiDBIndexLookupConcurrency, TiDBIndexLookupJoinConcurrency, TiDBIndexJoinBatchSize, TiDBIndexLookupSize, TiDBHashJoinConcurrency,