首页 > golang > 优雅关闭channel
2023
03-01

优雅关闭channel

1 channel使用不当引发的问题

示例1 重复关闭channel

func main(){
 ci:=make(chan int)

 close(ci)
 close(ci)}

「编译不会提示错误,运行直接报错」panic: close of closed channel goroutine 1 [running]:

示例2 向已关闭的channel中发送消息

func main() {
 ci := make(chan int)
 close(ci)
 ci <- 1}

「编译不会提示错误,运行直接报错」panic: send on closed channel goroutine 1 [running]:

总结起来有两点:

1、重复关闭channel会引发panic,类似于c语言的重复free释放地址空间也会引发错误,所以在不明确channel是否已经关闭的情况下贸然进行channel关闭是件非常危险的事情。

2、向已关闭的channel中发送数据也会引发panic, 所以不明确channel是否已经关闭的情况下,向里面发送数据也是件非常危险的事情。

有同学会说,既然不正确操作channel会引发上述问题,那我写程序的时候都不关闭channel,不就避免了上述问题嘛。「理论上可行,实际是不行的」。channel是占资源的,在计算机中当资源不在使用的时候,要尽早规划给操作系统,让其他程序可以使用。

那有没有检测channel是否关闭的函数呢?不好意思,golang 官方没有提供检测方法。我们自己实现一个可以了吧。

func IsChannelClosed(ch <-chan int) bool {
 select {
 case <-ch:
  return true
 default:
 }

 return false}func main() {
 ci := make(chan int)
 fmt.Println("channel是否已经关闭:", IsChannelClosed(ci))}

输出:channel是否已经关闭: false

func main() {
 ci := make(chan int)
 close(ci)
 fmt.Println("channel是否已经关闭:", IsChannelClosed(ci))}

输出:channel是否已经关闭: true

我们重新在仔细思考一下,看看IsChannelClosed存在什么问题:

问题1:这个函数只能检查非缓冲区channel,对于有缓存区的channel无能为力,见下面的验证程序

func main() {
 ci := make(chan int, 1)
 ci <- 1
 fmt.Println("channel是否已经关闭:", IsChannelClosed(ci))}

输出:channel是否已经关闭: true

问题2:这个函数检查到channel没有关闭,并不代表channel真的没有关闭,因为在检查函数和业务逻辑发送数据并不是原子操作,像下面的示例,发送数据的时候还是会引发panic

func main() {
 ci := make(chan int)
 
 if !IsChannelClosed(ci){
  time.Sleep(time.Duration(1000)) //模拟耗时操作
  //这里发送的时候,ci可能已经被其他goroutine操作关闭
  ci<-1
 }
 
 ...
 go func(){
  close(ci)
 }()
 ...}

channel关闭原则:

❝don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders ❞

即不要在receiver(接收端)关闭,也不要在有多个sender(发送端)的时候关闭。

2 正确的关闭channel方法

1中的IsChannelClosed并不能真正检查channel是否关闭,那有没有真正可判断channel是否关闭的方法,有三种方法,一是通过defer+recover机制来判断,另一种是采用sync.Once,保证channel只关闭一次,最后一种与sync.Once类似,不过是采用sync.Mutex加锁实现。

方法1:defer+recover

func SafeCloseChannel(ch chan int) (justClosed bool) {
 defer func() {
  if recover() != nil {
   justClosed = false
  }
 }()

 close(ch)
 return true}

SafeCloseChannel对于未关闭的ch, 执行close(ch)会正常关闭,对于已关闭的ch, 执行close(ch)会引发panic, 由defer捕获,recover返回非空,识别出ch是已经关闭。

方法2:sync.Once

type MyChannel struct{
   C chan struct{}
   once sync.Once}func NewMyChannel() *MyChannel{
   return &MyChannel{C:make(chan struct{})}}func (mc *MyChannel) SafeClose(){
   mc.once.Do(func(){
      close(mc.C)
   })}

方法3:sync.Mutex

type MyChannel2 struct{
   C chan struct{}
   closed bool
   mutex sync.Mutex}func NewMyChannel2() *MyChannel2{
   return &MyChannel2{C:make(chan struct{})}}func (mc *MyChannel2) SafeClose(){
   mc.mutex.Lock()
   if !mc.closed{
      close(mc.C)
      mc.closed=true
   }
   
   mc.mutex.Unlock()}func (mc *MyChannel2) IsClosed() bool{
   mc.mutex.Lock()
   defer mc.mutex.Unlock()
   
   return mc.closed}

3 优雅关闭channel

2中关闭channel的方法虽然都是正确的,在生产环境是可用的,但并不是优雅的做法。下面介绍优雅关闭channel的方法,按照receiver(接受者)和sender(发送者)的数量关系,可以分成4种情况:

发送者:接收者=1:1

发送者:接收者=1:N

发送者:接收者=N:1

发送者:接收者=M:N

发送者:接收者=1:1 直接在发送端关闭

// 生产者:消费者=1:1func test11() {
 chanInt := make(chan int)

 wg := sync.WaitGroup{}
 wg.Add(2)

 //生产者1个
 go func(ci chan int) {
  defer wg.Done()

  for i := 0; i < 10; i++ {
   chanInt <- i  }
  //关闭channel
  close(chanInt)

 }(chanInt)

 //消费者1个
 go func(ci chan int) {
  defer wg.Done()

  for v := range ci {
   fmt.Println(v)
  }
 }(chanInt)

 wg.Wait()}

发送者:接收者=1:N 也直接在发送端关闭

// 生产者:消费者=1:Nfunc test1N() {
 chanInt := make(chan int)
 wg := sync.WaitGroup{}

 wg.Add(3)

 //生产者1个
 go func(ci chan int) {
  defer wg.Done()

  for i := 0; i < 10; i++ {
   ci <- i  }

  //关闭channel
  close(ci)

 }(chanInt)

 //消费者2个
 go func(ci chan int) {
  defer wg.Done()

  for v := range ci {
   fmt.Println("consumer 1, ", v)
  }
 }(chanInt)

 go func(ci chan int) {
  defer wg.Done()

  for v := range ci {
   fmt.Println("consumer 2, ", v)
  }

 }(chanInt)

 wg.Wait()}

发送者:接收者=N:1 不能在发送者中关闭,因为发送者有多个,一个思路是将关闭的操作从发送者处理逻辑内部提取到外面,放在一个单独的goroutine中去做,等待所有的发送者发送完成之后,在关闭的goroutine中进行关闭。

// 生产者:消费者=N:1func testN1() {
 chanInt := make(chan int)
 wg := sync.WaitGroup{}

 wgProducer := sync.WaitGroup{}

 wg.Add(4)

 //生产者2个
 wgProducer.Add(2)
 
 //生产者1
 go func(ci chan int) {
  defer wg.Done()
  defer wgProducer.Done()

  for i := 0; i < 10; i++ {
   ci <- i  }
 }(chanInt)
 
 //生产者2
 go func(ci chan int) {
  defer wg.Done()
  defer wgProducer.Done()

  for i := 10; i < 20; i++ {
   ci <- i  }
 }(chanInt)

 //消费者1个
 go func(ci chan int) {
  defer wg.Done()

  for v := range ci {
   fmt.Println(v)
  }
 }(chanInt)

 //关闭channel goroutine
 go func(ci chan int) {
  defer wg.Done()

  wgProducer.Wait()
  close(ci)
 }(chanInt)

 wg.Wait()}

发送者:接收者=M:N 与上述处理方法相同

// 生产者:消费者=M:Nfunc testMN() {
 chanInt := make(chan int)
 wg := sync.WaitGroup{}
 wgProducer := sync.WaitGroup{}
 //生产者2个
 wgProducer.Add(2)
 wg.Add(5)
 
 //生产者1
 go func(ci chan int) {
  defer wg.Done()
  defer wgProducer.Done()

  for i := 0; i < 10; i++ {
   ci <- i  }
 }(chanInt)
 //生产者2
 go func(ci chan int) {
  defer wg.Done()
  defer wgProducer.Done()

  for i := 10; i < 20; i++ {
   ci <- i  }
 }(chanInt)

 //消费者1
 for i := 0; i < 2; i++ {
  go func(ci chan int, id int) {
   defer wg.Done()

   for v := range ci {
    fmt.Printf("receive from consumer %d, %d\n", id, v)
   }
  }(chanInt, i)
 }
 //消费者2
 go func() {
  defer wg.Done()
  wgProducer.Wait()
  close(chanInt)
 }()

 wg.Wait()}

上面的发送者:接收者为N:1和发送者:接收者为M:N两种情况讨论的都是发送者最终会执行完循环,主动退出goroutine的情况。对于某些情况下,发送者的goroutine是死循环不会退出的情况,优雅关闭channel方法分析如下:

发送者:接收者=N:1 发送者goroutine不退出

func TestN1NoExit() {
 ci := make(chan int, 10)
 exitCh := make(chan struct{})
 const numSenders = 10
 const MaxValue = 100
 wg := sync.WaitGroup{}

 //senders
 for i := 0; i < numSenders; i++ {
  wg.Add(1)
  go func(index int) {
   defer wg.Done()
   for {
    ri := rand.Intn(MaxValue)
    select {
    case ci <- ri:
     fmt.Printf("put %d to channel from sender %d\n", ri, index)
    case <-exitCh:
     fmt.Printf("sender %d exit\n", index)
     return
    }
   }
  }(i)
 }

 //receivers
 wg.Add(1)
 go func() {
  defer wg.Done()
  for v := range ci {
   if v == MaxValue-1 {
    close(exitCh)
    fmt.Println("send exit signal to senders")
    return
   }
   fmt.Println(v)
  }
 }()

 wg.Wait()}

可以看到上面的处理中,并没有执行close(ci)操作,有同学会有疑问,上面的ci最后关闭了吗?「是关闭了,借东风(gc)来关闭的」。当一个channel没有被引用的时候,它会被gc回收,那channel什么时候是没有被引用呢,即没有发送者goroutine和接收者goroutine与之关联的时候。上面的代码中,执行完close(exitCh)之后,接着者直接return了, 这时channel ci的接着者goroutine已退出,然后发送者在select的时候,可能会选择case ci <- ri(这个时候ci还未满),不管它被选择多少次,最终肯定会选择case <-exitCh(ci满的时候),当选择到case <-exitCh的时候,发送者goroutine也退出了,当所有的发送者goroutine都退出的时候,channel ci就处于未被引用的状态,它会gc回收,也就不用关闭了。所以上面做法很巧妙的借助goroutine退出达到关闭的目的,通过引入一个exitCh channel,从接收者来close(exitCh),因为这种情况下接收者为1,发送者为N。这种方法可以概括为在接收方close 引入的中间channel间接实现关闭真正channel。

发送者:接收者=M:N 发送者goroutine不退出

func TestMNNoExit() {
 ci := make(chan int, 10)
 exitCh := make(chan struct{})
 toStop := make(chan struct{}, 1)
 const numSenders = 10
 const numReceivers = 5
 const MaxValue = 100
 wg := sync.WaitGroup{}

 //senders
 for i := 0; i < numSenders; i++ {
  wg.Add(1)
  go func(index int) {
   defer wg.Done()
   for {
    ri := rand.Intn(MaxValue)
    if ri == 0 {
     select {
     case toStop <- struct{}{}:
     default:
     }
     return
    }
    select {
    case ci <- ri:
     fmt.Printf("put %d to channel from sender %d\n", ri, index)
    case <-exitCh:
     fmt.Printf("sender %d exit\n", index)
     return
    }
   }
  }(i)
 }

 //receivers
 for i := 0; i < numReceivers; i++ {
  wg.Add(1)
  go func() {
   defer wg.Done()
   for {
    select {
    case v := <-ci:
     if v == MaxValue-1 {
      select {
      case toStop <- struct{}{}:
       fmt.Println("send exit signal to senders")
      default:
      }
      return
     }
     fmt.Println(v)
    case <-exitCh:
     return
    }
   }
  }()
 }

 wg.Add(1)
 go func() {
  defer wg.Done()

  <-toStop  close(exitCh)
 }()

 wg.Wait()}

对于M:N的场景,就不能在receivers中直接close(exitCh)中进行了,因为存在多个receiver,会有重复关闭的情况。那怎么处理呢?找一个代理角色(新开一个goroutine),在代理角色中执行close(exitCh), 这个时候代理角色只要一个,所以不存在重复关闭的exitCh的情况。那还有一个问题要解决,什么时候执行close(exitCh)呢?我们可以在接收者中或发送者中通知代理角色,接收者、发送者和代理角色本质都是goroutine,那通知方式就是goroutine之间的通信方式,在golang用channel就可以达到次目的,所以要新申请一个channel,来通知代理角色执行close(exitCh), 新申请的channel就是上面代码中的toStop,注意,这里的「toStop要申请为带缓冲区的」,读者可以想一想,如果申请成非缓冲区会有什么问题。不申请带缓冲区的,第一个发生的关闭channel请求可能会丢失。


本文》有 0 条评论

留下一个回复