无缓冲通道引发的BUG
目录
最近在刚玩具级的 MapReduce
,其中充斥着大量的并发编程的代码,稍有不慎就可能导致 Map
或者 Reduce
的时候遭遇到未知的阻塞状态 (Blocking),导致整个系统无法按照计划
处理所有的 Task 并将输出的结果写入文件,经过一番 DEBUG ,发现竟是无缓冲通道的基本用法没有掌握熟练,当遇上多个 channel 协作的时候就难免会踩坑
先说重点:
无缓冲通道要求发送方和接收方同时准备好,才能"进行"发送和接收操作
show me the code
在 Task 的容错阶段我们要 re-execute 那些失败的任务,我的方法是将每个任务的执行结果保存在一个布尔数组里,下标对应每个任务,元素对应的是当前 Task 是否完成
for t, failed := range taskFailed {
if failed == true {
wg.Add(1)
addr := <-registerChan
go func(i int, workerAddr string, p jobPhase) {
res := call(workerAddr, "Worker.DoTask", DoTaskArgs{
JobName: jobName,
File: mapFiles[i],
Phase: p,
TaskNumber: i,
NumOtherPhase: n_other,
}, nil)
if res == true {
registerChan <- workerAddr
wg.Done()
}
}(t, addr, phase)
}
}
wg.Wait()
我们遍历了任务状态数组,找出那些失败的任务,让空闲的 worker
去执行,然后一直 WaitGroup 就行了
但仔细观察,联系刚才说到的无阻塞通道的双方都要准备好才能进行操作,那么言外之意,如果只是一方在进行操作,但是另一方根本没上线的情况下,那么这个操作将会阻塞 goroutine
看回代码的这两行
registerChan <- workerAddr
wg.Done()
注意这个时候未必有对应的 registerChan
接收者,因为我们只为失败的 Task
重新分配 Worker
,因此如果只有一个 Task 失败的话,wg.Wait()
将永远阻塞下去,因此我们应该调换一下顺序
wg.Done()
registerChan <- workerAddr
这样才不会让 WaitGroup 一直阻塞下去