首页 > golang > golang中redis对redigo的发布订阅机制的使用
2022
01-11

golang中redis对redigo的发布订阅机制的使用

redigo 对redis的订阅机制放在pubsub.go里面,

PubSubConn封装Conn以实现订阅者提供简便方法。Subscribe,PSubscribe,Unsubscribe和PUnsubscribe方法发送和刷新订阅。receive方法将推送的消息转换对应的类型

// Receive returns a pushed message as a Subscription, Message, Pong or error.
// The return value is intended to be used directly in a type switch as
// illustrated in the PubSubConn example

func (c PubSubConn) Receive() interface{
    } {
    return c.receiveInternal(c.Conn.Receive())
}

返回的是一个空接口类型 interface{},由于空接口没有方法,因此所有类型都实现了空接口,也就是说可以返回任意类型。

具体会返回哪些类型在receiveInternal()里面可以看到,
目前返回的三种Message、Subscription、Pong都定义在了pubsub.go 里面。

  1. func (c PubSubConn) receiveInternal(replyArg interface{

  2.    }, errArg error) interface{

  3.    } {


  4.    reply, err := Values(replyArg, errArg)

  5.    if err != nil {


  6.        return err

  7.    }


  8.    var kind string

  9.    reply, err = Scan(reply, &kind)

  10.    if err != nil {


  11.        return err

  12.    }


  13.    switch kind {


  14.    case "message":

  15.        var m Message

  16.        if _, err := Scan(reply, &m.Channel, &m.Data); err != nil {


  17.            return err

  18.        }

  19.        return m

  20.    case "pmessage":

  21.        var m Message

  22.        if _, err := Scan(reply, &m.Pattern, &m.Channel, &m.Data); err != nil {


  23.            return err

  24.        }

  25.        return m

  26.    case "subscribe", "psubscribe", "unsubscribe", "punsubscribe":

  27.        s := Subscription{

  28.    Kind: kind}

  29.        if _, err := Scan(reply, &s.Channel, &s.Count); err != nil {


  30.            return err

  31.        }

  32.        return s

  33.    case "pong":

  34.        var p Pong

  35.        if _, err := Scan(reply, &p.Data); err != nil {


  36.            return err

  37.        }

  38.        return p

  39.    }

  40.    r

订阅示例

示例代码用map存放回调函数

  1. package main


  2. import(

  3.    //"github.com/go-redis/redis"

  4.  "fmt"

  5.  "time"

  6.  //"reflect"

  7.  "unsafe"

  8.    "github.com/gomodule/redigo/redis"

  9.    log "github.com/astaxie/beego/logs"

  10. )


  11. type SubscribeCallback func (channel, message string)


  12. type Subscriber struct {


  13.  client redis.PubSubConn

  14.  cbMap map[string]SubscribeCallback

  15. }


  16. func (c *Subscriber) Connect(ip string, port uint16) {


  17.  conn, err := redis.Dial("tcp", "127.0.0.1:6379")

  18.  if err != nil {


  19.      log.Critical("redis dial failed.")

  20.  }


  21.  c.client = redis.PubSubConn{

  22.    conn}

  23.  c.cbMap = make(map[string]SubscribeCallback)


  24.  go func() {


  25.    for {


  26.        log.Debug("wait...")

  27.        switch res := c.client.Receive().(type) {


  28.          case redis.Message:

  29.              channel := (*string)(unsafe.Pointer(&res.Channel))

  30.              message := (*string)(unsafe.Pointer(&res.Data))

  31.              c.cbMap[*channel](*channel, *message)

  32.          case redis.Subscription:

  33.              fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)

  34.          case error:

  35.            log.Error("error handle...")

  36.            continue

  37.        }

  38.    }

  39.  }()


  40. }


  41. func (c *Subscriber) Close() {


  42.  err := c.client.Close()

  43.  if err != nil{


  44.    log.Error("redis close error.")

  45.  }

  46. }


  47. func (c *Subscriber) Subscribe(channel interface{

  48.    }, cb SubscribeCallback) {


  49.  err := c.client.Subscribe(channel)

  50.  if err != nil{


  51.    log.Critical("redis Subscribe error.")

  52.  }


  53.  c.cbMap[channel.(string)] = cb

  54. }


  55. func TestCallback1(chann, msg string){


  56.  log.Debug("TestCallback1 channel : ", chann, " message : ", msg)

  57. }


  58. func TestCallback2(chann, msg string){


  59.  log.Debug("TestCallback2 channel : ", chann, " message : ", msg)

  60. }


  61. func TestCallback3(chann, msg string){


  62.  log.Debug("TestCallback3 channel : ", chann, " message : ", msg)

  63. }


  64. func main() {



  65.  log.Info("===========main start============")


  66.  var sub Subscriber

  67.  sub.Connect("127.0.0.1", 6397)

  68.  sub.Subscribe("test_chan1", TestCallback1)

  69.  sub.Subscribe("test_chan2", TestCallback2)

  70.  sub.Subscribe("test_chan3", TestCallback3)


  71.  for{


  72.   time.Sleep(1 * time.Second)

  73.  }

  74. }

Alt text

常见问题:

这地方如果用redis的连接池的话,要注意不要设置读取数据的超时时间,否则到了超时时间,就会断开连接了,报错如下:

2022/07/12 15:43:14 wait...
test1: subscribe 1
2022/07/12 15:43:14 wait...
[GIN] 2022/07/12 - 15:43:26 | 200 |    1.129741ms |  222.128.58.255 | GET      "
/gotest/pub"
2022/07/12 15:43:26 {test1  [104 101 108 108 111]}
2022/07/12 15:43:26 0xc00009e540 0xc00009e560
2022/07/12 15:43:26 wait...
2022/07/12 15:43:46 error handle... read tcp 10.10.2.8:57724->123.207.190.86:637
9: i/o timeout
2022/07/12 15:43:46 wait...
2022/07/12 15:43:46 error handle... read tcp 10.10.2.8:57724->123.207.190.86:637
9: use of closed network connection
2022/07/12 15:43:46 wait...
2022/07/12 15:43:46 error handle... read tcp 10.10.2.8:57724->123.207.190.86:637
9: use of closed network connection
2022/07/12 15:43:46 wait...
2022/07/12 15:43:46 error handle... read tcp 10.10.2.8:57724->123.207.190.86:637
9: use of closed network connection
2022/07/12 15:43:46 wait...


发布示例

发布直接使用默认的Conn来Send “Publish“ 命令即可.
redigo的管道的使用方法设计到三个函数,Do函数也是下面这三个函数的合并:

c.Send("SUBSCRIBE", "example")
c.Flush()
for {
    reply, err := c.Receive()
    if err != nil {
        return err
    }
    // process pushed message
}

send()方法把命令写到输出缓冲区,Flush()把缓冲区的命令刷新到redis服务器,Receive()函数接收redis给予的响应,三个操作共同完成一套命令流程。

  1. package main


  2. import(

  3.  //"github.com/go-redis/redis"

  4.  "github.com/gomodule/redigo/redis"

  5.  log "github.com/astaxie/beego/logs"

  6. )


  7. func main() {



  8.  client, err := redis.Dial("tcp", "127.0.0.1:6379")

  9.  if err != nil {


  10.      log.Critical("redis dial failed.")

  11.  }

  12.  defer client.Close()


  13.  _, err = client.Do("Publish", "test_chan1", "hello")

  14.  if err != nil {


  15.    log.Critical("redis Publish failed.")

  16.  }


  17.  _, err = client.Do("Publish", "test_chan2", "hello")

  18.  if err != nil {


  19.    log.Critical("redis Publish failed.")

  20.  }


  21.  _, err = client.Do("Publish", "test_chan3", "hello")

  22.  if err != nil {


  23.    log.Critical("redis Publish failed.")

  24.  }


  25. }

Alt text


PubSubConn

定义

type PubSubConn struct {    Conn Conn}

提供的方法:

1.Close 关闭连接
func (c PubSubConn) Close() error

2.PSubscribe PSubscribe发布
func (c PubSubConn) PSubscribe(channel ...interface{}) error

3.PUnsubscribe 取消发布, 如果没有给定, 则取消所有
func (c PubSubConn) PUnsubscribe(channel ...interface{}) error

4.Ping 指定的数据向服务器发送PING 调用此方法时,连接必须至少订阅一个通道或模式
func (c PubSubConn) Ping(data string) error

5.Receive 获取消息
func (c PubSubConn) Receive() interface{}

6.ReceiveWithTimeout 带有超时时间的获取消息函数
func (c PubSubConn) ReceiveWithTimeout(timeout time.Duration) interface{}

7.Subscribe 订阅
func (c PubSubConn) Subscribe(channel ...interface{}) error

8.Unsubscribe 取消订阅
func (c PubSubConn) Unsubscribe(channel ...interface{}) error


本文》有 0 条评论

留下一个回复