From e8f8d7b612ae245467433529473dad4ad4c518c6 Mon Sep 17 00:00:00 2001 From: lysu Date: Fri, 16 Nov 2018 15:23:14 +0800 Subject: [PATCH 1/5] executor: make sure hashjoin's fetch&build-inner goroutine return before close return --- executor/join.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/executor/join.go b/executor/join.go index 241abb0cc0c9c..5bc8983335ff8 100644 --- a/executor/join.go +++ b/executor/join.go @@ -553,6 +553,9 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) { go util.WithRecovery(func() { e.fetchInnerRows(ctx, innerResultCh, doneCh) }, nil) if e.finished.Load().(bool) { + // wait fetchInnerRows goroutine exit. + for range innerResultCh { + } return } // TODO: Parallel build hash table. Currently not support because `mvmap` is not thread-safe. @@ -587,6 +590,9 @@ func (e *HashJoinExec) buildHashTableForList(innerResultCh chan *chunk.Chunk) er chkIdx := uint32(0) for chk := range innerResultCh { if e.finished.Load().(bool) { + // wait fetchInnerRows goroutine exit. + for range innerResultCh { + } return nil } numRows := chk.NumRows() From 360317fb4b8e3be660c2401af9e85248af57c96c Mon Sep 17 00:00:00 2001 From: lysu Date: Mon, 19 Nov 2018 14:24:43 +0800 Subject: [PATCH 2/5] address comments, use waitgroup --- executor/join.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/executor/join.go b/executor/join.go index 5bc8983335ff8..36c7f208216f6 100644 --- a/executor/join.go +++ b/executor/join.go @@ -259,8 +259,11 @@ func (e *HashJoinExec) wait4Inner() (finished bool, err error) { // fetchInnerRows fetches all rows from inner executor, // and append them to e.innerResult. -func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh chan struct{}) { - defer close(chkCh) +func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh chan struct{}, fetchInnerDone *sync.WaitGroup) { + defer func() { + close(chkCh) + fetchInnerDone.Done() + }() e.innerResult = chunk.NewList(e.innerExec.retTypes(), e.initCap, e.maxChunkSize) e.innerResult.GetMemTracker().AttachTo(e.memTracker) e.innerResult.GetMemTracker().SetLabel("innerResult") @@ -550,16 +553,17 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) { // innerResultCh transfer inner result chunk from inner fetch to build hash table. innerResultCh := make(chan *chunk.Chunk, e.concurrency) doneCh := make(chan struct{}) - go util.WithRecovery(func() { e.fetchInnerRows(ctx, innerResultCh, doneCh) }, nil) + var fetchInnerDone sync.WaitGroup + fetchInnerDone.Add(1) + go util.WithRecovery(func() { e.fetchInnerRows(ctx, innerResultCh, doneCh, &fetchInnerDone) }, nil) if e.finished.Load().(bool) { // wait fetchInnerRows goroutine exit. - for range innerResultCh { - } + fetchInnerDone.Wait() return } // TODO: Parallel build hash table. Currently not support because `mvmap` is not thread-safe. - err := e.buildHashTableForList(innerResultCh) + err := e.buildHashTableForList(innerResultCh, &fetchInnerDone) if err != nil { e.innerFinished <- errors.Trace(err) close(doneCh) @@ -574,7 +578,7 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) { // buildHashTableForList builds hash table from `list`. // key of hash table: hash value of key columns // value of hash table: RowPtr of the corresponded row -func (e *HashJoinExec) buildHashTableForList(innerResultCh chan *chunk.Chunk) error { +func (e *HashJoinExec) buildHashTableForList(innerResultCh chan *chunk.Chunk, fetchInnerDone *sync.WaitGroup) error { e.hashTable = mvmap.NewMVMap() e.innerKeyColIdx = make([]int, len(e.innerKeys)) for i := range e.innerKeys { @@ -591,8 +595,7 @@ func (e *HashJoinExec) buildHashTableForList(innerResultCh chan *chunk.Chunk) er for chk := range innerResultCh { if e.finished.Load().(bool) { // wait fetchInnerRows goroutine exit. - for range innerResultCh { - } + fetchInnerDone.Wait() return nil } numRows := chk.NumRows() From 2f8d3963776d2ef66da1959717acc8296858a0d8 Mon Sep 17 00:00:00 2001 From: lysu Date: Mon, 19 Nov 2018 20:02:45 +0800 Subject: [PATCH 3/5] address comments --- executor/join.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/executor/join.go b/executor/join.go index 36c7f208216f6..991721cfcc72b 100644 --- a/executor/join.go +++ b/executor/join.go @@ -259,11 +259,7 @@ func (e *HashJoinExec) wait4Inner() (finished bool, err error) { // fetchInnerRows fetches all rows from inner executor, // and append them to e.innerResult. -func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh chan struct{}, fetchInnerDone *sync.WaitGroup) { - defer func() { - close(chkCh) - fetchInnerDone.Done() - }() +func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh chan struct{}) { e.innerResult = chunk.NewList(e.innerExec.retTypes(), e.initCap, e.maxChunkSize) e.innerResult.GetMemTracker().AttachTo(e.memTracker) e.innerResult.GetMemTracker().SetLabel("innerResult") @@ -292,6 +288,11 @@ func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.C } } +func (e *HashJoinExec) finishFetchInnerRows(chkCh chan<- *chunk.Chunk, fetchInnerDone *sync.WaitGroup) { + close(chkCh) + fetchInnerDone.Done() +} + // evalRadixBitNum evaluates the radix bit numbers. func (e *HashJoinExec) evalRadixBitNum() { sv := e.ctx.GetSessionVars() @@ -555,7 +556,10 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) { doneCh := make(chan struct{}) var fetchInnerDone sync.WaitGroup fetchInnerDone.Add(1) - go util.WithRecovery(func() { e.fetchInnerRows(ctx, innerResultCh, doneCh, &fetchInnerDone) }, nil) + go util.WithRecovery( + func() { e.fetchInnerRows(ctx, innerResultCh, doneCh) }, + func(r interface{}) { e.finishFetchInnerRows(innerResultCh, &fetchInnerDone) }, + ) if e.finished.Load().(bool) { // wait fetchInnerRows goroutine exit. From 2c8120f664493d3c34a7e09a93d5d80d32885194 Mon Sep 17 00:00:00 2001 From: lysu Date: Mon, 19 Nov 2018 20:22:42 +0800 Subject: [PATCH 4/5] address comments --- executor/join.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/executor/join.go b/executor/join.go index 991721cfcc72b..fe6a7ce8d8de8 100644 --- a/executor/join.go +++ b/executor/join.go @@ -54,6 +54,7 @@ type HashJoinExec struct { innerFinished chan error hashJoinBuffers []*hashJoinBuffer workerWaitGroup sync.WaitGroup // workerWaitGroup is for sync multiple join workers. + fetchInnerDone sync.WaitGroup finished atomic.Value closeCh chan struct{} // closeCh add a lock for closing executor. joinType plannercore.JoinType @@ -134,6 +135,7 @@ func (e *HashJoinExec) Close() error { e.outerChkResourceCh = nil e.joinChkResourceCh = nil } + e.fetchInnerDone.Wait() e.memTracker.Detach() e.memTracker = nil @@ -288,9 +290,9 @@ func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.C } } -func (e *HashJoinExec) finishFetchInnerRows(chkCh chan<- *chunk.Chunk, fetchInnerDone *sync.WaitGroup) { +func (e *HashJoinExec) finishFetchInnerRows(chkCh chan<- *chunk.Chunk) { close(chkCh) - fetchInnerDone.Done() + e.fetchInnerDone.Done() } // evalRadixBitNum evaluates the radix bit numbers. @@ -554,20 +556,17 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) { // innerResultCh transfer inner result chunk from inner fetch to build hash table. innerResultCh := make(chan *chunk.Chunk, e.concurrency) doneCh := make(chan struct{}) - var fetchInnerDone sync.WaitGroup - fetchInnerDone.Add(1) + e.fetchInnerDone.Add(1) go util.WithRecovery( func() { e.fetchInnerRows(ctx, innerResultCh, doneCh) }, - func(r interface{}) { e.finishFetchInnerRows(innerResultCh, &fetchInnerDone) }, + func(r interface{}) { e.finishFetchInnerRows(innerResultCh) }, ) if e.finished.Load().(bool) { - // wait fetchInnerRows goroutine exit. - fetchInnerDone.Wait() return } // TODO: Parallel build hash table. Currently not support because `mvmap` is not thread-safe. - err := e.buildHashTableForList(innerResultCh, &fetchInnerDone) + err := e.buildHashTableForList(innerResultCh) if err != nil { e.innerFinished <- errors.Trace(err) close(doneCh) @@ -582,7 +581,7 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) { // buildHashTableForList builds hash table from `list`. // key of hash table: hash value of key columns // value of hash table: RowPtr of the corresponded row -func (e *HashJoinExec) buildHashTableForList(innerResultCh chan *chunk.Chunk, fetchInnerDone *sync.WaitGroup) error { +func (e *HashJoinExec) buildHashTableForList(innerResultCh chan *chunk.Chunk) error { e.hashTable = mvmap.NewMVMap() e.innerKeyColIdx = make([]int, len(e.innerKeys)) for i := range e.innerKeys { @@ -598,8 +597,6 @@ func (e *HashJoinExec) buildHashTableForList(innerResultCh chan *chunk.Chunk, fe chkIdx := uint32(0) for chk := range innerResultCh { if e.finished.Load().(bool) { - // wait fetchInnerRows goroutine exit. - fetchInnerDone.Wait() return nil } numRows := chk.NumRows() From 42eafaeae29befd49e6f20b47881e8ebc5270921 Mon Sep 17 00:00:00 2001 From: lysu Date: Tue, 20 Nov 2018 11:33:06 +0800 Subject: [PATCH 5/5] address comment --- executor/join.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/executor/join.go b/executor/join.go index fe6a7ce8d8de8..770f666f1e0a9 100644 --- a/executor/join.go +++ b/executor/join.go @@ -262,6 +262,10 @@ func (e *HashJoinExec) wait4Inner() (finished bool, err error) { // fetchInnerRows fetches all rows from inner executor, // and append them to e.innerResult. func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh chan struct{}) { + defer func() { + close(chkCh) + e.fetchInnerDone.Done() + }() e.innerResult = chunk.NewList(e.innerExec.retTypes(), e.initCap, e.maxChunkSize) e.innerResult.GetMemTracker().AttachTo(e.memTracker) e.innerResult.GetMemTracker().SetLabel("innerResult") @@ -290,11 +294,6 @@ func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.C } } -func (e *HashJoinExec) finishFetchInnerRows(chkCh chan<- *chunk.Chunk) { - close(chkCh) - e.fetchInnerDone.Done() -} - // evalRadixBitNum evaluates the radix bit numbers. func (e *HashJoinExec) evalRadixBitNum() { sv := e.ctx.GetSessionVars() @@ -557,10 +556,7 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) { innerResultCh := make(chan *chunk.Chunk, e.concurrency) doneCh := make(chan struct{}) e.fetchInnerDone.Add(1) - go util.WithRecovery( - func() { e.fetchInnerRows(ctx, innerResultCh, doneCh) }, - func(r interface{}) { e.finishFetchInnerRows(innerResultCh) }, - ) + go util.WithRecovery(func() { e.fetchInnerRows(ctx, innerResultCh, doneCh) }, nil) if e.finished.Load().(bool) { return