Go强制关闭goroutine实践

问题排查

起因是线上的一个业务被反馈有bug,同一个资源被扣费了两次,回忆了下那个业务,前人写的时候直接用redis加了个粒度非常大的互斥锁,如果锁状态正常的话不应该出现数据不一致的情况,遂把关注点放在了redis加的锁上,经过一番review,发现加的锁设定了一个30秒的timeout,同时当请求无法获得锁的时候,会不断轮训拿锁直到成功为止。看到这大概能猜出来这个bug是怎么产生的了,又找到了具体的问题数据,发现订单逻辑中创建时间正好相差30秒,bingo。

简单来说当用户要下载某个付费的资源发起请求时,该资源不一定在我们的服务器上,需要先下载到我们机器上再返回给用户,这中间不可控因素太多了,网络的抖动,对方服务的质量,如果30秒内资源还没下载到我们服务器上(即后续记录已对该资源付费的逻辑尚未执行),redis超时自动解锁,在这期间用户如果再次发起请求,那第二个请求此时发现还没有这个资源的扣费记录,就会又执行一次原本在加锁段中只执行一次的扣费逻辑,bug成功复现。

解决方案

扣费逻辑

解决思路无非就是保证上图篮筐中的三个流程需要保持加锁的状态,即要么确保锁必须在这以后释放,要么锁超时释放的时候终止流程回滚,进一步的,第一个方案确保不释放只能放弃timeout,这显然是不可取的。因此决定在代码中加入锁超时回滚的控制。

原本的逻辑因为涉及大资源的下载,故通过goroutie异步下载,每多一个下载任务,WaitGroup.add(1),下载完成WaitGroup.done(),最后在主线程WaitGroup.wait,确保所有资源都到位了再进行之后的逻辑,所以超时实现需要通知子线程(其实是协程)return

贴上一篇写得很精辟的关于几种退出方式的总结文章:goroutine退出方式的总结

下文通过context进行实现:

func DownloadGoruntine(ctx, rpcCtx context.Context, timerCtx context.Context, wc *WaitChan,...) {
beginDownloadChan := make(chan struct{}, 1)
beginDownloadChan <- struct{}{} //避免多次下次
for {
select {
// 超时退出,避免因下载过慢导致redis互斥锁已经解锁而产生数据不一致
case <-timerCtx.Done():
wlerr := constant.NewWlErrorf(...)
gincommon.SetWlError(ctx, wlerr)
return
case <-beginDownloadChan:
go DownloadImage(ctx, rpcCtx, wc, ...)
return
default:
time.Sleep(1 * time.Second)
}
}
}

在每个gotoutine中,控制一个无限循环,每隔一秒读取一次定时器是否已到时间,当timerCtx.Done()后,报错并返回,这样由该线程创建的所有子routine都会强制退出(纠正:只有在main函数退出时对应的所有routine才会结束,此处不是main函数退出只是一个请求的线程退出,所以不会导致子routine退出,正确的退出方法✅应该是通过channel通知退出,具体见goroutine退出方式的总结

timerCtx通过主线程控制,统一传入:

//设置图片下载的超时时间为redis的timeout前5秒,保证在处理后续逻辑时不会超时
timerCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*(REDIS_MUTEX_TIMEOUT-5000))
defer cancel()
go DownloadImageGoruntine(ctx, rpcCtx, timerCtx, wc, ...)

优化后的实现

嗯,第一版的思想其实没什么问题(第一版这一类的轮询非阻塞管道的做法,很容易导致CPU被拉满),但写法有很多优化的地方,贴上导师改完的代码(膜拜下):

首先项目里已经有一个ctx的传递,没必要再额外创建一个,直接对已有的ctx设置即可:

对ctx的理解推荐这一篇:Go并发模式:context

ctx, cancel := context.WithTimeout(ctx, time.Millisecond*REDIS_MUTEX_TIMEOUT)
defer cancel()
go DownloadImageGoruntine(ctx, rpcCtx, wc, ...)//直接复用了ctx,少一个参数

关于此处是否需要打提前量,我觉得其实应该有,虽然意义不是特别大,如果不加的话可能会在一个场景下出现订单重复扣费的bug:即图片刚下载完,后续创建订单的逻辑还未生效,另一个请求就又进来了,虽然这间隙很短可能只有几百毫秒///

func DownloadGoruntine(ctx, rpcCtx context.Context, wc *WaitChan,...) {
defer wc.Done()
idc := make(chan *Data, 0)
defer close(idc)

go DownloadImage(ctx, rpcCtx, idc, ...)

// 超时退出,避免因下载过慢导致redis互斥锁已经解锁而产生数据不一致
select {
case <-ctx.Done():
wlerr := constant.NewWlErrorf(...)
gincommon.SetWlError(ctx, wlerr)
wc.ch <- &Data{
ID: ...,
Data: nil,
Format: format,
WlError: wlerr,
}
return
case id := <-idc:
wc.ch <- id
return
}
}

其实不难发现我们完全没必要写一个for循环,当chain都不满足的时候,select会自动轮训直到有case满足,当超时后,ctx.Done()报错并会返回一个相关data为nil的结构以便程序后续的处理,同时通过创建一个无缓冲chain idc用于传递异步下载的数据至wc(这里用有缓冲的我理解下来问题也不大),最后程序下载完了对WaitGruop进行Done的操作。

这里其实忘记关闭通道了,虽然我查了说GC会自动处理,但好的代码习惯还是加上吧,最后记得defer close(idc)`

Java 的处理思路

不自觉地又会去思考这个场景在Java中该如何实现,即父线程通知子线程关闭,思路大致是:异步的下载任务通过起一个线程池完成,借助submit提交任务传递下载好的byte,当主线程超时退出时,直接对线程池进行shutdownnow的操作中断所有任务

当然这里的中断不能确保全部中断,其原理是把线程池的状态立刻变成STOP状态,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务。而试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是大家知道,这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。

Author: Apiao
Link: https://Apiao-1.github.io/2019/05/16/2019-05-16/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.