redigo 对redis的订阅机制放在pubsub.go里面,
PubSubConn封装Conn以实现订阅者提供简便方法。Subscribe,PSubscribe,Unsubscribe和PUnsubscribe方法发送和刷新订阅。receive方法将推送的消息转换对应的类型
// Receive returns a pushed message as a Subscription, Message, Pong or error. // The return value is intended to be used directly in a type switch as // illustrated in the PubSubConn example func (c PubSubConn) Receive() interface{ } { return c.receiveInternal(c.Conn.Receive()) }
返回的是一个空接口类型 interface{},由于空接口没有方法,因此所有类型都实现了空接口,也就是说可以返回任意类型。
具体会返回哪些类型在receiveInternal()里面可以看到,
目前返回的三种Message、Subscription、Pong都定义在了pubsub.go 里面。
func (c PubSubConn) receiveInternal(replyArg interface{
}, errArg error) interface{
} {
reply, err := Values(replyArg, errArg)
if err != nil {
return err
}
var kind string
reply, err = Scan(reply, &kind)
if err != nil {
return err
}
switch kind {
case "message":
var m Message
if _, err := Scan(reply, &m.Channel, &m.Data); err != nil {
return err
}
return m
case "pmessage":
var m Message
if _, err := Scan(reply, &m.Pattern, &m.Channel, &m.Data); err != nil {
return err
}
return m
case "subscribe", "psubscribe", "unsubscribe", "punsubscribe":
s := Subscription{
Kind: kind}
if _, err := Scan(reply, &s.Channel, &s.Count); err != nil {
return err
}
return s
case "pong":
var p Pong
if _, err := Scan(reply, &p.Data); err != nil {
return err
}
return p
}
r
订阅示例
示例代码用map存放回调函数
package main
import(
//"github.com/go-redis/redis"
"fmt"
"time"
//"reflect"
"unsafe"
"github.com/gomodule/redigo/redis"
log "github.com/astaxie/beego/logs"
)
type SubscribeCallback func (channel, message string)
type Subscriber struct {
client redis.PubSubConn
cbMap map[string]SubscribeCallback
}
func (c *Subscriber) Connect(ip string, port uint16) {
conn, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
log.Critical("redis dial failed.")
}
c.client = redis.PubSubConn{
conn}
c.cbMap = make(map[string]SubscribeCallback)
go func() {
for {
log.Debug("wait...")
switch res := c.client.Receive().(type) {
case redis.Message:
channel := (*string)(unsafe.Pointer(&res.Channel))
message := (*string)(unsafe.Pointer(&res.Data))
c.cbMap[*channel](*channel, *message)
case redis.Subscription:
fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
case error:
log.Error("error handle...")
continue
}
}
}()
}
func (c *Subscriber) Close() {
err := c.client.Close()
if err != nil{
log.Error("redis close error.")
}
}
func (c *Subscriber) Subscribe(channel interface{
}, cb SubscribeCallback) {
err := c.client.Subscribe(channel)
if err != nil{
log.Critical("redis Subscribe error.")
}
c.cbMap[channel.(string)] = cb
}
func TestCallback1(chann, msg string){
log.Debug("TestCallback1 channel : ", chann, " message : ", msg)
}
func TestCallback2(chann, msg string){
log.Debug("TestCallback2 channel : ", chann, " message : ", msg)
}
func TestCallback3(chann, msg string){
log.Debug("TestCallback3 channel : ", chann, " message : ", msg)
}
func main() {
log.Info("===========main start============")
var sub Subscriber
sub.Connect("127.0.0.1", 6397)
sub.Subscribe("test_chan1", TestCallback1)
sub.Subscribe("test_chan2", TestCallback2)
sub.Subscribe("test_chan3", TestCallback3)
for{
time.Sleep(1 * time.Second)
}
}
常见问题:
这地方如果用redis的连接池的话,要注意不要设置读取数据的超时时间,否则到了超时时间,就会断开连接了,报错如下:
2022/07/12 15:43:14 wait... test1: subscribe 1 2022/07/12 15:43:14 wait... [GIN] 2022/07/12 - 15:43:26 | 200 | 1.129741ms | 222.128.58.255 | GET " /gotest/pub" 2022/07/12 15:43:26 {test1 [104 101 108 108 111]} 2022/07/12 15:43:26 0xc00009e540 0xc00009e560 2022/07/12 15:43:26 wait... 2022/07/12 15:43:46 error handle... read tcp 10.10.2.8:57724->123.207.190.86:637 9: i/o timeout 2022/07/12 15:43:46 wait... 2022/07/12 15:43:46 error handle... read tcp 10.10.2.8:57724->123.207.190.86:637 9: use of closed network connection 2022/07/12 15:43:46 wait... 2022/07/12 15:43:46 error handle... read tcp 10.10.2.8:57724->123.207.190.86:637 9: use of closed network connection 2022/07/12 15:43:46 wait... 2022/07/12 15:43:46 error handle... read tcp 10.10.2.8:57724->123.207.190.86:637 9: use of closed network connection 2022/07/12 15:43:46 wait...
发布示例
发布直接使用默认的Conn来Send “Publish“ 命令即可.
redigo的管道的使用方法设计到三个函数,Do函数也是下面这三个函数的合并:
c.Send("SUBSCRIBE", "example") c.Flush() for { reply, err := c.Receive() if err != nil { return err } // process pushed message }
send()方法把命令写到输出缓冲区,Flush()把缓冲区的命令刷新到redis服务器,Receive()函数接收redis给予的响应,三个操作共同完成一套命令流程。
package main
import(
//"github.com/go-redis/redis"
"github.com/gomodule/redigo/redis"
log "github.com/astaxie/beego/logs"
)
func main() {
client, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
log.Critical("redis dial failed.")
}
defer client.Close()
_, err = client.Do("Publish", "test_chan1", "hello")
if err != nil {
log.Critical("redis Publish failed.")
}
_, err = client.Do("Publish", "test_chan2", "hello")
if err != nil {
log.Critical("redis Publish failed.")
}
_, err = client.Do("Publish", "test_chan3", "hello")
if err != nil {
log.Critical("redis Publish failed.")
}
}
PubSubConn
定义
type PubSubConn struct { Conn Conn}
提供的方法:
1.Close 关闭连接func (c PubSubConn) Close() error
2.PSubscribe PSubscribe发布func (c PubSubConn) PSubscribe(channel ...interface{}) error
3.PUnsubscribe 取消发布, 如果没有给定, 则取消所有func (c PubSubConn) PUnsubscribe(channel ...interface{}) error
4.Ping 指定的数据向服务器发送PING 调用此方法时,连接必须至少订阅一个通道或模式func (c PubSubConn) Ping(data string) error
5.Receive 获取消息func (c PubSubConn) Receive() interface{}
6.ReceiveWithTimeout 带有超时时间的获取消息函数func (c PubSubConn) ReceiveWithTimeout(timeout time.Duration) interface{}
7.Subscribe 订阅func (c PubSubConn) Subscribe(channel ...interface{}) error
8.Unsubscribe 取消订阅func (c PubSubConn) Unsubscribe(channel ...interface{}) error
- 本文固定链接: https://phpmianshi.com/?id=5130
- 转载请注明: admin 于 PHP面试网 发表
《本文》有 0 条评论