From 81d0ddc0cfb6c05ef1d07160a8372c079e3ffeb2 Mon Sep 17 00:00:00 2001 From: Lynn Date: Wed, 10 Jan 2018 13:25:27 +0800 Subject: [PATCH] ddl: update the behavior when 'RunWorker's' is false (#5598) --- ddl/ddl.go | 13 ++++++++---- ddl/ddl_worker.go | 3 --- ddl/ddl_worker_test.go | 48 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 7 deletions(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index da4b0c5f6cc57..744c3f5905550 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -326,10 +326,15 @@ func (d *ddl) Stop() error { func (d *ddl) start(ctx goctx.Context) { d.quitCh = make(chan struct{}) - err := d.ownerManager.CampaignOwner(ctx) - terror.Log(errors.Trace(err)) - d.wait.Add(1) - go d.onDDLWorker() + + // If RunWorker is true, we need campaign owner and do DDL job. + // Otherwise, we needn't do that. + if RunWorker { + err := d.ownerManager.CampaignOwner(ctx) + terror.Log(errors.Trace(err)) + d.wait.Add(1) + go d.onDDLWorker() + } // For every start, we will send a fake job to let worker // check owner firstly and try to find whether a job exists and run. diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 3ac18b655d2ab..580a76e72c908 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -34,9 +34,6 @@ var RunWorker = true // then wait or pull the job queue to handle a schema change job. func (d *ddl) onDDLWorker() { defer d.wait.Done() - if !RunWorker { - return - } // We use 4 * lease time to check owner's timeout, so here, we will update owner's status // every 2 * lease time. If lease is 0, we will use default 1s. diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go index 3f91594d383d7..51af96e18e3d3 100644 --- a/ddl/ddl_worker_test.go +++ b/ddl/ddl_worker_test.go @@ -52,6 +52,54 @@ func (s *testDDLSuite) TestCheckOwner(c *C) { c.Assert(d1.GetLease(), Equals, 2*time.Second) } +// TestRunWorker tests no job is handled when the value of RunWorker is false. +func (s *testDDLSuite) TestRunWorker(c *C) { + defer testleak.AfterTest(c)() + store := testCreateStore(c, "test_run_worker") + defer store.Close() + + RunWorker = false + d := testNewDDL(goctx.Background(), nil, store, nil, nil, testLease) + testCheckOwner(c, d, false) + defer d.Stop() + ctx := testNewContext(d) + + dbInfo := testSchemaInfo(c, d, "test") + job := &model.Job{ + SchemaID: dbInfo.ID, + Type: model.ActionCreateSchema, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{dbInfo}, + } + + exitCh := make(chan struct{}) + go func(ch chan struct{}) { + err := d.doDDLJob(ctx, job) + c.Assert(err, IsNil) + close(ch) + }(exitCh) + // Make sure the DDL job is in the DDL job queue. + // The reason for doing it twice is to eliminate the operation in the start function. + <-d.ddlJobCh + <-d.ddlJobCh + // Make sure the DDL job doesn't be handled. + kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) + job, err := d.getFirstDDLJob(t) + c.Assert(err, IsNil) + c.Assert(job, NotNil) + c.Assert(job.SchemaID, Equals, dbInfo.ID, Commentf("job %s", job)) + return nil + }) + // Make sure the DDL job can be done and exit that goroutine. + RunWorker = true + d1 := testNewDDL(goctx.Background(), nil, store, nil, nil, testLease) + testCheckOwner(c, d1, true) + defer d1.Stop() + asyncNotify(d1.ddlJobCh) + <-exitCh +} + func (s *testDDLSuite) TestSchemaError(c *C) { defer testleak.AfterTest(c)() store := testCreateStore(c, "test_schema_error")