From d1ff903db1338f4f31e693e49fdb0785427af13c Mon Sep 17 00:00:00 2001 From: crazycs Date: Tue, 4 Dec 2018 19:07:12 +0800 Subject: [PATCH] ddl: add ddl_reorg_batch_size variable to control ddl worker batch size and enlarge default batch size. (#8365) --- ddl/index.go | 14 +++++++++----- executor/ddl_test.go | 29 +++++++++++++++++++++++++++++ session/session.go | 1 + sessionctx/variable/session.go | 2 ++ sessionctx/variable/sysvar.go | 1 + sessionctx/variable/tidb_vars.go | 18 +++++++++++++----- sessionctx/variable/varsutil.go | 19 +++++++++++++++++++ 7 files changed, 74 insertions(+), 10 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 993258e7b1913..55371e3a0409b 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -498,7 +498,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t tab return &addIndexWorker{ id: id, ddlWorker: worker, - batchCnt: DefaultTaskHandleCnt, + batchCnt: int(variable.GetDDLReorgBatchSize()), sessCtx: sessCtx, taskCh: make(chan *reorgIndexTask, 1), resultCh: make(chan *addIndexResult, 1), @@ -775,7 +775,8 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad handleRange := *task result := &addIndexResult{addedCount: 0, nextHandle: handleRange.startHandle, err: nil} lastLogCount := 0 - startTime := time.Now() + lastLogTime := time.Now() + startTime := lastLogTime for { taskCtx, err := w.backfillIndexInTxn(handleRange) @@ -792,10 +793,11 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad mergeAddIndexCtxToResult(&taskCtx, result) w.ddlWorker.reorgCtx.increaseRowCount(int64(taskCtx.addedCount)) - if result.scanCount-lastLogCount >= 30000 { + if num := result.scanCount - lastLogCount; num >= 30000 { lastLogCount = result.scanCount - log.Infof("[ddl-reorg] worker(%v), finish batch addedCount:%v backfill, task addedCount:%v, task scanCount:%v, nextHandle:%v", - w.id, taskCtx.addedCount, result.addedCount, result.scanCount, taskCtx.nextHandle) + 
log.Infof("[ddl-reorg] worker(%v), finish batch addedCount:%v backfill, task addedCount:%v, task scanCount:%v, nextHandle:%v, avg row time(ms):%v", + w.id, taskCtx.addedCount, result.addedCount, result.scanCount, taskCtx.nextHandle, time.Since(lastLogTime).Seconds()*1000/float64(num)) + lastLogTime = time.Now() } handleRange.startHandle = taskCtx.nextHandle @@ -841,6 +843,8 @@ func (w *addIndexWorker) run(d *ddlCtx) { // continue //} + // Re-read the batch size before handling the task so that changes to the variable take effect dynamically. + w.batchCnt = int(variable.GetDDLReorgBatchSize()) result := w.handleBackfillTask(d, task) w.resultCh <- result } diff --git a/executor/ddl_test.go b/executor/ddl_test.go index 627fc758dc863..7e1577c2e192c 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -450,3 +450,32 @@ func (s *testSuite) TestSetDDLReorgWorkerCnt(c *C) { res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt") res.Check(testkit.Rows("100")) } + +func (s *testSuite) TestSetDDLReorgBatchSize(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.DefTiDBDDLReorgBatchSize)) + + tk.MustExec("set tidb_ddl_reorg_batch_size = 1") + tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '1'")) + c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.MinDDLReorgBatchSize)) + tk.MustExec(fmt.Sprintf("set tidb_ddl_reorg_batch_size = %v", variable.MaxDDLReorgBatchSize+1)) + tk.MustQuery("show warnings;").Check(testkit.Rows(fmt.Sprintf("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '%d'", variable.MaxDDLReorgBatchSize+1))) + c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.MaxDDLReorgBatchSize)) + _, err := tk.Exec("set tidb_ddl_reorg_batch_size = invalid_val") + c.Assert(terror.ErrorEqual(err, variable.ErrWrongTypeForVar), IsTrue, Commentf("err %v", err)) + tk.MustExec("set tidb_ddl_reorg_batch_size = 100") + 
c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(100)) + tk.MustExec("set tidb_ddl_reorg_batch_size = -1") + tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '-1'")) + + tk.MustExec("set tidb_ddl_reorg_batch_size = 100") + res := tk.MustQuery("select @@tidb_ddl_reorg_batch_size") + res.Check(testkit.Rows("100")) + + res = tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size") + res.Check(testkit.Rows(fmt.Sprintf("%v", variable.DefTiDBDDLReorgBatchSize))) + tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 1000") + res = tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size") + res.Check(testkit.Rows("1000")) +} diff --git a/session/session.go b/session/session.go index afa423136cb31..3d5a003af6b2d 100644 --- a/session/session.go +++ b/session/session.go @@ -1313,6 +1313,7 @@ const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variab variable.TiDBBackoffLockFast + quoteCommaQuote + variable.TiDBDDLReorgWorkerCount + quoteCommaQuote + variable.TiDBOptInSubqUnFolding + quoteCommaQuote + + variable.TiDBDDLReorgBatchSize + quoteCommaQuote + variable.TiDBDistSQLScanConcurrency + quoteCommaQuote + variable.TiDBMaxChunkSize + quoteCommaQuote + variable.TiDBRetryLimit + quoteCommaQuote + diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 57446b6e1d951..3ecb985959ff5 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -617,6 +617,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.EnableTablePartition = TiDBOptOn(val) case TiDBDDLReorgWorkerCount: SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount))) + case TiDBDDLReorgBatchSize: + SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize))) case TiDBDDLReorgPriority: s.setDDLReorgPriority(val) case TiDBForcePriority: diff --git a/sessionctx/variable/sysvar.go 
b/sessionctx/variable/sysvar.go index 712c0525fadb5..72b9c77e7baa1 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -665,6 +665,7 @@ var defaultSysVars = []*SysVar{ {ScopeSession, TiDBQueryLogMaxLen, strconv.Itoa(logutil.DefaultQueryLogMaxLen)}, {ScopeSession, TiDBConfig, ""}, {ScopeGlobal | ScopeSession, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)}, + {ScopeGlobal | ScopeSession, TiDBDDLReorgBatchSize, strconv.Itoa(DefTiDBDDLReorgBatchSize)}, {ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"}, {ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]}, } diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 7b2801d4212ee..f89e4ac141aae 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -193,6 +193,9 @@ const ( // tidb_ddl_reorg_worker_cnt defines the count of ddl reorg workers. TiDBDDLReorgWorkerCount = "tidb_ddl_reorg_worker_cnt" + // tidb_ddl_reorg_batch_size defines the transaction batch size of ddl reorg workers. + TiDBDDLReorgBatchSize = "tidb_ddl_reorg_batch_size" + // tidb_ddl_reorg_priority defines the operations priority of adding indices. // It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH TiDBDDLReorgPriority = "tidb_ddl_reorg_priority" @@ -239,6 +242,7 @@ const ( DefTiDBProjectionConcurrency = 4 DefTiDBOptimizerSelectivityLevel = 0 DefTiDBDDLReorgWorkerCount = 16 + DefTiDBDDLReorgBatchSize = 1024 DefTiDBHashAggPartialConcurrency = 4 DefTiDBHashAggFinalConcurrency = 4 DefTiDBForcePriority = mysql.NoPriority @@ -247,9 +251,13 @@ const ( // Process global variables. var ( ProcessGeneralLog uint32 - ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount - maxDDLReorgWorkerCount int32 = 128 - DDLSlowOprThreshold uint32 = 300 // DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond. 
- ForcePriority = int32(DefTiDBForcePriority) - ServerHostname, _ = os.Hostname() + ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount + maxDDLReorgWorkerCount int32 = 128 + ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize + // Exported for testing. + MaxDDLReorgBatchSize int32 = 10240 + MinDDLReorgBatchSize int32 = 32 + DDLSlowOprThreshold uint32 = 300 // DDLSlowOprThreshold is the threshold for ddl slow operations, unit is millisecond. + ForcePriority = int32(DefTiDBForcePriority) + ServerHostname, _ = os.Hostname() ) diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index f7bc5f3e1c796..56bc6dce9bbca 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -46,6 +46,23 @@ func GetDDLReorgWorkerCounter() int32 { return atomic.LoadInt32(&ddlReorgWorkerCounter) } +// SetDDLReorgBatchSize sets ddlReorgBatchSize. +// The value is clamped to [MinDDLReorgBatchSize, MaxDDLReorgBatchSize]. +func SetDDLReorgBatchSize(cnt int32) { + if cnt > MaxDDLReorgBatchSize { + cnt = MaxDDLReorgBatchSize + } + if cnt < MinDDLReorgBatchSize { + cnt = MinDDLReorgBatchSize + } + atomic.StoreInt32(&ddlReorgBatchSize, cnt) +} + +// GetDDLReorgBatchSize gets ddlReorgBatchSize. +func GetDDLReorgBatchSize() int32 { + return atomic.LoadInt32(&ddlReorgBatchSize) +} + // GetSessionSystemVar gets a system variable. // If it is a session only variable, use the default value defined in code. // Returns error if there is no such variable. @@ -303,6 +320,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, return value, nil } return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) + case TiDBDDLReorgBatchSize: + return checkUInt64SystemVar(name, value, uint64(MinDDLReorgBatchSize), uint64(MaxDDLReorgBatchSize), vars) case TiDBIndexLookupConcurrency, TiDBIndexLookupJoinConcurrency, TiDBIndexJoinBatchSize, TiDBIndexLookupSize, TiDBHashJoinConcurrency,