Skip to content

Commit

Permalink
executor: make sure hashjoin's goroutine exit before Close return (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and crazycs520 committed Nov 22, 2018
1 parent 6bed56d commit 37d54e3
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type HashJoinExec struct {
innerFinished chan error
hashJoinBuffers []*hashJoinBuffer
workerWaitGroup sync.WaitGroup // workerWaitGroup is for sync multiple join workers.
fetchInnerDone sync.WaitGroup
finished atomic.Value
closeCh chan struct{} // closeCh add a lock for closing executor.
joinType plannercore.JoinType
Expand Down Expand Up @@ -134,6 +135,7 @@ func (e *HashJoinExec) Close() error {
e.outerChkResourceCh = nil
e.joinChkResourceCh = nil
}
e.fetchInnerDone.Wait()
e.memTracker.Detach()
e.memTracker = nil

Expand Down Expand Up @@ -260,7 +262,10 @@ func (e *HashJoinExec) wait4Inner() (finished bool, err error) {
// fetchInnerRows fetches all rows from inner executor,
// and append them to e.innerResult.
func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh chan struct{}) {
defer close(chkCh)
defer func() {
close(chkCh)
e.fetchInnerDone.Done()
}()
e.innerResult = chunk.NewList(e.innerExec.retTypes(), e.initCap, e.maxChunkSize)
e.innerResult.GetMemTracker().AttachTo(e.memTracker)
e.innerResult.GetMemTracker().SetLabel("innerResult")
Expand Down Expand Up @@ -557,6 +562,7 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) {
// innerResultCh transfer inner result chunk from inner fetch to build hash table.
innerResultCh := make(chan *chunk.Chunk, e.concurrency)
doneCh := make(chan struct{})
e.fetchInnerDone.Add(1)
go util.WithRecovery(func() { e.fetchInnerRows(ctx, innerResultCh, doneCh) }, nil)

// TODO: Parallel build hash table. Currently not support because `mvmap` is not thread-safe.
Expand Down

0 comments on commit 37d54e3

Please sign in to comment.