From ba259650787f1af2e7f40f6d476f6bc6915e107f Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Mon, 1 Apr 2019 13:13:39 +0800 Subject: [PATCH 1/4] executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) --- executor/builder.go | 37 ++- executor/distsql.go | 66 ++++- executor/executor_required_rows_test.go | 1 + executor/table_reader.go | 25 +- executor/table_readers_required_rows_test.go | 241 +++++++++++++++++++ 5 files changed, 353 insertions(+), 17 deletions(-) create mode 100644 executor/table_readers_required_rows_test.go diff --git a/executor/builder.go b/executor/builder.go index fee88b522a325..c17fe6ac56c2c 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1622,7 +1622,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin) filter: outerFilter, }, innerCtx: innerCtx{ - readerBuilder: &dataReaderBuilder{innerPlan, b}, + readerBuilder: &dataReaderBuilder{Plan: innerPlan, executorBuilder: b}, rowTypes: innerTypes, }, workerWg: new(sync.WaitGroup), @@ -1851,6 +1851,8 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo type dataReaderBuilder struct { plannercore.Plan *executorBuilder + + selectResultHook // for testing } func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, datums [][]types.Datum, @@ -1866,6 +1868,25 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, return nil, errors.New("Wrong plan type for dataReaderBuilder") } +//<<<<<<< HEAD +//======= +func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan, + values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) { + childBuilder := &dataReaderBuilder{Plan: v.Children()[0], executorBuilder: builder.executorBuilder} + reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff) + if err != nil { + return nil, err + } + e, err := builder.buildUnionScanFromReader(reader, v) + if err != nil { + return nil, err + } + us := e.(*UnionScanExec) + us.snapshotChunkBuffer = us.newFirstChunk() + return us, nil +} + +//>>>>>>> 435a08140... executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader, datums [][]types.Datum) (Executor, error) { e, err := buildNoRangeTableReader(builder.executorBuilder, v) if err != nil { @@ -1892,7 +1913,11 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex return nil, errors.Trace(err) } e.resultHandler = &tableResultHandler{} - result, err := distsql.Select(ctx, builder.ctx, kvReq, e.retTypes(), e.feedback) +//<<<<<<< HEAD +// result, err := distsql.Select(ctx, builder.ctx, kvReq, e.retTypes(), e.feedback) +//======= + result, err := builder.SelectResult(ctx, builder.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans)) +//>>>>>>> 435a08140... 
executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) if err != nil { return nil, errors.Trace(err) } @@ -1921,11 +1946,15 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context if err != nil { return nil, errors.Trace(err) } - kvRanges, err := buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, values, indexRanges, keyOff2IdxOff) +//<<<<<<< HEAD +// kvRanges, err := buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, values, indexRanges, keyOff2IdxOff) +//======= + e.kvRanges, err = buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableIDs(e.table), e.index.ID, values, indexRanges, keyOff2IdxOff) +//>>>>>>> 435a08140... executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) if err != nil { return nil, errors.Trace(err) } - err = e.open(ctx, kvRanges) + err = e.open(ctx) return e, errors.Trace(err) } diff --git a/executor/distsql.go b/executor/distsql.go index f11a1f89e20bb..d6935fcc5770b 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -233,6 +233,8 @@ type IndexReaderExecutor struct { idxCols []*expression.Column colLens []int plans []plannercore.PhysicalPlan + + selectResultHook // for testing } // Close clears all resources hold by current object. @@ -294,7 +296,11 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) e.feedback.Invalidate() return errors.Trace(err) } - e.result, err = distsql.Select(ctx, e.ctx, kvReq, e.retTypes(), e.feedback) +//<<<<<<< HEAD +// e.result, err = distsql.Select(ctx, e.ctx, kvReq, e.retTypes(), e.feedback) +//======= + e.result, err = e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans)) +//>>>>>>> 435a08140... executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) if err != nil { e.feedback.Invalidate() return errors.Trace(err) @@ -328,6 +334,9 @@ type IndexLookUpExecutor struct { tblWorkerWg sync.WaitGroup finished chan struct{} + kvRanges []kv.KeyRange + workerStarted bool + resultCh chan *lookupTableTask resultCurr *lookupTableTask feedback *statistics.QueryFeedback @@ -356,19 +365,23 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { return errors.Trace(err) } } - kvRanges, err := distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, e.ranges, e.feedback) +//<<<<<<< HEAD +// kvRanges, err := distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, e.ranges, e.feedback) +//======= + e.kvRanges, err = distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.index.ID, e.ranges, e.feedback) +//>>>>>>> 435a08140... 
executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) if err != nil { e.feedback.Invalidate() return errors.Trace(err) } - err = e.open(ctx, kvRanges) + err = e.open(ctx) if err != nil { e.feedback.Invalidate() } return errors.Trace(err) } -func (e *IndexLookUpExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error { +func (e *IndexLookUpExecutor) open(ctx context.Context) error { // We have to initialize "memTracker" and other execution resources in here // instead of in function "Open", because this "IndexLookUpExecutor" may be // constructed by a "IndexLookUpJoin" and "Open" will not be called in that @@ -393,20 +406,32 @@ func (e *IndexLookUpExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) return errors.Trace(err) } } + return nil +} +func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize int) error { // indexWorker will write to workCh and tableWorker will read from workCh, // so fetching index and getting table data can run concurrently. workCh := make(chan *lookupTableTask, 1) - err = e.startIndexWorker(ctx, kvRanges, workCh) - if err != nil { + if err := e.startIndexWorker(ctx, e.kvRanges, workCh, initBatchSize); err != nil { return errors.Trace(err) } e.startTableWorker(ctx, workCh) + e.workerStarted = true return nil } // startIndexWorker launch a background goroutine to fetch handles, send the results to workCh. -func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []kv.KeyRange, workCh chan<- *lookupTableTask) error { +//<<<<<<< HEAD +//func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []kv.KeyRange, workCh chan<- *lookupTableTask) error { +//======= +func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []kv.KeyRange, workCh chan<- *lookupTableTask, initBatchSize int) error { + if e.runtimeStats != nil { + collExec := true + e.dagPB.CollectExecutionSummaries = &collExec + } + +//>>>>>>> 435a08140... executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) var builder distsql.RequestBuilder kvReq, err := builder.SetKeyRanges(kvRanges). SetDAGRequest(e.dagPB). @@ -425,11 +450,12 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k } result.Fetch(ctx) worker := &indexWorker{ + idxLookup: e, workCh: workCh, finished: e.finished, resultCh: e.resultCh, keepOrder: e.keepOrder, - batchSize: e.maxChunkSize, + batchSize: initBatchSize, maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize, maxChunkSize: e.maxChunkSize, } @@ -503,7 +529,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []in // Close implements Exec Close interface. func (e *IndexLookUpExecutor) Close() error { - if e.finished == nil { + if !e.workerStarted || e.finished == nil { return nil } @@ -515,6 +541,7 @@ func (e *IndexLookUpExecutor) Close() error { e.idxWorkerWg.Wait() e.tblWorkerWg.Wait() e.finished = nil + e.workerStarted = false e.memTracker.Detach() e.memTracker = nil return nil @@ -526,7 +553,16 @@ func (e *IndexLookUpExecutor) Next(ctx context.Context, chk *chunk.Chunk) error start := time.Now() defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }() } - chk.Reset() +//<<<<<<< HEAD +// chk.Reset() +//======= + if !e.workerStarted { + if err := e.startWorkers(ctx, req.RequiredRows()); err != nil { + return errors.Trace(err) + } + } + req.Reset() +//>>>>>>> 435a08140... 
executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) for { resultTask, err := e.getResultTask() if err != nil { @@ -538,7 +574,11 @@ func (e *IndexLookUpExecutor) Next(ctx context.Context, chk *chunk.Chunk) error for resultTask.cursor < len(resultTask.rows) { chk.AppendRow(resultTask.rows[resultTask.cursor]) resultTask.cursor++ - if chk.NumRows() >= e.maxChunkSize { +//<<<<<<< HEAD +// if chk.NumRows() >= e.maxChunkSize { +//======= + if req.IsFull() { +//>>>>>>> 435a08140... executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) return nil } } @@ -567,6 +607,7 @@ func (e *IndexLookUpExecutor) getResultTask() (*lookupTableTask, error) { // indexWorker is used by IndexLookUpExecutor to maintain index lookup background goroutines. type indexWorker struct { + idxLookup *IndexLookUpExecutor workCh chan<- *lookupTableTask finished <-chan struct{} resultCh chan<- *lookupTableTask @@ -599,7 +640,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes } } }() - chk := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.maxChunkSize) + chk := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize) for { handles, err := w.extractTaskHandles(ctx, chk, result) if err != nil { @@ -628,6 +669,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) (handles []int64, err error) { handles = make([]int64, 0, w.batchSize) for len(handles) < w.batchSize { + chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize) err = errors.Trace(idxResult.Next(ctx, chk)) if err != nil { return handles, err diff --git a/executor/executor_required_rows_test.go b/executor/executor_required_rows_test.go index 7b51c6932a82e..b16ee95404c09 100644 --- a/executor/executor_required_rows_test.go +++ b/executor/executor_required_rows_test.go @@ -202,6 +202,7 @@ func defaultCtx() sessionctx.Context { ctx.GetSessionVars().MaxChunkSize = 1024 ctx.GetSessionVars().MemQuotaSort = variable.DefTiDBMemQuotaSort ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker("", ctx.GetSessionVars().MemQuotaQuery) + ctx.GetSessionVars().SnapshotTS = uint64(1) return ctx } diff --git a/executor/table_reader.go b/executor/table_reader.go index 9e3a94c9092dd..47740701e08d4 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -19,9 +19,12 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/distsql" + "github.com/pingcap/tidb/kv" plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/ranger" tipb "github.com/pingcap/tipb/go-tipb" @@ -31,6 +34,20 @@ import ( // make sure `TableReaderExecutor` implements `Executor`. var _ Executor = &TableReaderExecutor{} +// selectResultHook is used to hack distsql.SelectWithRuntimeStats safely for testing. 
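+// A nil selectResultFunc means production behavior: SelectResult falls
+// through to the real distsql call. Tests assign a stub instead, so they
+// can control exactly how many rows each Next call hands back.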
+type selectResultHook struct { + selectResultFunc func(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, + fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []string) (distsql.SelectResult, error) +} + +func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, + fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []string) (distsql.SelectResult, error) { + if sr.selectResultFunc == nil { + return distsql.SelectWithRuntimeStats(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs) + } + return sr.selectResultFunc(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs) +} + // TableReaderExecutor sends DAG request and reads table data from kv layer. type TableReaderExecutor struct { baseExecutor @@ -55,6 +72,8 @@ type TableReaderExecutor struct { // corColInAccess tells whether there's correlated column in access conditions. corColInAccess bool plans []plannercore.PhysicalPlan + + selectResultHook // for testing } // Open initialzes necessary variables for using this executor. @@ -132,7 +151,11 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra if err != nil { return nil, errors.Trace(err) } - result, err := distsql.Select(ctx, e.ctx, kvReq, e.retTypes(), e.feedback) +//<<<<<<< HEAD +// result, err := distsql.Select(ctx, e.ctx, kvReq, e.retTypes(), e.feedback) +//======= + result, err := e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans)) +//>>>>>>> 435a08140... executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) if err != nil { return nil, errors.Trace(err) } diff --git a/executor/table_readers_required_rows_test.go b/executor/table_readers_required_rows_test.go new file mode 100644 index 0000000000000..e1fa48e019109 --- /dev/null +++ b/executor/table_readers_required_rows_test.go @@ -0,0 +1,241 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + "fmt" + "math/rand" + + "github.com/cznic/mathutil" + . 
"github.com/pingcap/check" + "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/distsql" + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tipb/go-tipb" +) + +type requiredRowsSelectResult struct { + retTypes []*types.FieldType + totalRows int + count int + expectedRowsRet []int + numNextCalled int +} + +func (r *requiredRowsSelectResult) Fetch(context.Context) {} +func (r *requiredRowsSelectResult) NextRaw(context.Context) ([]byte, error) { return nil, nil } +func (r *requiredRowsSelectResult) Close() error { return nil } + +func (r *requiredRowsSelectResult) Next(ctx context.Context, chk *chunk.Chunk) error { + defer func() { + if r.numNextCalled >= len(r.expectedRowsRet) { + return + } + rowsRet := chk.NumRows() + expected := r.expectedRowsRet[r.numNextCalled] + if rowsRet != expected { + panic(fmt.Sprintf("unexpected number of rows returned, obtain: %v, expected: %v", rowsRet, expected)) + } + r.numNextCalled++ + }() + chk.Reset() + if r.count > r.totalRows { + return nil + } + required := mathutil.Min(chk.RequiredRows(), r.totalRows-r.count) + for i := 0; i < required; i++ { + chk.AppendRow(r.genOneRow()) + } + r.count += required + return nil +} + +func (r *requiredRowsSelectResult) genOneRow() chunk.Row { + row := chunk.MutRowFromTypes(r.retTypes) + for i := range r.retTypes { + row.SetValue(i, r.genValue(r.retTypes[i])) + } + return row.ToRow() +} + +func (r *requiredRowsSelectResult) genValue(valType *types.FieldType) interface{} { + switch valType.Tp { + case mysql.TypeLong, mysql.TypeLonglong: + return int64(rand.Int()) + case mysql.TypeDouble: + return rand.Float64() + default: + panic("not implement") + } +} + +func mockDistsqlSelectCtxSet(totalRows int, expectedRowsRet []int) context.Context { + ctx := context.Background() + ctx = context.WithValue(ctx, "totalRows", totalRows) + ctx = context.WithValue(ctx, "expectedRowsRet", expectedRowsRet) + return ctx +} + +func mockDistsqlSelectCtxGet(ctx context.Context) (totalRows int, expectedRowsRet []int) { + totalRows = ctx.Value("totalRows").(int) + expectedRowsRet = ctx.Value("expectedRowsRet").([]int) + return +} + +func mockSelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, + fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []string) (distsql.SelectResult, error) { + totalRows, expectedRowsRet := mockDistsqlSelectCtxGet(ctx) + return &requiredRowsSelectResult{ + retTypes: fieldTypes, + totalRows: totalRows, + expectedRowsRet: expectedRowsRet, + }, nil +} + +func mockSelectResultWithoutCheck(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, + fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []string) (distsql.SelectResult, error) { + return &requiredRowsSelectResult{retTypes: fieldTypes}, nil +} + +func buildTableReader(sctx sessionctx.Context) Executor { + e := &TableReaderExecutor{ + baseExecutor: buildMockBaseExec(sctx), + table: &tables.Table{}, + dagPB: buildMockDAGRequest(sctx), + selectResultHook: selectResultHook{mockSelectResult}, + } + return e +} + +func buildMockDAGRequest(sctx sessionctx.Context) *tipb.DAGRequest { + builder := newExecutorBuilder(sctx, nil) + req, _, err := builder.constructDAGReq(nil) + if err != nil { + panic(err) + } + return req +} + 
+func buildMockBaseExec(sctx sessionctx.Context) baseExecutor { + retTypes := []*types.FieldType{types.NewFieldType(mysql.TypeDouble), types.NewFieldType(mysql.TypeLonglong)} + cols := make([]*expression.Column, len(retTypes)) + for i := range retTypes { + cols[i] = &expression.Column{Index: i, RetType: retTypes[i]} + } + schema := expression.NewSchema(cols...) + baseExec := newBaseExecutor(sctx, schema, "") + return baseExec +} + +func (s *testExecSuite) TestTableReaderRequiredRows(c *C) { + maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize + testCases := []struct { + totalRows int + requiredRows []int + expectedRows []int + expectedRowsDS []int + }{ + { + totalRows: 10, + requiredRows: []int{1, 5, 3, 10}, + expectedRows: []int{1, 5, 3, 1}, + expectedRowsDS: []int{1, 5, 3, 1}, + }, + { + totalRows: maxChunkSize + 1, + requiredRows: []int{1, 5, 3, 10, maxChunkSize}, + expectedRows: []int{1, 5, 3, 10, (maxChunkSize + 1) - 1 - 5 - 3 - 10}, + expectedRowsDS: []int{1, 5, 3, 10, (maxChunkSize + 1) - 1 - 5 - 3 - 10}, + }, + { + totalRows: 3*maxChunkSize + 1, + requiredRows: []int{3, 10, maxChunkSize}, + expectedRows: []int{3, 10, maxChunkSize}, + expectedRowsDS: []int{3, 10, maxChunkSize}, + }, + } + for _, testCase := range testCases { + sctx := defaultCtx() + ctx := mockDistsqlSelectCtxSet(testCase.totalRows, testCase.expectedRowsDS) + exec := buildTableReader(sctx) + c.Assert(exec.Open(ctx), IsNil) + chk := exec.newFirstChunk() + for i := range testCase.requiredRows { + chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize) + c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil) + c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i]) + } + c.Assert(exec.Close(), IsNil) + } +} + +func buildIndexReader(sctx sessionctx.Context) Executor { + e := &IndexReaderExecutor{ + baseExecutor: buildMockBaseExec(sctx), + dagPB: buildMockDAGRequest(sctx), + index: &model.IndexInfo{}, + selectResultHook: selectResultHook{mockSelectResult}, + } + return e +} + +func (s *testExecSuite) TestIndexReaderRequiredRows(c *C) { + maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize + testCases := []struct { + totalRows int + requiredRows []int + expectedRows []int + expectedRowsDS []int + }{ + { + totalRows: 10, + requiredRows: []int{1, 5, 3, 10}, + expectedRows: []int{1, 5, 3, 1}, + expectedRowsDS: []int{1, 5, 3, 1}, + }, + { + totalRows: maxChunkSize + 1, + requiredRows: []int{1, 5, 3, 10, maxChunkSize}, + expectedRows: []int{1, 5, 3, 10, (maxChunkSize + 1) - 1 - 5 - 3 - 10}, + expectedRowsDS: []int{1, 5, 3, 10, (maxChunkSize + 1) - 1 - 5 - 3 - 10}, + }, + { + totalRows: 3*maxChunkSize + 1, + requiredRows: []int{3, 10, maxChunkSize}, + expectedRows: []int{3, 10, maxChunkSize}, + expectedRowsDS: []int{3, 10, maxChunkSize}, + }, + } + for _, testCase := range testCases { + sctx := defaultCtx() + ctx := mockDistsqlSelectCtxSet(testCase.totalRows, testCase.expectedRowsDS) + exec := buildIndexReader(sctx) + c.Assert(exec.Open(ctx), IsNil) + chk := exec.newFirstChunk() + for i := range testCase.requiredRows { + chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize) + c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil) + c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i]) + } + c.Assert(exec.Close(), IsNil) + } +} From f05d85e68c5d2d95260458e8495f14e2846c1f0a Mon Sep 17 00:00:00 2001 From: qw4990 Date: Tue, 16 Apr 2019 15:57:06 +0800 Subject: [PATCH 2/4] fix --- executor/builder.go | 31 ++----------------------------- executor/distsql.go | 31 +++++-------------------------- 
executor/table_reader.go | 14 +++++--------- 3 files changed, 12 insertions(+), 64 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index c17fe6ac56c2c..e11e0e8fc2dda 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1868,25 +1868,6 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, return nil, errors.New("Wrong plan type for dataReaderBuilder") } -//<<<<<<< HEAD -//======= -func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan, - values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) { - childBuilder := &dataReaderBuilder{Plan: v.Children()[0], executorBuilder: builder.executorBuilder} - reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff) - if err != nil { - return nil, err - } - e, err := builder.buildUnionScanFromReader(reader, v) - if err != nil { - return nil, err - } - us := e.(*UnionScanExec) - us.snapshotChunkBuffer = us.newFirstChunk() - return us, nil -} - -//>>>>>>> 435a08140... executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader, datums [][]types.Datum) (Executor, error) { e, err := buildNoRangeTableReader(builder.executorBuilder, v) if err != nil { @@ -1913,11 +1894,7 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex return nil, errors.Trace(err) } e.resultHandler = &tableResultHandler{} -//<<<<<<< HEAD -// result, err := distsql.Select(ctx, builder.ctx, kvReq, e.retTypes(), e.feedback) -//======= - result, err := builder.SelectResult(ctx, builder.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans)) -//>>>>>>> 435a08140... executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) + result, err := builder.SelectResult(ctx, builder.ctx, kvReq, e.retTypes(), e.feedback) if err != nil { return nil, errors.Trace(err) } @@ -1946,11 +1923,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context if err != nil { return nil, errors.Trace(err) } -//<<<<<<< HEAD -// kvRanges, err := buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, values, indexRanges, keyOff2IdxOff) -//======= - e.kvRanges, err = buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableIDs(e.table), e.index.ID, values, indexRanges, keyOff2IdxOff) -//>>>>>>> 435a08140... executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) + e.kvRanges, err = buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, values, indexRanges, keyOff2IdxOff) if err != nil { return nil, errors.Trace(err) } diff --git a/executor/distsql.go b/executor/distsql.go index d6935fcc5770b..81a8690a2ce71 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -296,11 +296,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) e.feedback.Invalidate() return errors.Trace(err) } -//<<<<<<< HEAD -// e.result, err = distsql.Select(ctx, e.ctx, kvReq, e.retTypes(), e.feedback) -//======= - e.result, err = e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans)) -//>>>>>>> 435a08140... 
executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) + e.result, err = e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback) if err != nil { e.feedback.Invalidate() return errors.Trace(err) @@ -365,11 +361,7 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { return errors.Trace(err) } } -//<<<<<<< HEAD -// kvRanges, err := distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, e.ranges, e.feedback) -//======= - e.kvRanges, err = distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.index.ID, e.ranges, e.feedback) -//>>>>>>> 435a08140... executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) + e.kvRanges, err = distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, e.ranges, e.feedback) if err != nil { e.feedback.Invalidate() return errors.Trace(err) @@ -421,17 +413,12 @@ func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize in return nil } -// startIndexWorker launch a background goroutine to fetch handles, send the results to workCh. -//<<<<<<< HEAD -//func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []kv.KeyRange, workCh chan<- *lookupTableTask) error { -//======= func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []kv.KeyRange, workCh chan<- *lookupTableTask, initBatchSize int) error { if e.runtimeStats != nil { collExec := true e.dagPB.CollectExecutionSummaries = &collExec } -//>>>>>>> 435a08140... executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) var builder distsql.RequestBuilder kvReq, err := builder.SetKeyRanges(kvRanges). SetDAGRequest(e.dagPB). @@ -553,16 +540,12 @@ func (e *IndexLookUpExecutor) Next(ctx context.Context, chk *chunk.Chunk) error start := time.Now() defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }() } -//<<<<<<< HEAD -// chk.Reset() -//======= if !e.workerStarted { - if err := e.startWorkers(ctx, req.RequiredRows()); err != nil { + if err := e.startWorkers(ctx, chk.RequiredRows()); err != nil { return errors.Trace(err) } } - req.Reset() -//>>>>>>> 435a08140... executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) + chk.Reset() for { resultTask, err := e.getResultTask() if err != nil { @@ -574,11 +557,7 @@ func (e *IndexLookUpExecutor) Next(ctx context.Context, chk *chunk.Chunk) error for resultTask.cursor < len(resultTask.rows) { chk.AppendRow(resultTask.rows[resultTask.cursor]) resultTask.cursor++ -//<<<<<<< HEAD -// if chk.NumRows() >= e.maxChunkSize { -//======= - if req.IsFull() { -//>>>>>>> 435a08140... executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) + if chk.IsFull() { return nil } } diff --git a/executor/table_reader.go b/executor/table_reader.go index 47740701e08d4..45aeef858860d 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -37,15 +37,15 @@ var _ Executor = &TableReaderExecutor{} // selectResultHook is used to hack distsql.SelectWithRuntimeStats safely for testing. 
type selectResultHook struct { selectResultFunc func(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, - fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []string) (distsql.SelectResult, error) + fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (distsql.SelectResult, error) } func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, - fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []string) (distsql.SelectResult, error) { + fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (distsql.SelectResult, error) { if sr.selectResultFunc == nil { - return distsql.SelectWithRuntimeStats(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs) + return distsql.Select(ctx, sctx, kvReq, fieldTypes, fb) } - return sr.selectResultFunc(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs) + return sr.selectResultFunc(ctx, sctx, kvReq, fieldTypes, fb) } // TableReaderExecutor sends DAG request and reads table data from kv layer. @@ -151,11 +151,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra if err != nil { return nil, errors.Trace(err) } -//<<<<<<< HEAD -// result, err := distsql.Select(ctx, e.ctx, kvReq, e.retTypes(), e.feedback) -//======= - result, err := e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans)) -//>>>>>>> 435a08140... executor: control Chunk size for TableReader&IndexReader&IndexLookup (#9452) + result, err := e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback) if err != nil { return nil, errors.Trace(err) } From 205d62b5cb0914b005120630c29378e7f5ed82a5 Mon Sep 17 00:00:00 2001 From: qw4990 Date: Tue, 16 Apr 2019 16:04:45 +0800 Subject: [PATCH 3/4] fix --- executor/distsql.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/executor/distsql.go b/executor/distsql.go index 81a8690a2ce71..3fd656e38c13d 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -414,11 +414,6 @@ func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize in } func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []kv.KeyRange, workCh chan<- *lookupTableTask, initBatchSize int) error { - if e.runtimeStats != nil { - collExec := true - e.dagPB.CollectExecutionSummaries = &collExec - } - var builder distsql.RequestBuilder kvReq, err := builder.SetKeyRanges(kvRanges). SetDAGRequest(e.dagPB). 
From 27cf874d88189bf7cde9ae13c21c9cc38747db3f Mon Sep 17 00:00:00 2001 From: qw4990 Date: Tue, 16 Apr 2019 16:11:39 +0800 Subject: [PATCH 4/4] fix CI --- executor/table_readers_required_rows_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/executor/table_readers_required_rows_test.go b/executor/table_readers_required_rows_test.go index e1fa48e019109..e1fe396056d77 100644 --- a/executor/table_readers_required_rows_test.go +++ b/executor/table_readers_required_rows_test.go @@ -102,7 +102,7 @@ func mockDistsqlSelectCtxGet(ctx context.Context) (totalRows int, expectedRowsRe } func mockSelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, - fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []string) (distsql.SelectResult, error) { + fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (distsql.SelectResult, error) { totalRows, expectedRowsRet := mockDistsqlSelectCtxGet(ctx) return &requiredRowsSelectResult{ retTypes: fieldTypes, @@ -112,7 +112,7 @@ func mockSelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Re } func mockSelectResultWithoutCheck(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, - fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []string) (distsql.SelectResult, error) { + fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (distsql.SelectResult, error) { return &requiredRowsSelectResult{retTypes: fieldTypes}, nil } @@ -181,7 +181,7 @@ func (s *testExecSuite) TestTableReaderRequiredRows(c *C) { chk := exec.newFirstChunk() for i := range testCase.requiredRows { chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize) - c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil) + c.Assert(exec.Next(ctx, chk), IsNil) c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i]) } c.Assert(exec.Close(), IsNil) @@ -233,7 +233,7 @@ func (s *testExecSuite) TestIndexReaderRequiredRows(c *C) { chk := exec.newFirstChunk() for i := range testCase.requiredRows { chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize) - c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil) + c.Assert(exec.Next(ctx, chk), IsNil) c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i]) } c.Assert(exec.Close(), IsNil)
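
A note for reviewers on the contract this series introduces: readers no longer
always fill a chunk up to maxChunkSize; the consumer first declares how many
rows it needs via SetRequiredRows, and the producer stops once the chunk
reports IsFull. The standalone Go sketch below illustrates that protocol only;
rowBatch, fetch, and the clamping rule inside SetRequiredRows are hypothetical
stand-ins for TiDB's util/chunk package, not its actual implementation.

package main

import "fmt"

// rowBatch is a hypothetical, minimal stand-in for chunk.Chunk, reduced to
// the required-rows protocol exercised by the tests in this series.
type rowBatch struct {
	numRows      int
	requiredRows int
}

// SetRequiredRows clamps values outside (0, maxChunkSize] to maxChunkSize,
// mirroring how the tests call it with (requiredRows[i], maxChunkSize).
func (b *rowBatch) SetRequiredRows(required, maxChunkSize int) {
	if required <= 0 || required > maxChunkSize {
		required = maxChunkSize
	}
	b.requiredRows = required
}

func (b *rowBatch) Reset()            { b.numRows = 0 }
func (b *rowBatch) NumRows() int      { return b.numRows }
func (b *rowBatch) RequiredRows() int { return b.requiredRows }
func (b *rowBatch) IsFull() bool      { return b.numRows >= b.requiredRows }

// fetch plays the role of requiredRowsSelectResult.Next: it appends at most
// RequiredRows rows, bounded by how many rows remain in the mock table.
func fetch(remaining *int, b *rowBatch) {
	b.Reset()
	n := b.RequiredRows()
	if n > *remaining {
		n = *remaining
	}
	b.numRows += n
	*remaining -= n
}

func main() {
	remaining := 10 // totalRows in the first test case above
	b := &rowBatch{}
	for _, required := range []int{1, 5, 3, 10} { // requiredRows of that case
		b.SetRequiredRows(required, 32) // 32 stands in for maxChunkSize
		fetch(&remaining, b)
		fmt.Printf("required=%2d got=%2d\n", required, b.NumRows())
	}
	// Prints 1, 5, 3, 1 — the expectedRows pattern in
	// TestTableReaderRequiredRows: each call returns min(required, rows left).
}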
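The second structural change worth a gloss is in IndexLookUpExecutor: patch 1
moves worker startup out of open and into startWorkers, which Next invokes
lazily with the first chunk's RequiredRows as the initial index-worker batch
size, and Close becomes a no-op when the workers never ran. The sketch below
reproduces only that control flow; the type, its fields, and the printed
message are simplifications of the hunks above, not TiDB's actual worker code.

package main

import "fmt"

// lookupExec abbreviates IndexLookUpExecutor down to the lazy-start state
// machine added by patch 1; the worker bodies are elided.
type lookupExec struct {
	workerStarted bool
	finished      chan struct{}
}

func (e *lookupExec) startWorkers(initBatchSize int) {
	e.finished = make(chan struct{})
	// Here the real executor starts the index worker with
	// batchSize = initBatchSize (later grown toward maxBatchSize)
	// and then the table worker.
	fmt.Printf("workers started, first index batch = %d rows\n", initBatchSize)
	e.workerStarted = true
}

func (e *lookupExec) Next(requiredRows int) {
	if !e.workerStarted {
		// First call: size the first index batch by what the caller needs,
		// instead of unconditionally using maxChunkSize as before.
		e.startWorkers(requiredRows)
	}
	// ... drain result tasks into the chunk until it IsFull ...
}

func (e *lookupExec) Close() {
	if !e.workerStarted || e.finished == nil {
		return // nothing was started, nothing to tear down
	}
	close(e.finished)
	e.workerStarted = false
}

func main() {
	e := &lookupExec{}
	e.Close()  // safe: workers never started
	e.Next(4)  // lazily starts workers with an initial batch of 4
	e.Next(4)  // already started; no second launch
	e.Close()
}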