Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

预防并发搞垮友军的几个方法 #63

Open
kevinyan815 opened this issue Jun 28, 2021 · 0 comments
Open

预防并发搞垮友军的几个方法 #63

kevinyan815 opened this issue Jun 28, 2021 · 0 comments

Comments

@kevinyan815
Copy link
Owner

kevinyan815 commented Jun 28, 2021

巧用WaitGroup

因为go对并发的原生支持使得并发编程难度大大降低,刚学会Go语言的人特别喜欢在开发的时候尝试并发,其实并发并不是解决所有问题的银弹,反而是一味想尝试并发造成了不少线上BUG/事故。比如说,有的人会误以为起几十个线程休眠一下,再接着起线程就能控制并发数了,其实不是,比如像下面这么写

func badConcurrency() {
	batchSize := 50
	for {
		data, _ := queryDataWithSizeN(batchSize)
		if len(data) == 0 {
			break
		}

		for _, item := range data {
			go func(i int) {
				doSomething(i)
			}(item)
		}

		time.Sleep(time.Second * 1)
	}
}

对于调用者来说,看起来确实是控制了一秒中只发出去了50个请求,我们不能只从调用者的角度考虑事情,如果恰巧赶上对方正忙在,你程序休眠的时候下游服务并没有处理完你发过去的这批请求,这时你再发一批过去,累计下来无疑是对对方的服务器雪上加霜。最好的是等到上一批并发都返回了再去开启下一批,这个可以通过WaitGroup实现。

func useWaitGroup() {

	batchSize := 50
	for {
		data, _ := queryDataWithSizeN(batchSize)
		if len(data) == 0 {
			fmt.Println("End of all data")
			break
		}
		var wg sync.WaitGroup
		for _, item := range data {
			wg.Add(1)
			go func(i int) {
				doSomething(i)
				wg.Done()
			}(item)
		}
		wg.Wait()

		fmt.Println("Next bunch of data")
	}
}

巧用Semaphore

上面是一批处理完再开启下一批并发,也可以一个处理完后下一个补上,但同时发起的请求书最多不超过N个的限制,这个可以通过信号量实现。

func useSemaphore() {
	var concurrentNum int64 = 10
	var weight int64 = 1
	var batchSize int = 50
	s := semaphore.NewWeighted(concurrentNum)
	for {
		data, _ := queryDataWithSizeN(batchSize)
		if len(data) == 0 {
			fmt.Println("End of all data")
			break
		}

		for _, item := range data {
                        s.Acquire(context.Background(), weight)
			go func(i int) {
				doSomething(i)
				s.Release(weight)
			}(item)
		}

	}
}

使用限速器

再有就是使用限速器了,这个不像上面两个可以等到请求返回再开启下一个/一批,而是实实在在的限流,可以通过官方库提供的 time/rate 限流器实现。

func useRateLimit() {
	limiter := rate.NewLimiter(rate.Every(1*time.Second), 50)
	batchSize := 50
	for {
		data, _ :=queryDataWithSizeN(batchSize)
		if len(data) == 0 {
			fmt.Println("End of all data")
			break
		}

		for _, item := range data {
			// blocking until the bucket have sufficient token
			err := limiter.Wait(context.Background())
			if err != nil {
				fmt.Println("Error: ", err)
				return
			}
			go func(i int) {
				doSomething(i)
			}(item)
		}
	}
}

使用生产者消费者模式

利用channel实现一个生产者消费者队列模式,生产者从库里捞数据发送到通道,消费者通过通道接收数据做处理。

func useChannel() {
	batchSize := 50
	dataChan := make(chan int)
	var wg sync.WaitGroup
	wg.Add(batchSize + 1)
	// 生产者
	go func() {
		for {
			data, _ := queryDataWithSizeN(batchSize)
			if len(data) == 0 {
				break
			}
			for _, item := range data {
				dataChan <- item
			}
		}
		close(dataChan)
		wg.Done()
	}()
    // 消费者
	go func() {
		for i := 0; i < 50; i++ {
			go func() {
				for {
					select {
					case v, ok := <- dataChan:
						if !ok {
							wg.Done()
							return
						}
						doSomething(v)
					}
				}
			}()
		}
	}()

	wg.Wait()
}

完整代码参考:https://github.com/kevinyan815/gocookbook/blob/master/codes/prevent_over_concurrency/main.go

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant