1 channel使用不当引发的问题
示例1 重复关闭channel
func main(){ ci:=make(chan int) close(ci) close(ci)}
「编译不会提示错误,运行直接报错」panic: close of closed channel goroutine 1 [running]:
示例2 向已关闭的channel中发送消息
func main() { ci := make(chan int) close(ci) ci <- 1}
「编译不会提示错误,运行直接报错」panic: send on closed channel goroutine 1 [running]:
总结起来有两点:
1、重复关闭channel会引发panic,类似于c语言的重复free释放地址空间也会引发错误,所以在不明确channel是否已经关闭的情况下贸然进行channel关闭是件非常危险的事情。
2、向已关闭的channel中发送数据也会引发panic, 所以不明确channel是否已经关闭的情况下,向里面发送数据也是件非常危险的事情。
有同学会说,既然不正确操作channel会引发上述问题,那我写程序的时候都不关闭channel,不就避免了上述问题嘛。「理论上可行,实际是不行的」。channel是占资源的,在计算机中当资源不在使用的时候,要尽早规划给操作系统,让其他程序可以使用。
那有没有检测channel是否关闭的函数呢?不好意思,golang 官方没有提供检测方法。我们自己实现一个可以了吧。
func IsChannelClosed(ch <-chan int) bool { select { case <-ch: return true default: } return false}func main() { ci := make(chan int) fmt.Println("channel是否已经关闭:", IsChannelClosed(ci))}
输出:channel是否已经关闭: false
func main() { ci := make(chan int) close(ci) fmt.Println("channel是否已经关闭:", IsChannelClosed(ci))}
输出:channel是否已经关闭: true
我们重新在仔细思考一下,看看IsChannelClosed
存在什么问题:
问题1:这个函数只能检查非缓冲区channel,对于有缓存区的channel无能为力,见下面的验证程序
func main() { ci := make(chan int, 1) ci <- 1 fmt.Println("channel是否已经关闭:", IsChannelClosed(ci))}
输出:channel是否已经关闭: true
问题2:这个函数检查到channel没有关闭,并不代表channel真的没有关闭,因为在检查函数和业务逻辑发送数据并不是原子操作,像下面的示例,发送数据的时候还是会引发panic
func main() { ci := make(chan int) if !IsChannelClosed(ci){ time.Sleep(time.Duration(1000)) //模拟耗时操作 //这里发送的时候,ci可能已经被其他goroutine操作关闭 ci<-1 } ... go func(){ close(ci) }() ...}
channel关闭原则:
❝don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders ❞
即不要在receiver(接收端)关闭,也不要在有多个sender(发送端)的时候关闭。
2 正确的关闭channel方法
1中的IsChannelClosed
并不能真正检查channel是否关闭,那有没有真正可判断channel是否关闭的方法,有三种方法,一是通过defer+recover机制来判断,另一种是采用sync.Once,保证channel只关闭一次,最后一种与sync.Once类似,不过是采用sync.Mutex加锁实现。
方法1:defer+recover
func SafeCloseChannel(ch chan int) (justClosed bool) { defer func() { if recover() != nil { justClosed = false } }() close(ch) return true}
SafeCloseChannel
对于未关闭的ch, 执行close(ch)会正常关闭,对于已关闭的ch, 执行close(ch)会引发panic, 由defer捕获,recover返回非空,识别出ch是已经关闭。
方法2:sync.Once
type MyChannel struct{ C chan struct{} once sync.Once}func NewMyChannel() *MyChannel{ return &MyChannel{C:make(chan struct{})}}func (mc *MyChannel) SafeClose(){ mc.once.Do(func(){ close(mc.C) })}
方法3:sync.Mutex
type MyChannel2 struct{ C chan struct{} closed bool mutex sync.Mutex}func NewMyChannel2() *MyChannel2{ return &MyChannel2{C:make(chan struct{})}}func (mc *MyChannel2) SafeClose(){ mc.mutex.Lock() if !mc.closed{ close(mc.C) mc.closed=true } mc.mutex.Unlock()}func (mc *MyChannel2) IsClosed() bool{ mc.mutex.Lock() defer mc.mutex.Unlock() return mc.closed}
3 优雅关闭channel
2中关闭channel的方法虽然都是正确的,在生产环境是可用的,但并不是优雅的做法。下面介绍优雅关闭channel的方法,按照receiver(接受者)和sender(发送者)的数量关系,可以分成4种情况:
发送者:接收者=1:1
发送者:接收者=1:N
发送者:接收者=N:1
发送者:接收者=M:N
发送者:接收者=1:1 直接在发送端关闭
// 生产者:消费者=1:1func test11() { chanInt := make(chan int) wg := sync.WaitGroup{} wg.Add(2) //生产者1个 go func(ci chan int) { defer wg.Done() for i := 0; i < 10; i++ { chanInt <- i } //关闭channel close(chanInt) }(chanInt) //消费者1个 go func(ci chan int) { defer wg.Done() for v := range ci { fmt.Println(v) } }(chanInt) wg.Wait()}
发送者:接收者=1:N 也直接在发送端关闭
// 生产者:消费者=1:Nfunc test1N() { chanInt := make(chan int) wg := sync.WaitGroup{} wg.Add(3) //生产者1个 go func(ci chan int) { defer wg.Done() for i := 0; i < 10; i++ { ci <- i } //关闭channel close(ci) }(chanInt) //消费者2个 go func(ci chan int) { defer wg.Done() for v := range ci { fmt.Println("consumer 1, ", v) } }(chanInt) go func(ci chan int) { defer wg.Done() for v := range ci { fmt.Println("consumer 2, ", v) } }(chanInt) wg.Wait()}
发送者:接收者=N:1 不能在发送者中关闭,因为发送者有多个,一个思路是将关闭的操作从发送者处理逻辑内部提取到外面,放在一个单独的goroutine中去做,等待所有的发送者发送完成之后,在关闭的goroutine中进行关闭。
// 生产者:消费者=N:1func testN1() { chanInt := make(chan int) wg := sync.WaitGroup{} wgProducer := sync.WaitGroup{} wg.Add(4) //生产者2个 wgProducer.Add(2) //生产者1 go func(ci chan int) { defer wg.Done() defer wgProducer.Done() for i := 0; i < 10; i++ { ci <- i } }(chanInt) //生产者2 go func(ci chan int) { defer wg.Done() defer wgProducer.Done() for i := 10; i < 20; i++ { ci <- i } }(chanInt) //消费者1个 go func(ci chan int) { defer wg.Done() for v := range ci { fmt.Println(v) } }(chanInt) //关闭channel goroutine go func(ci chan int) { defer wg.Done() wgProducer.Wait() close(ci) }(chanInt) wg.Wait()}
发送者:接收者=M:N 与上述处理方法相同
// 生产者:消费者=M:Nfunc testMN() { chanInt := make(chan int) wg := sync.WaitGroup{} wgProducer := sync.WaitGroup{} //生产者2个 wgProducer.Add(2) wg.Add(5) //生产者1 go func(ci chan int) { defer wg.Done() defer wgProducer.Done() for i := 0; i < 10; i++ { ci <- i } }(chanInt) //生产者2 go func(ci chan int) { defer wg.Done() defer wgProducer.Done() for i := 10; i < 20; i++ { ci <- i } }(chanInt) //消费者1 for i := 0; i < 2; i++ { go func(ci chan int, id int) { defer wg.Done() for v := range ci { fmt.Printf("receive from consumer %d, %d\n", id, v) } }(chanInt, i) } //消费者2 go func() { defer wg.Done() wgProducer.Wait() close(chanInt) }() wg.Wait()}
上面的发送者:接收者为N:1和发送者:接收者为M:N两种情况讨论的都是发送者最终会执行完循环,主动退出goroutine的情况。对于某些情况下,发送者的goroutine是死循环不会退出的情况,优雅关闭channel方法分析如下:
发送者:接收者=N:1 发送者goroutine不退出
func TestN1NoExit() { ci := make(chan int, 10) exitCh := make(chan struct{}) const numSenders = 10 const MaxValue = 100 wg := sync.WaitGroup{} //senders for i := 0; i < numSenders; i++ { wg.Add(1) go func(index int) { defer wg.Done() for { ri := rand.Intn(MaxValue) select { case ci <- ri: fmt.Printf("put %d to channel from sender %d\n", ri, index) case <-exitCh: fmt.Printf("sender %d exit\n", index) return } } }(i) } //receivers wg.Add(1) go func() { defer wg.Done() for v := range ci { if v == MaxValue-1 { close(exitCh) fmt.Println("send exit signal to senders") return } fmt.Println(v) } }() wg.Wait()}
可以看到上面的处理中,并没有执行close(ci)操作,有同学会有疑问,上面的ci最后关闭了吗?「是关闭了,借东风(gc)来关闭的」。当一个channel没有被引用的时候,它会被gc回收,那channel什么时候是没有被引用呢,即没有发送者goroutine和接收者goroutine与之关联的时候。上面的代码中,执行完close(exitCh)之后,接着者直接return了, 这时channel ci的接着者goroutine已退出,然后发送者在select的时候,可能会选择case ci <- ri
(这个时候ci还未满),不管它被选择多少次,最终肯定会选择case <-exitCh
(ci满的时候),当选择到case <-exitCh
的时候,发送者goroutine也退出了,当所有的发送者goroutine都退出的时候,channel ci就处于未被引用的状态,它会gc回收,也就不用关闭了。所以上面做法很巧妙的借助goroutine退出达到关闭的目的,通过引入一个exitCh channel,从接收者来close(exitCh),因为这种情况下接收者为1,发送者为N。这种方法可以概括为在接收方close 引入的中间channel间接实现关闭真正channel。
发送者:接收者=M:N 发送者goroutine不退出
func TestMNNoExit() { ci := make(chan int, 10) exitCh := make(chan struct{}) toStop := make(chan struct{}, 1) const numSenders = 10 const numReceivers = 5 const MaxValue = 100 wg := sync.WaitGroup{} //senders for i := 0; i < numSenders; i++ { wg.Add(1) go func(index int) { defer wg.Done() for { ri := rand.Intn(MaxValue) if ri == 0 { select { case toStop <- struct{}{}: default: } return } select { case ci <- ri: fmt.Printf("put %d to channel from sender %d\n", ri, index) case <-exitCh: fmt.Printf("sender %d exit\n", index) return } } }(i) } //receivers for i := 0; i < numReceivers; i++ { wg.Add(1) go func() { defer wg.Done() for { select { case v := <-ci: if v == MaxValue-1 { select { case toStop <- struct{}{}: fmt.Println("send exit signal to senders") default: } return } fmt.Println(v) case <-exitCh: return } } }() } wg.Add(1) go func() { defer wg.Done() <-toStop close(exitCh) }() wg.Wait()}
对于M:N的场景,就不能在receivers中直接close(exitCh)中进行了,因为存在多个receiver,会有重复关闭的情况。那怎么处理呢?找一个代理角色(新开一个goroutine),在代理角色中执行close(exitCh), 这个时候代理角色只要一个,所以不存在重复关闭的exitCh的情况。那还有一个问题要解决,什么时候执行close(exitCh)呢?我们可以在接收者中或发送者中通知代理角色,接收者、发送者和代理角色本质都是goroutine,那通知方式就是goroutine之间的通信方式,在golang用channel就可以达到次目的,所以要新申请一个channel,来通知代理角色执行close(exitCh), 新申请的channel就是上面代码中的toStop,注意,这里的「toStop要申请为带缓冲区的」,读者可以想一想,如果申请成非缓冲区会有什么问题。不申请带缓冲区的,第一个发生的关闭channel请求可能会丢失。
《本文》有 0 条评论