Skip to content

Commit

Permalink
ddl: add ddl_reorg_batch_size variable to control ddl worker batch si…
Browse files Browse the repository at this point in the history
…ze and enlarge default batch size. (#8365)
  • Loading branch information
crazycs520 authored Dec 4, 2018
1 parent cb4d5bd commit e478be7
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 10 deletions.
14 changes: 9 additions & 5 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t tab
return &addIndexWorker{
id: id,
ddlWorker: worker,
batchCnt: DefaultTaskHandleCnt,
batchCnt: int(variable.GetDDLReorgBatchSize()),
sessCtx: sessCtx,
taskCh: make(chan *reorgIndexTask, 1),
resultCh: make(chan *addIndexResult, 1),
Expand Down Expand Up @@ -778,7 +778,8 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad
handleRange := *task
result := &addIndexResult{addedCount: 0, nextHandle: handleRange.startHandle, err: nil}
lastLogCount := 0
startTime := time.Now()
lastLogTime := time.Now()
startTime := lastLogTime

for {
taskCtx, err := w.backfillIndexInTxn(handleRange)
Expand All @@ -795,10 +796,11 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad
mergeAddIndexCtxToResult(&taskCtx, result)
w.ddlWorker.reorgCtx.increaseRowCount(int64(taskCtx.addedCount))

if result.scanCount-lastLogCount >= 30000 {
if num := result.scanCount - lastLogCount; num >= 30000 {
lastLogCount = result.scanCount
log.Infof("[ddl-reorg] worker(%v), finish batch addedCount:%v backfill, task addedCount:%v, task scanCount:%v, nextHandle:%v",
w.id, taskCtx.addedCount, result.addedCount, result.scanCount, taskCtx.nextHandle)
log.Infof("[ddl-reorg] worker(%v), finish batch addedCount:%v backfill, task addedCount:%v, task scanCount:%v, nextHandle:%v, avg row time(ms):%v",
w.id, taskCtx.addedCount, result.addedCount, result.scanCount, taskCtx.nextHandle, time.Since(lastLogTime).Seconds()*1000/float64(num))
lastLogTime = time.Now()
}

handleRange.startHandle = taskCtx.nextHandle
Expand Down Expand Up @@ -844,6 +846,8 @@ func (w *addIndexWorker) run(d *ddlCtx) {
// continue
//}

// Dynamic change batch size.
w.batchCnt = int(variable.GetDDLReorgBatchSize())
result := w.handleBackfillTask(d, task)
w.resultCh <- result
}
Expand Down
29 changes: 29 additions & 0 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,3 +450,32 @@ func (s *testSuite) TestSetDDLReorgWorkerCnt(c *C) {
res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt")
res.Check(testkit.Rows("100"))
}

func (s *testSuite) TestSetDDLReorgBatchSize(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.DefTiDBDDLReorgBatchSize))

tk.MustExec("set tidb_ddl_reorg_batch_size = 1")
tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '1'"))
c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.MinDDLReorgBatchSize))
tk.MustExec(fmt.Sprintf("set tidb_ddl_reorg_batch_size = %v", variable.MaxDDLReorgBatchSize+1))
tk.MustQuery("show warnings;").Check(testkit.Rows(fmt.Sprintf("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '%d'", variable.MaxDDLReorgBatchSize+1)))
c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.MaxDDLReorgBatchSize))
_, err := tk.Exec("set tidb_ddl_reorg_batch_size = invalid_val")
c.Assert(terror.ErrorEqual(err, variable.ErrWrongTypeForVar), IsTrue, Commentf("err %v", err))
tk.MustExec("set tidb_ddl_reorg_batch_size = 100")
c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(100))
tk.MustExec("set tidb_ddl_reorg_batch_size = -1")
tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '-1'"))

tk.MustExec("set tidb_ddl_reorg_batch_size = 100")
res := tk.MustQuery("select @@tidb_ddl_reorg_batch_size")
res.Check(testkit.Rows("100"))

res = tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size")
res.Check(testkit.Rows(fmt.Sprintf("%v", variable.DefTiDBDDLReorgBatchSize)))
tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 1000")
res = tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size")
res.Check(testkit.Rows("1000"))
}
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1402,6 +1402,7 @@ const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variab
variable.TiDBBackoffLockFast + quoteCommaQuote +
variable.TiDBConstraintCheckInPlace + quoteCommaQuote +
variable.TiDBDDLReorgWorkerCount + quoteCommaQuote +
variable.TiDBDDLReorgBatchSize + quoteCommaQuote +
variable.TiDBOptInSubqToJoinAndAgg + quoteCommaQuote +
variable.TiDBDistSQLScanConcurrency + quoteCommaQuote +
variable.TiDBMaxChunkSize + quoteCommaQuote +
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.EnableTablePartition = val
case TiDBDDLReorgWorkerCount:
SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount)))
case TiDBDDLReorgBatchSize:
SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize)))
case TiDBDDLReorgPriority:
s.setDDLReorgPriority(val)
case TiDBForcePriority:
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBQueryLogMaxLen, strconv.Itoa(logutil.DefaultQueryLogMaxLen)},
{ScopeSession, TiDBConfig, ""},
{ScopeGlobal | ScopeSession, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)},
{ScopeGlobal | ScopeSession, TiDBDDLReorgBatchSize, strconv.Itoa(DefTiDBDDLReorgBatchSize)},
{ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"},
{ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]},
{ScopeSession, TiDBEnableRadixJoin, boolToIntStr(DefTiDBUseRadixJoin)},
Expand Down
18 changes: 13 additions & 5 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ const (
// tidb_ddl_reorg_worker_cnt defines the count of ddl reorg workers.
TiDBDDLReorgWorkerCount = "tidb_ddl_reorg_worker_cnt"

// tidb_ddl_reorg_batch_size defines the transaction batch size of ddl reorg workers.
TiDBDDLReorgBatchSize = "tidb_ddl_reorg_batch_size"

// tidb_ddl_reorg_priority defines the operations priority of adding indices.
// It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH
TiDBDDLReorgPriority = "tidb_ddl_reorg_priority"
Expand Down Expand Up @@ -260,6 +263,7 @@ const (
DefTiDBProjectionConcurrency = 4
DefTiDBOptimizerSelectivityLevel = 0
DefTiDBDDLReorgWorkerCount = 16
DefTiDBDDLReorgBatchSize = 1024
DefTiDBHashAggPartialConcurrency = 4
DefTiDBHashAggFinalConcurrency = 4
DefTiDBForcePriority = mysql.NoPriority
Expand All @@ -270,9 +274,13 @@ const (
// Process global variables.
var (
ProcessGeneralLog uint32
ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount
maxDDLReorgWorkerCount int32 = 128
DDLSlowOprThreshold uint32 = 300 // DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond.
ForcePriority = int32(DefTiDBForcePriority)
ServerHostname, _ = os.Hostname()
ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount
maxDDLReorgWorkerCount int32 = 128
ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize
// Export for testing.
MaxDDLReorgBatchSize int32 = 10240
MinDDLReorgBatchSize int32 = 32
DDLSlowOprThreshold uint32 = 300 // DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond.
ForcePriority = int32(DefTiDBForcePriority)
ServerHostname, _ = os.Hostname()
)
19 changes: 19 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ func GetDDLReorgWorkerCounter() int32 {
return atomic.LoadInt32(&ddlReorgWorkerCounter)
}

// SetDDLReorgBatchSize sets ddlReorgBatchSize size.
// Max batch size is MaxDDLReorgBatchSize.
func SetDDLReorgBatchSize(cnt int32) {
if cnt > MaxDDLReorgBatchSize {
cnt = MaxDDLReorgBatchSize
}
if cnt < MinDDLReorgBatchSize {
cnt = MinDDLReorgBatchSize
}
atomic.StoreInt32(&ddlReorgBatchSize, cnt)
}

// GetDDLReorgBatchSize gets ddlReorgBatchSize.
func GetDDLReorgBatchSize() int32 {
return atomic.LoadInt32(&ddlReorgBatchSize)
}

// GetSessionSystemVar gets a system variable.
// If it is a session only variable, use the default value defined in code.
// Returns error if there is no such variable.
Expand Down Expand Up @@ -329,6 +346,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return "auto", nil
}
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
case TiDBDDLReorgBatchSize:
return checkUInt64SystemVar(name, value, uint64(MinDDLReorgBatchSize), uint64(MaxDDLReorgBatchSize), vars)
case TiDBIndexLookupConcurrency, TiDBIndexLookupJoinConcurrency, TiDBIndexJoinBatchSize,
TiDBIndexLookupSize,
TiDBHashJoinConcurrency,
Expand Down

0 comments on commit e478be7

Please sign in to comment.