无缓冲通道引发的BUG

目录

最近在刚玩具级的 MapReduce ,其中充斥着大量的并发编程的代码,稍有不慎就可能导致 Map 或者 Reduce 的时候遭遇到未知的阻塞状态 (Blocking),导致整个系统无法按照计划 处理所有的 Task 并将输出的结果写入文件,经过一番 DEBUG ,发现竟是无缓冲通道的基本用法没有掌握熟练,当遇上多个 channel 协作的时候就难免会踩坑

先说重点:

无缓冲通道要求发送方和接收方同时准备好,才能"进行"发送和接收操作

show me the code

在 Task 的容错阶段我们要 re-execute 那些失败的任务,我的方法是将每个任务的执行结果保存在一个布尔数组里,下标对应每个任务,元素对应的是当前 Task 是否完成

for t, failed := range taskFailed {
    if failed == true {
        wg.Add(1)
        addr := <-registerChan
        go func(i int, workerAddr string, p jobPhase) {
            res := call(workerAddr, "Worker.DoTask", DoTaskArgs{
                JobName:       jobName,
                File:          mapFiles[i],
                Phase:         p,
                TaskNumber:    i,
                NumOtherPhase: n_other,
            }, nil)
            if res == true {
                registerChan <- workerAddr
                wg.Done()
            }
        }(t, addr, phase)
    }
}

wg.Wait()

我们遍历了任务状态数组,找出那些失败的任务,让空闲的 worker 去执行,然后一直 WaitGroup 就行了

但仔细观察,联系刚才说到的无阻塞通道的双方都要准备好才能进行操作,那么言外之意,如果只是一方在进行操作,但是另一方根本没上线的情况下,那么这个操作将会阻塞 goroutine

看回代码的这两行

registerChan <- workerAddr
wg.Done()

注意这个时候未必有对应的 registerChan 接收者,因为我们只为失败的 Task 重新分配 Worker,因此如果只有一个 Task 失败的话,wg.Wait() 将永远阻塞下去,因此我们应该调换一下顺序

wg.Done()
registerChan <- workerAddr

这样才不会让 WaitGroup 一直阻塞下去